зеркало из https://github.com/microsoft/spark.git
Merge pull request #233 from rxin/dev
Fixed #232: DirectBuffer's cleaner was empty and Spark tried to invoke clean on it.
This commit is contained in:
Коммит
8981804c71
|
@ -49,7 +49,7 @@ extends Exception(message)
|
|||
|
||||
class BlockLocker(numLockers: Int) {
|
||||
private val hashLocker = Array.fill(numLockers)(new Object())
|
||||
|
||||
|
||||
def getLock(blockId: String): Object = {
|
||||
return hashLocker(math.abs(blockId.hashCode % numLockers))
|
||||
}
|
||||
|
@ -68,13 +68,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
|
||||
private[storage] val diskStore: BlockStore = new DiskStore(this,
|
||||
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
|
||||
|
||||
|
||||
val connectionManager = new ConnectionManager(0)
|
||||
implicit val futureExecContext = connectionManager.futureExecContext
|
||||
|
||||
|
||||
val connectionManagerId = connectionManager.id
|
||||
val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
|
||||
|
||||
|
||||
// TODO: This will be removed after cacheTracker is removed from the code base.
|
||||
var cacheTracker: CacheTracker = null
|
||||
|
||||
|
@ -123,7 +123,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
if (level == null) {
|
||||
throw new IllegalArgumentException("Storage level is null")
|
||||
}
|
||||
|
||||
|
||||
// If there was earlier info about the block, then use earlier tellMaster
|
||||
val oldInfo = blockInfo.get(blockId)
|
||||
val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
|
||||
|
@ -134,12 +134,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
// If level is valid, store the block info, else remove the block info
|
||||
if (level.isValid) {
|
||||
blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
|
||||
logDebug("Info for block " + blockId + " updated with new level as " + level)
|
||||
logDebug("Info for block " + blockId + " updated with new level as " + level)
|
||||
} else {
|
||||
blockInfo.remove(blockId)
|
||||
logDebug("Info for block " + blockId + " removed as new level is null or invalid")
|
||||
logDebug("Info for block " + blockId + " removed as new level is null or invalid")
|
||||
}
|
||||
|
||||
|
||||
// Tell master if necessary
|
||||
if (newTellMaster) {
|
||||
master.mustHeartBeat(HeartBeat(
|
||||
|
@ -182,11 +182,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
def getLocal(blockId: String): Option[Iterator[Any]] = {
|
||||
logDebug("Getting local block " + blockId)
|
||||
locker.getLock(blockId).synchronized {
|
||||
// Check storage level of block
|
||||
// Check storage level of block
|
||||
val level = getLevel(blockId)
|
||||
if (level != null) {
|
||||
logDebug("Level for block " + blockId + " is " + level + " on local machine")
|
||||
|
||||
|
||||
// Look for the block in memory
|
||||
if (level.useMemory) {
|
||||
logDebug("Getting block " + blockId + " from memory")
|
||||
|
@ -218,8 +218,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
} else {
|
||||
logDebug("Block " + blockId + " not registered locally")
|
||||
}
|
||||
}
|
||||
return None
|
||||
}
|
||||
return None
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -433,7 +433,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
throw new IllegalArgumentException("Storage level is null or invalid")
|
||||
}
|
||||
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
var bytes: ByteBuffer = null
|
||||
|
||||
// If we need to replicate the data, we'll want access to the values, but because our
|
||||
|
@ -441,21 +441,21 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
// the put serializes data, we'll remember the bytes, above; but for the case where
|
||||
// it doesn't, such as MEMORY_ONLY_DESER, let's rely on the put returning an Iterator.
|
||||
var valuesAfterPut: Iterator[Any] = null
|
||||
|
||||
|
||||
locker.getLock(blockId).synchronized {
|
||||
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
|
||||
+ " to get into synchronized block")
|
||||
|
||||
// Check and warn if block with same id already exists
|
||||
|
||||
// Check and warn if block with same id already exists
|
||||
if (getLevel(blockId) != null) {
|
||||
logWarning("Block " + blockId + " already exists in local machine")
|
||||
return
|
||||
}
|
||||
|
||||
if (level.useMemory && level.useDisk) {
|
||||
// If saving to both memory and disk, then serialize only once
|
||||
// If saving to both memory and disk, then serialize only once
|
||||
memoryStore.putValues(blockId, values, level, true) match {
|
||||
case Left(newValues) =>
|
||||
case Left(newValues) =>
|
||||
diskStore.putValues(blockId, newValues, level, true) match {
|
||||
case Right(newBytes) => bytes = newBytes
|
||||
case _ => throw new Exception("Unexpected return value")
|
||||
|
@ -465,7 +465,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
diskStore.putBytes(blockId, newBytes, level)
|
||||
}
|
||||
} else if (level.useMemory) {
|
||||
// If only save to memory
|
||||
// If only save to memory
|
||||
memoryStore.putValues(blockId, values, level, true) match {
|
||||
case Right(newBytes) => bytes = newBytes
|
||||
case Left(newIterator) => valuesAfterPut = newIterator
|
||||
|
@ -484,7 +484,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
}
|
||||
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
|
||||
|
||||
// Replicate block if required
|
||||
// Replicate block if required
|
||||
if (level.replication > 1) {
|
||||
// Serialize the block if not already done
|
||||
if (bytes == null) {
|
||||
|
@ -494,7 +494,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
}
|
||||
bytes = dataSerialize(valuesAfterPut)
|
||||
}
|
||||
replicate(blockId, bytes, level)
|
||||
replicate(blockId, bytes, level)
|
||||
}
|
||||
|
||||
BlockManager.dispose(bytes)
|
||||
|
@ -522,10 +522,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
if (level == null || !level.isValid) {
|
||||
throw new IllegalArgumentException("Storage level is null or invalid")
|
||||
}
|
||||
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
|
||||
// Initiate the replication before storing it locally. This is faster as
|
||||
|
||||
val startTimeMs = System.currentTimeMillis
|
||||
|
||||
// Initiate the replication before storing it locally. This is faster as
|
||||
// data is already serialized and ready for sending
|
||||
val replicationFuture = if (level.replication > 1) {
|
||||
val bufferView = bytes.duplicate() // Doesn't copy the bytes, just creates a wrapper
|
||||
|
@ -561,7 +561,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
if (blockId.startsWith("rdd")) {
|
||||
notifyTheCacheTracker(blockId)
|
||||
}
|
||||
|
||||
|
||||
// If replication had started, then wait for it to finish
|
||||
if (level.replication > 1) {
|
||||
if (replicationFuture == null) {
|
||||
|
@ -571,10 +571,10 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
}
|
||||
|
||||
if (level.replication > 1) {
|
||||
logDebug("PutBytes for block " + blockId + " with replication took " +
|
||||
logDebug("PutBytes for block " + blockId + " with replication took " +
|
||||
Utils.getUsedTimeMs(startTimeMs))
|
||||
} else {
|
||||
logDebug("PutBytes for block " + blockId + " without replication took " +
|
||||
logDebug("PutBytes for block " + blockId + " without replication took " +
|
||||
Utils.getUsedTimeMs(startTimeMs))
|
||||
}
|
||||
}
|
||||
|
@ -588,7 +588,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
|||
new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
|
||||
if (cachedPeers == null) {
|
||||
cachedPeers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1))
|
||||
}
|
||||
}
|
||||
for (peer: BlockManagerId <- cachedPeers) {
|
||||
val start = System.nanoTime
|
||||
data.rewind()
|
||||
|
@ -708,7 +708,9 @@ object BlockManager extends Logging {
|
|||
def dispose(buffer: ByteBuffer) {
|
||||
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
|
||||
logDebug("Unmapping " + buffer)
|
||||
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
|
||||
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
|
||||
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче