Set SPARK_LAUNCH_WITH_SCALA=0 in Executor Runner

Denny 2012-09-07 11:39:44 -07:00
Parent 8bb3c73977
Commit 9ead8ab14e
2 changed files with 20 additions and 3 deletions

View file

@@ -26,14 +26,14 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
     logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
     /* Start the Master */
-    val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
+    val (masterActorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
     masterUrl = "spark://" + localIpAddress + ":" + masterPort
     threadPool.execute(new Runnable {
       def run() {
-        val actor = actorSystem.actorOf(
+        val actor = masterActorSystem.actorOf(
           Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
         masterActor = actor
-        actorSystem.awaitTermination()
+        masterActorSystem.awaitTermination()
       }
     })
@@ -52,6 +52,20 @@ class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
       })
     }
 
+    // Shutdown hook that kills actors on shutdown.
+    Runtime.getRuntime.addShutdownHook(
+      new Thread() {
+        override def run() {
+          masterActorSystem.stop(masterActor)
+          masterActorSystem.shutdown()
+          // Since the above is asynchronous, wait for the master actor to shut down
+          while (!masterActor.isTerminated) {
+            Thread.sleep(10)
+          }
+        }
+      })
+
     return masterUrl
   }
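
For context, the shutdown hook added above follows a common pattern: trigger an asynchronous teardown, then poll until it has actually completed (here, until masterActor.isTerminated reports that the master actor has stopped). Below is a minimal, self-contained Scala sketch of that pattern using plain JDK threads instead of Akka; the object and variable names are hypothetical and not part of Spark.

object ShutdownHookSketch {
  @volatile private var stopRequested = false

  def main(args: Array[String]): Unit = {
    // Background thread standing in for the master actor system.
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        while (!stopRequested) Thread.sleep(100)
      }
    })
    worker.setDaemon(true)  // let the JVM begin shutdown once main returns
    worker.start()

    // Shutdown hook: request the (asynchronous) stop, then poll until the
    // worker has really exited, mirroring the isTerminated loop in the diff.
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        stopRequested = true
        while (worker.isAlive) Thread.sleep(10)
      }
    })

    println("Running; exiting main (or Ctrl-C) triggers the shutdown hook.")
    Thread.sleep(1000)
  }
}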

View file

@@ -131,6 +131,9 @@ class ExecutorRunner(
       }
       env.put("SPARK_CORES", cores.toString)
       env.put("SPARK_MEMORY", memory.toString)
+      // In case we are running this from within the Spark Shell,
+      // make sure we do not create a parent scala process for the executor.
+      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
       process = builder.start()
 
       // Redirect its stdout and stderr to files
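
The change above relies on how a ProcessBuilder propagates environment entries to the launched executor: anything put into the builder's environment map (the env here is presumably builder.environment()) becomes an environment variable of the child process, where the launch command, presumably Spark's run script at the time, can read it and skip wrapping the executor in a parent scala process. Below is a minimal, self-contained Scala sketch of that mechanism; it assumes a Unix-like system with printenv on the PATH, and the object name is hypothetical.

object EnvPropagationSketch {
  def main(args: Array[String]): Unit = {
    val builder = new ProcessBuilder("printenv", "SPARK_LAUNCH_WITH_SCALA")
    builder.environment().put("SPARK_LAUNCH_WITH_SCALA", "0")  // same call pattern as in ExecutorRunner
    builder.inheritIO()                                        // the child prints "0" to our stdout
    val exitCode = builder.start().waitFor()
    println("child exited with status " + exitCode)
  }
}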