Made Akka thread pool and message batch sizes configurable

This commit is contained in:
root 2012-10-07 04:19:54 +00:00
Родитель ce915cadee
Коммит a73b25826b
1 изменённых файлов: 5 добавлений и 3 удалений

Просмотреть файл

@ -23,6 +23,8 @@ private[spark] object AkkaUtils {
* ActorSystem itself and its port (which is hard to get from Akka). * ActorSystem itself and its port (which is hard to get from Akka).
*/ */
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = { def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
val akkaConf = ConfigFactory.parseString(""" val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
@ -31,9 +33,9 @@ private[spark] object AkkaUtils {
akka.remote.netty.hostname = "%s" akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s akka.remote.netty.connection-timeout = 1s
akka.remote.netty.execution-pool-size = 8 akka.remote.netty.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = 30 akka.actor.default-dispatcher.throughput = %d
""".format(host, port)) """.format(host, port, akkaThreads, akkaBatchSize))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)