зеркало из https://github.com/microsoft/spark.git
Randomize the order of block fetches in getMultiple
This commit is contained in:
Родитель
53a5681c8a
Коммит
c2da64409a
|
@ -316,7 +316,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
// be requested initially and ones that will be added to a queue of blocks to request.
|
// be requested initially and ones that will be added to a queue of blocks to request.
|
||||||
val initialRequestBlocks = new HashMap[BlockManagerId, ArrayBuffer[BlockMessage]]()
|
val initialRequestBlocks = new HashMap[BlockManagerId, ArrayBuffer[BlockMessage]]()
|
||||||
var initialRequests = 0
|
var initialRequests = 0
|
||||||
for ((address, blockIds) <- blocksByAddress) {
|
val blocksToGetLater = new ArrayBuffer[(BlockManagerId, BlockMessage)]
|
||||||
|
for ((address, blockIds) <- Utils.randomize(blocksByAddress)) {
|
||||||
if (address == blockManagerId) {
|
if (address == blockManagerId) {
|
||||||
localBlockIds ++= blockIds
|
localBlockIds ++= blockIds
|
||||||
} else {
|
} else {
|
||||||
|
@ -328,11 +329,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
.append(blockMessage)
|
.append(blockMessage)
|
||||||
initialRequests += 1
|
initialRequests += 1
|
||||||
} else {
|
} else {
|
||||||
blocksToRequest.enqueue((address, blockMessage))
|
blocksToGetLater.append((address, blockMessage))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Add the remaining blocks into a queue to pull later in a random order
|
||||||
|
blocksToRequest ++= Utils.randomize(blocksToGetLater)
|
||||||
|
|
||||||
// Send out initial request(s) for 'numParallelFetches' blocks.
|
// Send out initial request(s) for 'numParallelFetches' blocks.
|
||||||
for ((bmId, blockMessages) <- initialRequestBlocks) {
|
for ((bmId, blockMessages) <- initialRequestBlocks) {
|
||||||
|
|
Загрузка…
Ссылка в новой задаче