End runJob() with a SparkException when a task fails too many times in

one of the cluster schedulers.
This commit is contained in:
Matei Zaharia 2012-08-31 17:47:43 -07:00
Родитель a480dec6b2
Коммит 389fb4cc54
5 изменённых файлов: 66 добавлений и 12 удалений

Просмотреть файл

@ -38,6 +38,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
eventQueue.put(HostLost(host))
}
// Called by TaskScheduler to cancel an entier TaskSet due to repeated failures.
override def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
}
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
// this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
// as more failure events come in
@ -258,6 +263,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
case completion: CompletionEvent =>
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
abortStage(idToStage(taskSet.stageId), reason)
case StopDAGScheduler =>
// Cancel any active jobs
for (job <- activeJobs) {
@ -475,18 +483,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
handleHostLost(bmAddress.ip)
}
case _ =>
// Non-fetch failure -- probably a bug in the job, so bail out
// TODO: Cancel all tasks that are still running
resultStageToJob.get(stage) match {
case Some(job) =>
val error = new SparkException("Task failed: " + task + ", reason: " + event.reason)
job.listener.jobFailed(error)
activeJobs -= job
resultStageToJob -= stage
case None =>
logInfo("Ignoring result from " + task + " because its job has finished")
}
case other =>
// Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage
abortStage(idToStage(task.stageId), task + " failed: " + other)
}
}
@ -510,6 +509,53 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
}
/**
* Aborts all jobs depending on a particular Stage. This is called in response to a task set
* being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
*/
def abortStage(failedStage: Stage, reason: String) {
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
}
}
/**
* Return true if one of stage's ancestors is target.
*/
def stageDependsOn(stage: Stage, target: Stage): Boolean = {
if (stage == target) {
return true
}
val visitedRdds = new HashSet[RDD[_]]
val visitedStages = new HashSet[Stage]
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
visitedStages += mapStage
visit(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
case narrowDep: NarrowDependency[_] =>
visit(narrowDep.rdd)
}
}
}
}
visit(stage.rdd)
visitedRdds.contains(target.rdd)
}
def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)

Просмотреть файл

@ -29,4 +29,6 @@ case class CompletionEvent(
case class HostLost(host: String) extends DAGSchedulerEvent
case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
case object StopDAGScheduler extends DAGSchedulerEvent

Просмотреть файл

@ -13,4 +13,7 @@ trait TaskSchedulerListener {
// A node was lost from the cluster.
def hostLost(host: String): Unit
// The TaskScheduler wants to abort an entire task set.
def taskSetFailed(taskSet: TaskSet, reason: String): Unit
}

Просмотреть файл

@ -6,4 +6,6 @@ package spark.scheduler
*/
class TaskSet(val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int) {
val id: String = stageId + "." + attempt
override def toString: String = "TaskSet " + id
}

Просмотреть файл

@ -342,6 +342,7 @@ class TaskSetManager(
failed = true
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
sched.taskSetFinished(this)
}