diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 5c535ed502..6723523d5b 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -26,14 +26,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.") /* Start the Master */ - val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) + val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0) masterUrl = "spark://" + localIpAddress + ":" + masterPort threadPool.execute(new Runnable { def run() { - val actor = actorSystem.actorOf( + val actor = masterActorSystem.actorOf( Props(new Master(localIpAddress, masterPort, 8080)), name = "Master") masterActor = actor - actorSystem.awaitTermination() + masterActorSystem.awaitTermination() } }) @@ -52,6 +52,20 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int, }) } + + // Shutdown hook that kills actors on shutdown. + Runtime.getRuntime.addShutdownHook( + new Thread() { + override def run() { + masterActorSystem.stop(masterActor) + masterActorSystem.shutdown() + // Since above is asynchronous wait for the master actor to shut down + while(!masterActor.isTerminated) { + Thread.sleep(10) + } + } + }) + return masterUrl } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 3e24380810..7ad216f90e 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -131,6 +131,9 @@ class ExecutorRunner( } env.put("SPARK_CORES", cores.toString) env.put("SPARK_MEMORY", memory.toString) + // In case we are running this from within the Spark Shell + // so we are not creating a parent process. + env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() // Redirect its stdout and stderr to files