зеркало из https://github.com/microsoft/spark.git
Synchronization bug fix in broadcast implementations
This commit is contained in:
Родитель
3883532545
Коммит
31ffe8d528
|
@ -307,9 +307,11 @@ extends Broadcast[T] with Logging with Serializable {
|
|||
var threadPool = Utils.newDaemonFixedThreadPool(MultiTracker.MaxChatSlots)
|
||||
|
||||
while (hasBlocks.get < totalBlocks) {
|
||||
var numThreadsToCreate =
|
||||
math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
|
||||
var numThreadsToCreate = 0
|
||||
listOfSources.synchronized {
|
||||
numThreadsToCreate = math.min(listOfSources.size, MultiTracker.MaxChatSlots) -
|
||||
threadPool.getActiveCount
|
||||
}
|
||||
|
||||
while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) {
|
||||
var peerToTalkTo = pickPeerToTalkToRandom
|
||||
|
@ -732,10 +734,14 @@ extends Broadcast[T] with Logging with Serializable {
|
|||
// Stop broadcast if at least one worker has connected and
|
||||
// everyone connected so far are done. Comparing with
|
||||
// listOfSources.size - 1, because it includes the Guide itself
|
||||
if (listOfSources.size > 1 &&
|
||||
setOfCompletedSources.size == listOfSources.size - 1) {
|
||||
stopBroadcast = true
|
||||
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
|
||||
listOfSources.synchronized {
|
||||
setOfCompletedSources.synchronized {
|
||||
if (listOfSources.size > 1 &&
|
||||
setOfCompletedSources.size == listOfSources.size - 1) {
|
||||
stopBroadcast = true
|
||||
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -290,15 +290,17 @@ extends Broadcast[T] with Logging with Serializable {
|
|||
clientSocket = serverSocket.accept
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
logError("GuideMultipleRequests Timeout.")
|
||||
|
||||
// Stop broadcast if at least one worker has connected and
|
||||
// everyone connected so far are done.
|
||||
// Comparing with listOfSources.size - 1, because the Guide itself
|
||||
// is included
|
||||
if (listOfSources.size > 1 &&
|
||||
setOfCompletedSources.size == listOfSources.size - 1) {
|
||||
stopBroadcast = true
|
||||
// everyone connected so far are done. Comparing with
|
||||
// listOfSources.size - 1, because it includes the Guide itself
|
||||
listOfSources.synchronized {
|
||||
setOfCompletedSources.synchronized {
|
||||
if (listOfSources.size > 1 &&
|
||||
setOfCompletedSources.size == listOfSources.size - 1) {
|
||||
stopBroadcast = true
|
||||
logInfo("GuideMultipleRequests Timeout. stopBroadcast == true.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -490,7 +492,7 @@ extends Broadcast[T] with Logging with Serializable {
|
|||
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
|
||||
clientSocket = serverSocket.accept
|
||||
} catch {
|
||||
case e: Exception => logError("ServeMultipleRequests Timeout.")
|
||||
case e: Exception => { }
|
||||
}
|
||||
|
||||
if (clientSocket != null) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче