зеркало из https://github.com/microsoft/spark.git
Merge pull request #189 from dennybritz/feature/localcluster
Simulating a Spark standalone cluster locally
This commit is contained in:
Коммит
8d2fcc2832
|
@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary}
|
|||
|
||||
import spark.broadcast._
|
||||
|
||||
import spark.deploy.LocalSparkCluster
|
||||
|
||||
import spark.partial.ApproximateEvaluator
|
||||
import spark.partial.PartialResult
|
||||
|
||||
|
@ -81,9 +83,11 @@ class SparkContext(
|
|||
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
|
||||
// Regular expression for local[N, maxRetries], used in tests with failing tasks
|
||||
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
|
||||
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
|
||||
val SPARK_LOCALCLUSTER_REGEX = """local-cluster\[([0-9]+)\,([0-9]+),([0-9]+)]""".r
|
||||
// Regular expression for connecting to Spark deploy clusters
|
||||
val SPARK_REGEX = """(spark://.*)""".r
|
||||
|
||||
|
||||
master match {
|
||||
case "local" =>
|
||||
new LocalScheduler(1, 0)
|
||||
|
@ -99,6 +103,17 @@ class SparkContext(
|
|||
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
|
||||
scheduler.initialize(backend)
|
||||
scheduler
|
||||
|
||||
case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
|
||||
val scheduler = new ClusterScheduler(this)
|
||||
val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
|
||||
val sparkUrl = localCluster.start()
|
||||
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
|
||||
scheduler.initialize(backend)
|
||||
backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => {
|
||||
localCluster.stop()
|
||||
}
|
||||
scheduler
|
||||
|
||||
case _ =>
|
||||
MesosNativeLibrary.load()
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package spark.deploy
|
||||
|
||||
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
|
||||
|
||||
import spark.deploy.worker.Worker
|
||||
import spark.deploy.master.Master
|
||||
import spark.util.AkkaUtils
|
||||
import spark.{Logging, Utils}
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
|
||||
memoryPerSlave : Int) extends Logging {
|
||||
|
||||
val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
|
||||
val localIpAddress = Utils.localIpAddress
|
||||
|
||||
var masterActor : ActorRef = _
|
||||
var masterActorSystem : ActorSystem = _
|
||||
var masterPort : Int = _
|
||||
var masterUrl : String = _
|
||||
|
||||
val slaveActorSystems = ArrayBuffer[ActorSystem]()
|
||||
val slaveActors = ArrayBuffer[ActorRef]()
|
||||
|
||||
def start() : String = {
|
||||
|
||||
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
|
||||
|
||||
/* Start the Master */
|
||||
val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
|
||||
masterUrl = "spark://" + localIpAddress + ":" + masterPort
|
||||
threadPool.execute(new Runnable {
|
||||
def run() {
|
||||
val actor = masterActorSystem.actorOf(
|
||||
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
|
||||
masterActor = actor
|
||||
masterActorSystem.awaitTermination()
|
||||
}
|
||||
})
|
||||
|
||||
/* Start the Slaves */
|
||||
(1 to numSlaves).foreach { slaveNum =>
|
||||
val (actorSystem, boundPort) =
|
||||
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
|
||||
slaveActorSystems += actorSystem
|
||||
threadPool.execute(new Runnable {
|
||||
def run() {
|
||||
val actor = actorSystem.actorOf(
|
||||
Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
|
||||
name = "Worker")
|
||||
slaveActors += actor
|
||||
actorSystem.awaitTermination()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return masterUrl
|
||||
}
|
||||
|
||||
def stop() {
|
||||
logInfo("Shutting down local Spark cluster.")
|
||||
masterActorSystem.shutdown()
|
||||
slaveActorSystems.foreach(_.shutdown())
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -35,6 +35,19 @@ class ExecutorRunner(
|
|||
override def run() { fetchAndRunExecutor() }
|
||||
}
|
||||
workerThread.start()
|
||||
|
||||
// Shutdown hook that kills actors on shutdown.
|
||||
Runtime.getRuntime.addShutdownHook(
|
||||
new Thread() {
|
||||
override def run() {
|
||||
if(process != null) {
|
||||
logInfo("Shutdown Hook killing process.")
|
||||
process.destroy()
|
||||
process.waitFor()
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
/** Stop this executor runner, including killing the process it launched */
|
||||
|
@ -131,6 +144,9 @@ class ExecutorRunner(
|
|||
}
|
||||
env.put("SPARK_CORES", cores.toString)
|
||||
env.put("SPARK_MEMORY", memory.toString)
|
||||
// In case we are running this from within the Spark Shell
|
||||
// so we are not creating a parent process.
|
||||
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
|
||||
process = builder.start()
|
||||
|
||||
// Redirect its stdout and stderr to files
|
||||
|
|
|
@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend(
|
|||
|
||||
var client: Client = null
|
||||
var stopping = false
|
||||
var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _
|
||||
|
||||
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
|
||||
|
||||
|
@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend(
|
|||
stopping = true;
|
||||
super.stop()
|
||||
client.stop()
|
||||
if (shutdownHook != null) {
|
||||
shutdownHook(this)
|
||||
}
|
||||
}
|
||||
|
||||
def connected(jobId: String) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче