Detect correctly when one has disconnected from a standalone cluster.

SPARK-617 #resolve
This commit is contained in:
Matei Zaharia 2012-11-11 21:06:57 -08:00
Родитель acf8272324
Коммит 173e0354c0
1 изменённых файлов: 13 добавлений и 1 удалений

Просмотреть файл

@ -35,6 +35,7 @@ private[spark] class Client(
class ClientActor extends Actor with Logging { class ClientActor extends Actor with Logging {
var master: ActorRef = null var master: ActorRef = null
var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() { override def preStart() {
@ -43,6 +44,7 @@ private[spark] class Client(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try { try {
master = context.actorFor(akkaUrl) master = context.actorFor(akkaUrl)
masterAddress = master.path.address
master ! RegisterJob(jobDescription) master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing context.watch(master) // Doesn't work with remote actors, but useful for testing
@ -72,7 +74,17 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse("")) listener.executorRemoved(fullId, message.getOrElse(""))
} }
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => case Terminated(actor_) if actor_ == master =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
case RemoteClientDisconnected(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client") logError("Connection to master failed; stopping client")
markDisconnected() markDisconnected()
context.stop(self) context.stop(self)