зеркало из https://github.com/microsoft/spark.git
Peers now gossip about their neighbors when they talk.
This commit is contained in:
Родитель
8034728afc
Коммит
520bbdc7e3
|
@ -548,14 +548,24 @@ extends Broadcast[T] with Logging {
|
|||
var newPeerToTalkTo = oisSource.readObject.asInstanceOf[SourceInfo]
|
||||
// Update listOfSources
|
||||
addToListOfSources (newPeerToTalkTo)
|
||||
|
||||
|
||||
// Turn the timer OFF, if the sender responds before timeout
|
||||
timeOutTimer.cancel
|
||||
|
||||
// Receive gossiped listOfSources and add to local list
|
||||
var gossipedLOS = oisSource.readObject.asInstanceOf[ListBuffer[SourceInfo]]
|
||||
addToListOfSources (gossipedLOS)
|
||||
|
||||
// Send the latest SourceInfo
|
||||
oosSource.writeObject(getLocalSourceInfo)
|
||||
oosSource.flush
|
||||
|
||||
// Send gossiped listOfSources
|
||||
listOfSources.synchronized {
|
||||
oosSource.writeObject(listOfSources)
|
||||
}
|
||||
oosSource.flush
|
||||
|
||||
var keepReceiving = true
|
||||
|
||||
while (hasBlocks < totalBlocks && keepReceiving) {
|
||||
|
@ -1017,15 +1027,24 @@ extends Broadcast[T] with Logging {
|
|||
oos.writeObject(getLocalSourceInfo)
|
||||
oos.flush
|
||||
|
||||
// Gossip local listOfSources
|
||||
listOfSources.synchronized {
|
||||
oos.writeObject(listOfSources)
|
||||
}
|
||||
oos.flush
|
||||
|
||||
// Receive latest SourceInfo from the receiver
|
||||
var rxSourceInfo = ois.readObject.asInstanceOf[SourceInfo]
|
||||
// logInfo("rxSourceInfo: " + rxSourceInfo + " with " + rxSourceInfo.hasBlocksBitVector)
|
||||
|
||||
if (rxSourceInfo.listenPort == SourceInfo.StopBroadcast) {
|
||||
stopBroadcast = true
|
||||
} else {
|
||||
// Carry on
|
||||
addToListOfSources (rxSourceInfo)
|
||||
|
||||
// Received gossiped listOfSources
|
||||
var gossipedLOS = ois.readObject.asInstanceOf[ListBuffer[SourceInfo]]
|
||||
addToListOfSources (gossipedLOS)
|
||||
}
|
||||
|
||||
val startTime = System.currentTimeMillis
|
||||
|
|
Загрузка…
Ссылка в новой задаче