This commit is contained in:
Denny 2012-09-04 21:13:25 -07:00
Родитель 22dde6e020
Коммит 1588d4dbe6
2 изменённых файлов: 63 добавлений и 2 удалений

Просмотреть файл

@ -34,6 +34,8 @@ import org.apache.mesos.{Scheduler, MesosNativeLibrary}
import spark.broadcast._
import spark.deploy.LocalSparkCluster
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
@ -104,8 +106,8 @@ class SparkContext(
case SPARK_LOCALCLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerlave) =>
val scheduler = new ClusterScheduler(this)
val sparkUrl = spark.deploy.DeployUtils.startLocalSparkCluster(numSlaves.toInt,
coresPerSlave.toInt, memoryPerlave.toInt)
val localCluster = new LocalSparkCluster(numSlaves.toInt, coresPerSlave.toInt, memoryPerlave.toInt)
val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
scheduler.initialize(backend)
scheduler

Просмотреть файл

@ -0,0 +1,59 @@
package spark.deploy
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import spark.deploy.worker.Worker
import spark.deploy.master.Master
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
class LocalSparkCluster(numSlaves : Int, coresPerSlave : Int,
memoryPerSlave : Int) extends Logging {
val threadPool = Utils.newDaemonFixedThreadPool(numSlaves + 1)
val localIpAddress = Utils.localIpAddress
var masterActor : ActorRef = _
var masterPort : Int = _
var masterUrl : String = _
val slaveActors = ArrayBuffer[ActorRef]()
def start() : String = {
logInfo("Starting a local Spark cluster with " + numSlaves + " slaves.")
/* Start the Master */
val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
masterUrl = "spark://" + localIpAddress + ":" + masterPort
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Master(localIpAddress, masterPort, 8080)), name = "Master")
masterActor = actor
actorSystem.awaitTermination()
}
})
/* Start the Slaves */
(1 to numSlaves + 1).foreach { slaveNum =>
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
threadPool.execute(new Runnable {
def run() {
val actor = actorSystem.actorOf(
Props(new Worker(localIpAddress, boundPort, 8080 + slaveNum, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
actorSystem.awaitTermination()
}
})
}
return masterUrl
}
}