diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 538e057926..d7bd832e52 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary} import spark.broadcast._ +import spark.deploy.LocalSparkCluster + import spark.partial.ApproximateEvaluator import spark.partial.PartialResult @@ -81,9 +83,11 @@ class SparkContext( val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val SPARK_LOCALCLUSTER_REGEX = """local-cluster\[([0-9]+)\,([0-9]+),([0-9]+)]""".r // Regular expression for connecting to Spark deploy clusters val SPARK_REGEX = """(spark://.*)""".r - + master match { case "local" => new LocalScheduler(1, 0) @@ -99,6 +103,17 @@ class SparkContext( val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) scheduler.initialize(backend) scheduler + + case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) => + val scheduler = new ClusterScheduler(this) + val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt) + val sparkUrl = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + scheduler.initialize(backend) + backend.shutdownHook = (backend: SparkDeploySchedulerBackend) => { + localCluster.stop() + } + scheduler case _ => MesosNativeLibrary.load() diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala new file mode 100644 index 0000000000..da74df4dcf --- /dev/null +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -0,0 +1,68 @@ +package 
spark.deploy
+
+import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+
+import spark.deploy.worker.Worker
+import spark.deploy.master.Master
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Starts one Master actor and `numSlaves` Worker actors, each in its own actor system
+ * created through AkkaUtils in the current process, so that a job can be run against a
+ * "spark://" URL without any separately launched daemons.
+ *
+ * Call start() once to bring the cluster up and stop() to shut its actor systems down.
+ *
+ * @param numSlaves      number of Worker actors (and worker actor systems) to create
+ * @param coresPerSlave  core count handed to every Worker
+ * @param memoryPerSlave memory amount handed to every Worker (unit is whatever Worker
+ *                       expects; it is not visible from this file)
+ */
+class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
+  memoryPerSlave : Int) extends Logging {
+
+  // One pool thread per actor system (master + slaves); each just blocks until its
+  // system has terminated.
+  val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
+  val localIpAddress = Utils.localIpAddress
+
+  // Populated by start(); they hold their default values (null / 0) before that.
+  var masterActor : ActorRef = _
+  var masterActorSystem : ActorSystem = _
+  var masterPort : Int = _
+  var masterUrl : String = _
+
+  val slaveActorSystems = ArrayBuffer[ActorSystem]()
+  val slaveActors = ArrayBuffer[ActorRef]()
+
+  /**
+   * Creates the master and worker actor systems and their actors.
+   *
+   * All actors are created on the calling thread, so `masterActor`, `slaveActors` and
+   * `slaveActorSystems` are fully populated when this method returns and the
+   * (non thread-safe) buffers are never touched from the pool threads.
+   *
+   * @return the "spark://host:port" URL of the master that was started
+   */
+  def start() : String = {
+    logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
+
+    /* Start the Master */
+    // The locals deliberately have names different from the fields: destructuring into
+    // `masterActorSystem` / `masterPort` would declare shadowing locals and leave the
+    // fields unset, which made stop() fail with a NullPointerException.
+    val (masterSystem, boundMasterPort) =
+      AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+    masterActorSystem = masterSystem
+    masterPort = boundMasterPort
+    val url = "spark://" + localIpAddress + ":" + boundMasterPort
+    masterUrl = url
+    masterActor = masterSystem.actorOf(
+      Props(new Master(localIpAddress, boundMasterPort, 8080)), name = "Master")
+    awaitTerminationInBackground(masterSystem)
+
+    /* Start the Slaves */
+    (1 to numSlaves).foreach { slaveNum =>
+      val (slaveSystem, boundPort) =
+        AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
+      slaveActorSystems += slaveSystem
+      // Third Worker argument is 8080 + slaveNum so that it differs from the 8080
+      // given to the Master and from the value given to every other Worker.
+      slaveActors += slaveSystem.actorOf(
+        Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum,
+          coresPerSlave, memoryPerSlave, url)),
+        name = "Worker")
+      awaitTerminationInBackground(slaveSystem)
+    }
+
+    url
+  }
+
+  /**
+   * Shuts down the master actor system and then every worker actor system.
+   * Safe to call even if start() was never invoked.
+   */
+  def stop() {
+    logInfo("Shutting down local Spark cluster.")
+    // masterActorSystem is still null when start() has not run.
+    Option(masterActorSystem).foreach(_.shutdown())
+    slaveActorSystems.foreach(_.shutdown())
+  }
+
+  /** Parks one pool thread on `system` until that actor system has terminated. */
+  private def awaitTerminationInBackground(system : ActorSystem) {
+    threadPool.execute(new Runnable {
+      def run() {
+        system.awaitTermination()
+      }
+    })
+  }
+
+}
\ No newline at end of file
diff --git 
a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 3e24380810..393f4a3ee6 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -35,6 +35,19 @@ class ExecutorRunner( override def run() { fetchAndRunExecutor() } } workerThread.start() + + // Shutdown hook that kills actors on shutdown. + Runtime.getRuntime.addShutdownHook( + new Thread() { + override def run() { + if(process != null) { + logInfo("Shutdown Hook killing process.") + process.destroy() + process.waitFor() + } + } + }) + } /** Stop this executor runner, including killing the process it launched */ @@ -131,6 +144,9 @@ class ExecutorRunner( } env.put("SPARK_CORES", cores.toString) env.put("SPARK_MEMORY", memory.toString) + // In case we are running this from within the Spark Shell + // so we are not creating a parent process. + env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() // Redirect its stdout and stderr to files diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 0bd2d15479..ec3ff38d5c 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -16,6 +16,7 @@ class SparkDeploySchedulerBackend( var client: Client = null var stopping = false + var shutdownHook : (SparkDeploySchedulerBackend) => Unit = _ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt @@ -61,6 +62,9 @@ class SparkDeploySchedulerBackend( stopping = true; super.stop() client.stop() + if (shutdownHook != null) { + shutdownHook(this) + } } def connected(jobId: String) {