зеркало из https://github.com/microsoft/spark.git
Avoid acquiring locks in BlockManager when fetching shuffle outputs
This commit is contained in:
Родитель
0bc63f7ef1
Коммит
975009d688
|
@ -196,6 +196,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
*/
|
||||
def getLocal(blockId: String): Option[Iterator[Any]] = {
|
||||
logDebug("Getting local block " + blockId)
|
||||
|
||||
// As an optimization for map output fetches, if the block is for a shuffle, return it
|
||||
// without acquiring a lock; the disk store never deletes (recent) items so this should work
|
||||
if (blockId.startsWith("shuffle_")) {
|
||||
return diskStore.getValues(blockId) match {
|
||||
case Some(iterator) =>
|
||||
Some(iterator)
|
||||
case None =>
|
||||
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||
}
|
||||
}
|
||||
|
||||
locker.getLock(blockId).synchronized {
|
||||
val info = blockInfo.get(blockId)
|
||||
if (info != null) {
|
||||
|
@ -266,6 +278,18 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
|
||||
// TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
|
||||
logDebug("Getting local block " + blockId + " as bytes")
|
||||
|
||||
// As an optimization for map output fetches, if the block is for a shuffle, return it
|
||||
// without acquiring a lock; the disk store never deletes (recent) items so this should work
|
||||
if (blockId.startsWith("shuffle_")) {
|
||||
return diskStore.getBytes(blockId) match {
|
||||
case Some(bytes) =>
|
||||
Some(bytes)
|
||||
case None =>
|
||||
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||
}
|
||||
}
|
||||
|
||||
locker.getLock(blockId).synchronized {
|
||||
val info = blockInfo.get(blockId)
|
||||
if (info != null) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче