Merge branch 'dev' of github.com:mesos/spark into dev

Matei Zaharia 2012-06-15 23:36:30 -07:00
Parents: 8b0bd5dd8d 5f54bdf98b
Commit: b5cf47cda3
12 changed files with 278 additions and 138 deletions

View file

@@ -38,11 +38,13 @@ import spark.broadcast._
 import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
+import spark.scheduler.ShuffleMapTask
 import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.MesosScheduler
 import spark.scheduler.mesos.CoarseMesosScheduler
+import spark.storage.BlockManagerMaster

 class SparkContext(
     master: String,
@@ -258,6 +260,7 @@ class SparkContext(
   // Stop the SparkContext
   def stop() {
+    remote.shutdownServerModule()
     dagScheduler.stop()
     dagScheduler = null
     taskScheduler = null
@@ -266,8 +269,11 @@ class SparkContext(
     env.cacheTracker.stop()
     env.shuffleFetcher.stop()
     env.shuffleManager.stop()
+    env.blockManager.stop()
+    BlockManagerMaster.stopBlockManagerMaster()
     env.connectionManager.stop()
     SparkEnv.set(null)
+    ShuffleMapTask.clearCache()
   }

   // Wait for the scheduler to be registered with the cluster manager

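Taken together, the stop() additions above make a single SparkContext tear down the new block-manager machinery along with the schedulers. A minimal usage sketch in Scala (the two-argument constructor and the "local" master string are assumptions for illustration, not part of this diff):

    // Hypothetical driver program; everything the SparkContext started,
    // including the BlockManager and BlockManagerMaster, is shut down by stop().
    val sc = new SparkContext("local", "demo")
    try {
      println(sc.parallelize(1 to 100).reduce(_ + _))
    } finally {
      sc.stop()  // also stops env.blockManager, BlockManagerMaster and clears the ShuffleMapTask caches
    }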
View file

@@ -36,7 +36,6 @@ class ConnectionManager(port: Int) extends Logging {
   }

   val selector = SelectorProvider.provider.openSelector()
-  /*val handleMessageExecutor = new ThreadPoolExecutor(4, 4, 600, TimeUnit.SECONDS, new LinkedBlockingQueue()) */
   val handleMessageExecutor = Executors.newFixedThreadPool(4)
   val serverChannel = ServerSocketChannel.open()
   val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
@@ -59,7 +58,7 @@ class ConnectionManager(port: Int) extends Logging {
   logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)

   val thisInstance = this
-  var selectorThread = new Thread("connection-manager-thread") {
+  val selectorThread = new Thread("connection-manager-thread") {
     override def run() {
       thisInstance.run()
     }
@@ -331,6 +330,7 @@ class ConnectionManager(port: Int) extends Logging {
   }

   def stop() {
+    if (!selectorThread.isAlive) {
      selectorThread.interrupt()
      selectorThread.join()
      selector.close()
@@ -343,6 +343,7 @@ class ConnectionManager(port: Int) extends Logging {
      logInfo("ConnectionManager stopped")
    }
  }
+}

 object ConnectionManager {

View file

@@ -190,6 +190,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       allowLocal: Boolean)
       (implicit m: ClassManifest[U]): Array[U] =
   {
+    if (partitions.size == 0) {
+      return new Array[U](0)
+    }
     val waiter = new JobWaiter(partitions.size)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, waiter))

View file

@@ -56,6 +56,13 @@ object ShuffleMapTask {
       }
     }
   }
+
+  def clearCache() {
+    synchronized {
+      serializedInfoCache.clear()
+      deserializedInfoCache.clear()
+    }
+  }
 }

 class ShuffleMapTask(

View file

@@ -27,6 +27,7 @@ import spark.SizeEstimator
 import spark.SparkEnv
 import spark.SparkException
 import spark.Utils
+import spark.util.ByteBufferInputStream
 import spark.network._

 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
@@ -65,19 +66,15 @@ class BlockLocker(numLockers: Int) {
 }

-/**
- * A start towards a block manager class. This will eventually be used for both RDD persistence
- * and shuffle outputs.
- *
- * TODO: Should make the communication with Master or Peers code more robust and log friendly.
- */
 class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging {

+  case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
+
   private val NUM_LOCKS = 337
   private val locker = new BlockLocker(NUM_LOCKS)

-  private val storageLevels = Collections.synchronizedMap(new JHashMap[String, StorageLevel])
+  private val blockInfo = Collections.synchronizedMap(new JHashMap[String, BlockInfo])

   private val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
   private val diskStore: BlockStore = new DiskStore(this,
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
@@ -87,7 +84,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   val connectionManagerId = connectionManager.id
   val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)

-  // TODO(Haoyuan): This will be removed after cacheTracker is removed from the code base.
+  // TODO: This will be removed after cacheTracker is removed from the code base.
   var cacheTracker: CacheTracker = null

   initLogging()
@@ -104,12 +101,54 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
  /**
   * Initialize the BlockManager. Register to the BlockManagerMaster, and start the
   * BlockManagerWorker actor.
   */
-  def initialize() {
+  private def initialize() {
     BlockManagerMaster.mustRegisterBlockManager(
       RegisterBlockManager(blockManagerId, maxMemory, maxMemory))
     BlockManagerWorker.startBlockManagerWorker(this)
   }

+  /**
+   * Get storage level of local block. If no info exists for the block, then returns null.
+   */
+  def getLevel(blockId: String): StorageLevel = {
+    val info = blockInfo.get(blockId)
+    if (info != null) info.level else null
+  }
+
+  /**
+   * Change storage level for a local block and tell master is necesary.
+   * If new level is invalid, then block info (if it exists) will be silently removed.
+   */
+  def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
+    if (level == null) {
+      throw new IllegalArgumentException("Storage level is null")
+    }
+
+    // If there was earlier info about the block, then use earlier tellMaster
+    val oldInfo = blockInfo.get(blockId)
+    val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
+    if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
+      logWarning("Ignoring tellMaster setting as it is different from earlier setting")
+    }
+
+    // If level is valid, store the block info, else remove the block info
+    if (level.isValid) {
+      blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
+      logDebug("Info for block " + blockId + " updated with new level as " + level)
+    } else {
+      blockInfo.remove(blockId)
+      logDebug("Info for block " + blockId + " removed as new level is null or invalid")
+    }
+
+    // Tell master if necessary
+    if (newTellMaster) {
+      logDebug("Told master about block " + blockId)
+      notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
+    } else {
+      logDebug("Did not tell master about block " + blockId)
+    }
+  }
+
  /**
   * Get locations of the block.
   */
@@ -122,9 +161,9 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   }

  /**
-  * Get locations of an array of blocks
+  * Get locations of an array of blocks.
   */
-  def getLocationsMultipleBlockIds(blockIds: Array[String]): Array[Seq[String]] = {
+  def getLocations(blockIds: Array[String]): Array[Seq[String]] = {
     val startTimeMs = System.currentTimeMillis
     val locations = BlockManagerMaster.mustGetLocationsMultipleBlockIds(
       GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray
@@ -132,12 +171,18 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
     return locations
   }

+  /**
+   * Get block from local block manager.
+   */
   def getLocal(blockId: String): Option[Iterator[Any]] = {
-    logDebug("Getting block " + blockId)
+    if (blockId == null) {
+      throw new IllegalArgumentException("Block Id is null")
+    }
+    logDebug("Getting local block " + blockId)
     locker.getLock(blockId).synchronized {
       // Check storage level of block
-      val level = storageLevels.get(blockId)
+      val level = getLevel(blockId)
       if (level != null) {
         logDebug("Level for block " + blockId + " is " + level + " on local machine")
@@ -181,12 +226,20 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
     return None
   }

+  /**
+   * Get block from remote block managers.
+   */
   def getRemote(blockId: String): Option[Iterator[Any]] = {
+    if (blockId == null) {
+      throw new IllegalArgumentException("Block Id is null")
+    }
+    logDebug("Getting remote block " + blockId)
     // Get locations of block
     val locations = BlockManagerMaster.mustGetLocations(GetLocations(blockId))

     // Get block from remote locations
     for (loc <- locations) {
+      logDebug("Getting remote block " + blockId + " from " + loc)
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port))
       if (data != null) {
@@ -200,16 +253,19 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   }

  /**
-  * Read a block from the block manager.
+  * Get a block from the block manager (either local or remote).
   */
   def get(blockId: String): Option[Iterator[Any]] = {
     getLocal(blockId).orElse(getRemote(blockId))
   }

  /**
-  * Read many blocks from block manager using their BlockManagerIds.
+  * Get many blocks from local and remote block manager using their BlockManagerIds.
   */
   def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = {
+    if (blocksByAddress == null) {
+      throw new IllegalArgumentException("BlocksByAddress is null")
+    }
     logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks")
     var startTime = System.currentTimeMillis
     val blocks = new HashMap[String,Option[Iterator[Any]]]()
@@ -235,7 +291,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
       val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
       (cmId, future)
     }
-    logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
+    logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " +
+      Utils.getUsedTimeMs(startTime) + " ms")

     // Get the local blocks while remote blocks are being fetched
     startTime = System.currentTimeMillis
@@ -276,7 +333,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
           throw new BlockException(oneBlockId, "Could not get blocks from " + cmId)
         }
       }
-      logDebug("Got remote " + count + " blocks from " + cmId.host + " in " + Utils.getUsedTimeMs(startTime) + " ms")
+      logDebug("Got remote " + count + " blocks from " + cmId.host + " in " +
+        Utils.getUsedTimeMs(startTime) + " ms")
     }
     logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
@@ -284,29 +342,32 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   }

  /**
-  * Write a new block to the block manager.
+  * Put a new block of values to the block manager.
   */
   def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true) {
-    if (!level.useDisk && !level.useMemory) {
-      throw new IllegalArgumentException("Storage level has neither useMemory nor useDisk set")
+    if (blockId == null) {
+      throw new IllegalArgumentException("Block Id is null")
+    }
+    if (values == null) {
+      throw new IllegalArgumentException("Values is null")
+    }
+    if (level == null || !level.isValid) {
+      throw new IllegalArgumentException("Storage level is null or invalid")
     }

     val startTimeMs = System.currentTimeMillis
     var bytes: ByteBuffer = null

     locker.getLock(blockId).synchronized {
-      logDebug("Put for block " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
+      logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
         + " to get into synchronized block")

       // Check and warn if block with same id already exists
-      if (storageLevels.get(blockId) != null) {
+      if (getLevel(blockId) != null) {
         logWarning("Block " + blockId + " already exists in local machine")
         return
       }

-      // Store the storage level
-      storageLevels.put(blockId, level)

       if (level.useMemory && level.useDisk) {
         // If saving to both memory and disk, then serialize only once
         memoryStore.putValues(blockId, values, level) match {
@@ -333,11 +394,10 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
         }
       }

-      if (tellMaster) {
-        notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
-        logDebug("Put block " + blockId + " after notifying the master " + Utils.getUsedTimeMs(startTimeMs))
-      }
+      // Store the storage level
+      setLevel(blockId, level, tellMaster)
     }
+    logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))

     // Replicate block if required
     if (level.replication > 1) {
@@ -347,21 +407,32 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
       replicate(blockId, bytes, level)
     }

-    // TODO(Haoyuan): This code will be removed when CacheTracker is gone.
+    // TODO: This code will be removed when CacheTracker is gone.
     if (blockId.startsWith("rdd")) {
       notifyTheCacheTracker(blockId)
     }
-    logDebug("Put block " + blockId + " after notifying the CacheTracker " + Utils.getUsedTimeMs(startTimeMs))
+    logDebug("Put block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs))
   }

+  /**
+   * Put a new block of serialized bytes to the block manager.
+   */
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
-    val startTime = System.currentTimeMillis
-    if (!level.useDisk && !level.useMemory) {
-      throw new IllegalArgumentException("Storage level has neither useMemory nor useDisk set")
-    } else if (level.deserialized) {
-      throw new IllegalArgumentException("Storage level cannot have deserialized when putBytes is used")
+    if (blockId == null) {
+      throw new IllegalArgumentException("Block Id is null")
     }
+    if (bytes == null) {
+      throw new IllegalArgumentException("Bytes is null")
+    }
+    if (level == null || !level.isValid) {
+      throw new IllegalArgumentException("Storage level is null or invalid")
+    }
+
+    val startTimeMs = System.currentTimeMillis
+
+    // Initiate the replication before storing it locally. This is faster as
+    // data is already serialized and ready for sending
     val replicationFuture = if (level.replication > 1) {
       future {
         replicate(blockId, bytes, level)
@@ -371,13 +442,12 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
     }

     locker.getLock(blockId).synchronized {
-      logDebug("PutBytes for block " + blockId + " used " + Utils.getUsedTimeMs(startTime)
+      logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
        + " to get into synchronized block")

-      if (storageLevels.get(blockId) != null) {
+      if (getLevel(blockId) != null) {
        logWarning("Block " + blockId + " already exists")
        return
      }
-      storageLevels.put(blockId, level)

      if (level.useMemory) {
        memoryStore.putBytes(blockId, bytes, level)
@@ -385,15 +455,17 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
      if (level.useDisk) {
        diskStore.putBytes(blockId, bytes, level)
      }
-      if (tellMaster) {
-        notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
-      }
+
+      // Store the storage level
+      setLevel(blockId, level, tellMaster)
     }

+    // TODO: This code will be removed when CacheTracker is gone.
     if (blockId.startsWith("rdd")) {
       notifyTheCacheTracker(blockId)
     }

+    // If replication had started, then wait for it to finish
     if (level.replication > 1) {
       if (replicationFuture == null) {
         throw new Exception("Unexpected")
@@ -403,12 +475,17 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
     val finishTime = System.currentTimeMillis
     if (level.replication > 1) {
-      logDebug("PutBytes with replication took " + (finishTime - startTime) + " ms")
+      logDebug("PutBytes for block " + blockId + " with replication took " +
+        Utils.getUsedTimeMs(startTimeMs))
     } else {
-      logDebug("PutBytes without replication took " + (finishTime - startTime) + " ms")
+      logDebug("PutBytes for block " + blockId + " without replication took " +
+        Utils.getUsedTimeMs(startTimeMs))
     }
   }

+  /**
+   * Replicate block to another node.
+   */
   private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
     val tLevel: StorageLevel =
@@ -429,8 +506,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
     }
   }

-  // TODO(Haoyuan): This code will be removed when CacheTracker is gone.
-  def notifyTheCacheTracker(key: String) {
+  // TODO: This code will be removed when CacheTracker is gone.
+  private def notifyTheCacheTracker(key: String) {
     val rddInfo = key.split(":")
     val rddId: Int = rddInfo(1).toInt
     val splitIndex: Int = rddInfo(2).toInt
@@ -448,8 +525,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
  /**
   * Write a block consisting of a single object.
   */
-  def putSingle(blockId: String, value: Any, level: StorageLevel) {
-    put(blockId, Iterator(value), level)
+  def putSingle(blockId: String, value: Any, level: StorageLevel, tellMaster: Boolean = true) {
+    put(blockId, Iterator(value), level, tellMaster)
   }

  /**
@@ -457,7 +534,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   */
   def dropFromMemory(blockId: String) {
     locker.getLock(blockId).synchronized {
-      val level = storageLevels.get(blockId)
+      val level = getLevel(blockId)
       if (level == null) {
         logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
         return
@@ -467,14 +544,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
         return
       }
       memoryStore.remove(blockId)
-      if (!level.useDisk) {
-        storageLevels.remove(blockId)
-      } else {
-        val newLevel = level.clone
-        newLevel.useMemory = false
-        storageLevels.remove(blockId)
-        storageLevels.put(blockId, newLevel)
-      }
+      val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
+      setLevel(blockId, newLevel)
     }
   }
@@ -489,13 +560,23 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
   def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
     /*serializer.newInstance().deserializeMany(bytes)*/
     val ser = serializer.newInstance()
-    return ser.deserializeStream(new FastByteArrayInputStream(bytes.array())).toIterator
+    bytes.rewind()
+    return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
   }

   private def notifyMaster(heartBeat: HeartBeat) {
     BlockManagerMaster.mustHeartBeat(heartBeat)
   }
+
+  def stop() {
+    connectionManager.stop()
+    blockInfo.clear()
+    memoryStore.clear()
+    diskStore.clear()
+    logInfo("BlockManager stopped")
+  }
 }

 object BlockManager extends Logging {
   def getMaxMemoryFromSystemProperties(): Long = {

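The getLevel/setLevel pair introduced above replaces the old storageLevels map as the single place where a block's storage level is tracked and reported to the master. A minimal sketch of the API, mirroring the new test added to BlockManagerSuite below (the block id and sizes are arbitrary, and a running BlockManagerMaster is assumed, as in the suite's setup):

    BlockManagerMaster.startBlockManagerMaster(true, true)
    val manager = new BlockManager(2000, new KryoSerializer)

    // put/putSingle now record the level via setLevel and, with tellMaster = true, heart-beat the master.
    manager.putSingle("block-a", new Array[Byte](400), StorageLevel.MEMORY_ONLY_DESER)
    assert(manager.getLevel("block-a") != null)

    // An invalid level (neither memory nor disk) silently drops the block info and updates the master.
    manager.setLevel("block-a", new StorageLevel(false, false, false, 1))
    assert(manager.getLevel("block-a") == null)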
View file

@@ -1,6 +1,7 @@
 package spark.storage

 import java.io._
+import java.util.{HashMap => JHashMap}

 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
@@ -86,7 +87,9 @@ case class RemoveHost(
     host: String)
   extends ToBlockManagerMaster

 class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
+
   class BlockManagerInfo(
       timeMs: Long,
       maxMem: Long,
@@ -94,7 +97,7 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     private var lastSeenMs = timeMs
     private var remainedMem = maxMem
     private var remainedDisk = maxDisk
-    private val blocks = new HashMap[String, StorageLevel]
+    private val blocks = new JHashMap[String, StorageLevel]

     def updateLastSeenMs() {
       lastSeenMs = System.currentTimeMillis() / 1000
@@ -104,8 +107,8 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
       synchronized {
         updateLastSeenMs()

-        if (blocks.contains(blockId)) {
-          val oriLevel: StorageLevel = blocks(blockId)
+        if (blocks.containsKey(blockId)) {
+          val oriLevel: StorageLevel = blocks.get(blockId)

           if (oriLevel.deserialized) {
             remainedMem += deserializedSize
@@ -118,8 +121,8 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
           }
         }

-        blocks += (blockId -> storageLevel)
+        if (storageLevel.isValid) {
+          blocks.put(blockId, storageLevel)

           if (storageLevel.deserialized) {
             remainedMem -= deserializedSize
           }
@@ -129,8 +132,7 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
           if (storageLevel.useDisk) {
             remainedDisk -= size
           }
-        if (!(storageLevel.deserialized || storageLevel.useMemory || storageLevel.useDisk)) {
+        } else {
           blocks.remove(blockId)
         }
       }
@@ -150,10 +152,14 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     override def toString(): String = {
       return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk
     }
+
+    def clear() {
+      blocks.clear()
+    }
   }

   private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
-  private val blockIdMap = new HashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+  private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]

   initLogging()
@@ -215,7 +221,6 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockManagerId + " " + blockId + " "
-    logDebug("Got in heartBeat 0" + tmp + Utils.getUsedTimeMs(startTimeMs))

     if (blockId == null) {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
@@ -224,29 +229,24 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     }

     blockManagerInfo(blockManagerId).addBlock(blockId, storageLevel, deserializedSize, size)
-    logDebug("Got in heartBeat 2" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))

     var locations: HashSet[BlockManagerId] = null
-    if (blockIdMap.contains(blockId)) {
-      locations = blockIdMap(blockId)._2
+    if (blockInfo.containsKey(blockId)) {
+      locations = blockInfo.get(blockId)._2
     } else {
       locations = new HashSet[BlockManagerId]
-      blockIdMap += (blockId -> (storageLevel.replication, locations))
+      blockInfo.put(blockId, (storageLevel.replication, locations))
     }
-    logDebug("Got in heartBeat 3" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))

-    if (storageLevel.deserialized || storageLevel.useDisk || storageLevel.useMemory) {
+    if (storageLevel.isValid) {
       locations += blockManagerId
     } else {
       locations.remove(blockManagerId)
     }
-    logDebug("Got in heartBeat 4" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))

     if (locations.size == 0) {
-      blockIdMap.remove(blockId)
+      blockInfo.remove(blockId)
     }
-    logDebug("Got in heartBeat 5" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))

     self.reply(true)
   }
@@ -254,9 +254,9 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     val startTimeMs = System.currentTimeMillis()
     val tmp = " " + blockId + " "
     logDebug("Got in getLocations 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
-    if (blockIdMap.contains(blockId)) {
+    if (blockInfo.containsKey(blockId)) {
       var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-      res.appendAll(blockIdMap(blockId)._2)
+      res.appendAll(blockInfo.get(blockId)._2)
       logDebug("Got in getLocations 1" + tmp + " as "+ res.toSeq + " at "
         + Utils.getUsedTimeMs(startTimeMs))
       self.reply(res.toSeq)
@@ -271,9 +271,9 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
     def getLocations(blockId: String): Seq[BlockManagerId] = {
       val tmp = blockId
       logDebug("Got in getLocationsMultipleBlockIds Sub 0 " + tmp)
-      if (blockIdMap.contains(blockId)) {
+      if (blockInfo.containsKey(blockId)) {
         var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-        res.appendAll(blockIdMap(blockId)._2)
+        res.appendAll(blockInfo.get(blockId)._2)
         logDebug("Got in getLocationsMultipleBlockIds Sub 1 " + tmp + " " + res.toSeq)
         return res.toSeq
       } else {
@@ -293,24 +293,18 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
   }

   private def getPeers(blockManagerId: BlockManagerId, size: Int) {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockManagerId + " "
-    logDebug("Got in getPeers 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
     var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
     var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
     res.appendAll(peers)
     res -= blockManagerId
     val rand = new Random(System.currentTimeMillis())
-    logDebug("Got in getPeers 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
     while (res.length > size) {
       res.remove(rand.nextInt(res.length))
     }
-    logDebug("Got in getPeers 2" + tmp + Utils.getUsedTimeMs(startTimeMs))
     self.reply(res.toSeq)
   }

   private def getPeers_Deterministic(blockManagerId: BlockManagerId, size: Int) {
-    val startTimeMs = System.currentTimeMillis()
     var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
     var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
@@ -329,7 +323,6 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging {
       res += peers(index % peers.size)
     }
     val resStr = res.map(_.toString).reduceLeft(_ + ", " + _)
-    logDebug("Got peers for " + blockManagerId + " as [" + resStr + "]")
     self.reply(res.toSeq)
   }
 }
@@ -358,6 +351,14 @@ object BlockManagerMaster extends Logging {
     }
   }

+  def stopBlockManagerMaster() {
+    if (masterActor != null) {
+      masterActor.stop()
+      masterActor = null
+      logInfo("BlockManagerMaster stopped")
+    }
+  }
+
   def notifyADeadHost(host: String) {
     (masterActor ? RemoveHost(host + ":" + DEFAULT_MANAGER_PORT)).as[Any] match {
       case Some(true) =>

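stopBlockManagerMaster() gives the master actor an explicit shutdown path to pair with startBlockManagerMaster, which is what the SparkContext.stop() change above relies on. A minimal lifecycle sketch using only calls that appear in this diff (the boolean arguments are copied from the test suite's setup):

    BlockManagerMaster.startBlockManagerMaster(true, true)  // start the master actor
    // ... create BlockManagers, put and get blocks ...
    BlockManagerMaster.stopBlockManagerMaster()             // stop the actor and null out the reference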
View file

@@ -79,7 +79,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
   private def getBlock(id: String): ByteBuffer = {
     val startTimeMs = System.currentTimeMillis()
     logDebug("Getblock " + id + " started from " + startTimeMs)
-    val block = blockManager.get(id)
+    val block = blockManager.getLocal(id)
     val buffer = block match {
       case Some(tValues) => {
         val values = tValues.asInstanceOf[Iterator[Any]]

View file

@@ -31,6 +31,8 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
   def dataSerialize(values: Iterator[Any]): ByteBuffer = blockManager.dataSerialize(values)

   def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = blockManager.dataDeserialize(bytes)
+
+  def clear() { }
 }

 /**
@@ -118,6 +120,14 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }

+  override def clear() {
+    memoryStore.synchronized {
+      memoryStore.clear()
+    }
+    blockDropper.shutdown()
+    logInfo("MemoryStore cleared")
+  }
+
   private def drop(blockId: String) {
     blockDropper.submit(new Runnable() {
       def run() {
@@ -147,8 +157,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       for (blockId <- droppedBlockIds) {
         drop(blockId)
       }
-      droppedBlockIds.clear
+      droppedBlockIds.clear()
     }
   }

View file

@@ -33,6 +33,8 @@ class StorageLevel(
       false
   }

+  def isValid() = ((useMemory || useDisk) && (replication > 0))
+
   def toInt(): Int = {
     var ret = 0
     if (useDisk) {

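isValid requires at least one storage medium and a positive replication factor, which is what BlockManager.setLevel and the master's addBlock now key off. A few illustrative values; the constructor argument order is inferred from the dropFromMemory and test changes elsewhere in this commit: (useDisk, useMemory, deserialized, replication):

    new StorageLevel(false, true, true, 1).isValid    // true: memory only, deserialized
    new StorageLevel(true, false, false, 1).isValid   // true: disk only, serialized
    new StorageLevel(false, false, false, 1).isValid  // false: neither memory nor disk
    new StorageLevel(true, false, false, 0).isValid   // false: zero replication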
View file

@@ -10,6 +10,36 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     BlockManagerMaster.startBlockManagerMaster(true, true)
   }

+  test("manager-master interaction") {
+    val store = new BlockManager(2000, new KryoSerializer)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_DESER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_DESER, false)
+
+    // Checking whether blocks are in memory
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
+    assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
+    assert(BlockManagerMaster.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
+
+    // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
+    store.setLevel("a1", new StorageLevel(false, false, false, 1))
+    store.setLevel("a2", new StorageLevel(true, false, false, 0))
+    assert(store.getSingle("a1") === None, "a1 not removed from store")
+    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
+    assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
+  }
+
   test("in-memory LRU storage") {
     val store = new BlockManager(1000, new KryoSerializer)
     val a1 = new Array[Byte](400)
@@ -21,14 +51,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
     Thread.sleep(100)
-    assert(store.getSingle("a1") == None, "a1 was in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     Thread.sleep(100)
-    assert(store.getSingle("a3") == None, "a3 was in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
   }

   test("in-memory LRU storage with serialization") {
@@ -42,14 +72,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     Thread.sleep(100)
     assert(store.getSingle("a2") != None, "a2 was not in store")
     assert(store.getSingle("a3") != None, "a3 was not in store")
-    assert(store.getSingle("a1") == None, "a1 was in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
     // At this point a2 was gotten last, so LRU will getSingle rid of a3
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_DESER)
     Thread.sleep(100)
     assert(store.getSingle("a1") != None, "a1 was not in store")
     assert(store.getSingle("a2") != None, "a2 was not in store")
-    assert(store.getSingle("a3") == None, "a1 was in store")
+    assert(store.getSingle("a3") === None, "a1 was in store")
   }

   test("on-disk storage") {
@@ -132,7 +162,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     assert(store.get("list2").get.size == 2)
     assert(store.get("list3") != None, "list3 was not in store")
     assert(store.get("list3").get.size == 2)
-    assert(store.get("list1") == None, "list1 was in store")
+    assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
@@ -142,7 +172,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     assert(store.get("list1").get.size == 2)
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
-    assert(store.get("list3") == None, "list1 was in store")
+    assert(store.get("list3") === None, "list1 was in store")
   }

   test("LRU with mixed storage levels and streams") {
@@ -158,25 +188,25 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter{
     Thread.sleep(100)
     // At this point LRU should not kick in because list3 is only on disk
     assert(store.get("list1") != None, "list2 was not in store")
-    assert(store.get("list1").get.size == 2)
+    assert(store.get("list1").get.size === 2)
     assert(store.get("list2") != None, "list3 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.size === 2)
     assert(store.get("list3") != None, "list1 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.size === 2)
     assert(store.get("list1") != None, "list2 was not in store")
-    assert(store.get("list1").get.size == 2)
+    assert(store.get("list1").get.size === 2)
     assert(store.get("list2") != None, "list3 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.size === 2)
     assert(store.get("list3") != None, "list1 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.DISK_AND_MEMORY)
-    assert(store.get("list1") == None, "list1 was in store")
+    assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.size === 2)
     assert(store.get("list3") != None, "list1 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.size === 2)
     assert(store.get("list4") != None, "list4 was not in store")
-    assert(store.get("list4").get.size == 2)
+    assert(store.get("list4").get.size === 2)
   }
 }

View file

@@ -4,4 +4,4 @@ if [ "$MESOS_HOME" != "" ]; then
   EXTRA_ARGS="-Djava.library.path=$MESOS_HOME/lib/java"
 fi
 export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
-java -Xmx800M -XX:MaxPermSize=150m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200M -XX:MaxPermSize=200m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"