More work on deploy code (adding Worker class)

This commit is contained in:
Matei Zaharia 2012-06-30 16:43:27 -07:00
Родитель 2fb6e7d71e
Коммит 408b5a1332
18 изменённых файлов: 351 добавлений и 127 удалений

Просмотреть файл

@ -0,0 +1,6 @@
<head><title>Hello world!</title></head>
<p>Hello world!</p>

Просмотреть файл

@ -0,0 +1,6 @@
<head><title>Hello world!</title></head>
<p>Hello world!</p>

Просмотреть файл

@ -200,9 +200,27 @@ object Utils {
* Use unit suffixes (Byte, Kilobyte, Megabyte, Gigabyte, Terabyte and
* Petabyte) in order to reduce the number of digits to four or less. For
* example, 4,000,000 is returned as 4MB.
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable.
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt
} else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024
} else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
* Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
def memoryBytesToString(size: Long): String = {
val TB = 1L << 40
@ -225,4 +243,11 @@ object Utils {
"%.1f %s".formatLocal(Locale.US, value, unit)
* Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
def memoryMegabytesToString(megabytes: Long): String = {
memoryBytesToString(megabytes * 1024L * 1024L)

Просмотреть файл

@ -0,0 +1,6 @@
package spark.deploy
sealed trait DeployMessage
case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends DeployMessage
case class RegisteredSlave(clusterId: String, slaveId: Int) extends DeployMessage

Просмотреть файл

@ -1,44 +0,0 @@
package spark.deploy
import{ActorRef, Props, Actor, ActorSystem}
import spark.{Logging, Utils}
import scala.collection.immutable.{::, Nil}
import spark.util.{AkkaUtils, IntParam}
import cc.spray.Directives
sealed trait MasterMessage
case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends MasterMessage
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
def startWebUi() {
val webUi = new MasterWebUI(context.system, self)
try {
AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
} catch {
case e: Exception =>
logError("Failed to create web UI", e)
override def receive = {
case RegisterSlave(host, slavePort, cores, memory) =>
logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
host, slavePort, cores, Utils.memoryBytesToString(memory * 1024L)))
object Master {
def main(args: Array[String]) {
val params = new MasterArguments(args)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", params.ip, params.port)
val actor = actorSystem.actorOf(
Props(new Master(params.ip, boundPort, params.webUiPort)), name = "Master")

Просмотреть файл

@ -1,12 +0,0 @@
package spark.deploy
import{ActorRef, ActorSystem}
import cc.spray.Directives
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val handler = {
path("") {
get { _.complete("Hello world!") }

Просмотреть файл

@ -1,10 +0,0 @@
package spark.deploy
class Worker(cores: Int, memoryMb: Int) {
object Worker {
def main(args: Array[String]) {

Просмотреть файл

@ -0,0 +1,94 @@
package spark.deploy.master
import scala.collection.mutable.HashMap
import{Terminated, ActorRef, Props, Actor}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import java.text.SimpleDateFormat
import java.util.Date
import spark.deploy.{RegisteredSlave, RegisterSlave}
class SlaveInfo(
val id: Int,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val actor: ActorRef) {
var coresUsed = 0
var memoryUsed = 0
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val clusterId = newClusterId()
var nextSlaveId = 0
var nextJobId = 0
val slaves = new HashMap[Int, SlaveInfo]
val actorToSlave = new HashMap[ActorRef, SlaveInfo]
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
logInfo("Cluster ID: " + clusterId)
def startWebUi() {
val webUi = new MasterWebUI(context.system, self)
try {
AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
} catch {
case e: Exception =>
logError("Failed to create web UI", e)
override def receive = {
case RegisterSlave(host, slavePort, cores, memory) => {
logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
val id = newSlaveId()
slaves(id) = new SlaveInfo(id, host, slavePort, cores, memory, sender)
actorToSlave(sender) = slaves(id)
sender ! RegisteredSlave(clusterId, id)
case Terminated(actor) => {
logInfo("Slave disconnected: " + actor)
actorToSlave.get(actor) match {
case Some(slave) =>
logInfo("Removing slave " +
slaves -=
actorToSlave -= actor
case None =>
logError("Did not have any slave registered for " + actor)
def newClusterId(): String = {
val date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date())
"%s-%04d".format(date, (math.random * 10000).toInt)
def newSlaveId(): Int = {
nextSlaveId += 1
nextSlaveId - 1
object Master {
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
val actor = actorSystem.actorOf(
Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")

Просмотреть файл

@ -1,4 +1,4 @@
package spark.deploy
package spark.deploy.master
import spark.util.IntParam
import spark.Utils
@ -7,9 +7,9 @@ import spark.Utils
* Command-line parser for the master.
class MasterArguments(args: Array[String]) {
var ip: String = Utils.localIpAddress()
var port: Int = 7077
var webUiPort: Int = 8080
var ip = Utils.localIpAddress()
var port = 7077
var webUiPort = 8080
@ -45,7 +45,7 @@ class MasterArguments(args: Array[String]) {
"Options:\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)\n")
" --webui-port PORT Port for web UI (default: 8080)")

Просмотреть файл

@ -0,0 +1,17 @@
package spark.deploy.master
import{ActorRef, ActorSystem}
import cc.spray.Directives
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val handler = {
get {
path("") {
getFromResource(RESOURCE_DIR + "/index.html")
} ~

Просмотреть файл

@ -0,0 +1,58 @@
package spark.deploy.worker
import scala.collection.mutable.HashMap
import{Terminated, ActorRef, Props, Actor}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import java.text.SimpleDateFormat
import java.util.Date
import spark.deploy.{RegisteredSlave, RegisterSlave}
class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int)
extends Actor with Logging {
var coresUsed = 0
var memoryUsed = 0
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
ip, port, cores, Utils.memoryMegabytesToString(memory)))
def startWebUi() {
val webUi = new WorkerWebUI(context.system, self)
try {
AkkaUtils.startSprayServer(context.system, ip, webUiPort, webUi.handler)
} catch {
case e: Exception =>
logError("Failed to create web UI", e)
override def receive = {
case RegisteredSlave(clusterId, slaveId) => {
logInfo("Registered with cluster ID " + clusterId + ", slave ID " + slaveId)
case Terminated(actor) => {
logError("Master disconnected!")
object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
val actor = actorSystem.actorOf(
Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory)),
name = "Worker")

Просмотреть файл

@ -0,0 +1,75 @@
package spark.deploy.worker
import spark.util.IntParam
import spark.util.MemoryParam
import spark.Utils
* Command-line parser for the master.
class WorkerArguments(args: Array[String]) {
var ip = Utils.localIpAddress()
var port = 0
var webUiPort = 8081
var cores = inferDefaultCores()
var memory = inferDefaultMemory()
def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
ip = value
case ("--port" | "-p") :: IntParam(value) :: tail =>
port = value
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
memory = value
case "--webui-port" :: IntParam(value) :: tail =>
webUiPort = value
case ("--help" | "-h") :: tail =>
case Nil => {}
case _ =>
* Print usage and exit JVM with the given exit code.
def printUsageAndExit(exitCode: Int) {
"Usage: spark-worker [options]\n" +
"\n" +
"Options:\n" +
" -c CORES, --cores CORES Number of cores to use\n" +
" -m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
def inferDefaultCores(): Int = {
def inferDefaultMemory(): Int = {
val bean = ManagementFactory.getOperatingSystemMXBean
(bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt

Просмотреть файл

@ -0,0 +1,17 @@
package spark.deploy.worker
import{ActorRef, ActorSystem}
import cc.spray.Directives
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val handler = {
get {
path("") {
getFromResource(RESOURCE_DIR + "/index.html")
} ~

Просмотреть файл

@ -45,7 +45,7 @@ class MesosScheduler(
// Memory used by each executor (in megabytes)
if (System.getenv("SPARK_MEM") != null) {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
@ -467,25 +467,3 @@ class MesosScheduler(
object MesosScheduler {
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable.
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt
} else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024
} else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt

Просмотреть файл

@ -0,0 +1,17 @@
package spark.util
import spark.Utils
* An extractor object for parsing JVM memory strings, such as "10g", into an Int representing
* the number of megabytes. Supports the same formats as Utils.memoryStringToMb.
object MemoryParam {
def unapply(str: String): Option[Int] = {
try {
} catch {
case e: NumberFormatException => None

Просмотреть файл

@ -1,30 +0,0 @@
package spark
import org.scalatest.FunSuite
import spark.scheduler.mesos.MesosScheduler
class MesosSchedulerSuite extends FunSuite {
assert(MesosScheduler.memoryStringToMb("1") == 0)
assert(MesosScheduler.memoryStringToMb("1048575") == 0)
assert(MesosScheduler.memoryStringToMb("3145728") == 3)
assert(MesosScheduler.memoryStringToMb("1024k") == 1)
assert(MesosScheduler.memoryStringToMb("5000k") == 4)
assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
assert(MesosScheduler.memoryStringToMb("2g") == 2048)
assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))

Просмотреть файл

@ -26,5 +26,25 @@ class UtilsSuite extends FunSuite {
assert(Utils.memoryStringToMb("1") == 0)
assert(Utils.memoryStringToMb("1048575") == 0)
assert(Utils.memoryStringToMb("3145728") == 3)
assert(Utils.memoryStringToMb("1024k") == 1)
assert(Utils.memoryStringToMb("5000k") == 4)
assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
assert(Utils.memoryStringToMb("1024m") == 1024)
assert(Utils.memoryStringToMb("5000m") == 5000)
assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
assert(Utils.memoryStringToMb("2g") == 2048)
assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
assert(Utils.memoryStringToMb("2t") == 2097152)
assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))

Просмотреть файл

for jar in `find $CORE_DIR/lib -name '*jar'`; do