From c2da64409a7059ea6a1560dfc517d137f4fe88eb Mon Sep 17 00:00:00 2001 From: root Date: Thu, 6 Sep 2012 23:16:26 +0000 Subject: [PATCH] Randomize the order of block fetches in getMultiple --- core/src/main/scala/spark/storage/BlockManager.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 11a1cbb73e..cb7b0c8bc1 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -316,7 +316,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // be requested initially and ones that will be added to a queue of blocks to request. val initialRequestBlocks = new HashMap[BlockManagerId, ArrayBuffer[BlockMessage]]() var initialRequests = 0 - for ((address, blockIds) <- blocksByAddress) { + val blocksToGetLater = new ArrayBuffer[(BlockManagerId, BlockMessage)] + for ((address, blockIds) <- Utils.randomize(blocksByAddress)) { if (address == blockManagerId) { localBlockIds ++= blockIds } else { @@ -328,11 +329,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m .append(blockMessage) initialRequests += 1 } else { - blocksToRequest.enqueue((address, blockMessage)) + blocksToGetLater.append((address, blockMessage)) } } } } + // Add the remaining blocks into a queue to pull later in a random order + blocksToRequest ++= Utils.randomize(blocksToGetLater) // Send out initial request(s) for 'numParallelFetches' blocks. for ((bmId, blockMessages) <- initialRequestBlocks) {