Print and track user call sites in more places in Spark

This commit is contained in:
Matei Zaharia 2012-09-28 17:42:00 -07:00
Родитель 9f6efbf06a
Коммит 3d7267999d
6 изменённых файлов: 63 добавлений и 54 удалений

Просмотреть файл

@ -62,7 +62,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
@transient val dependencies: List[Dependency[_]]
// Record user function generating this RDD
val origin = getOriginDescription
val origin = Utils.getSparkCallSite
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
@ -127,38 +127,6 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
}
/**
 * Describe which Spark and user functions generated this RDD. Only works if called from
 * the constructor, because it inspects the current thread's call stack.
 *
 * Classes under spark.examples are treated as user code.
 *
 * @return a string of the form "<spark method> at: <user method> (<user file>:<user line>)";
 *         parts that cannot be determined are reported as "<not_found>" (or -1 for the line)
 */
def getOriginDescription : String = {
  // A frame is Spark-internal only if its class really lives in the `spark` package. A plain
  // substring test would also match user classes such as com.example.sparkjobs.Main and would
  // then skip past the real user call site.
  def isSparkInternal(el: StackTraceElement): Boolean = {
    val className = el.getClassName()
    className.startsWith("spark.") && !className.startsWith("spark.examples")
  }

  // Drop the frame of the stack-trace call itself so the trace starts at this method.
  val trace = Thread.currentThread().getStackTrace().filter(
    el => !el.getMethodName().contains("getStackTrace"))

  // The leading run of Spark frames ends at the last (shallowest) contiguous Spark method. This
  // might be an RDD transformation, a SparkContext function (such as parallelize), or anything
  // else that leads to instantiation of an RDD. The frame right after that run is the first
  // (deepest) user frame.
  val (sparkFrames, callerFrames) = trace.span(isSparkInternal)
  val firstUserFrame = callerFrames.headOption

  val lastSparkMethod = sparkFrames.lastOption.map(_.getMethodName()).getOrElse("<not_found>")
  val firstUserMethod = firstUserFrame.map(_.getMethodName()).getOrElse("<not_found>")
  // getFileName returns null when the source file name is unavailable.
  val firstUserFile =
    firstUserFrame.flatMap(el => Option(el.getFileName())).getOrElse("<not_found>")
  val firstUserLine = firstUserFrame.map(_.getLineNumber()).getOrElse(-1)

  "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
}
// Transformations (return a new RDD)
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

Просмотреть файл

@ -12,7 +12,7 @@ import scala.io.Source
/**
* Various utility methods used by Spark.
*/
object Utils extends Logging {
private object Utils extends Logging {
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@ -115,10 +115,8 @@ object Utils extends Logging {
val out = new FileOutputStream(dest)
copyStream(in, out, true)
}
/* Download a file from a given URL to the local filesystem */
/** Download a file from a given URL to the local filesystem */
def downloadFile(url: URL, localPath: String) {
val in = url.openStream()
val out = new FileOutputStream(localPath)
@ -349,4 +347,38 @@ object Utils extends Logging {
/**
 * Convenience overload: delegates to execute(command, new File(".")), i.e. passes the
 * current directory as the second argument (presumably the working directory for the
 * command -- confirm against the two-argument overload).
 */
def execute(command: Seq[String]) {
execute(command, new File("."))
}
/**
 * When called inside a class in the spark package, returns a short description of where user
 * code (outside the spark package) called into Spark: the outermost Spark method on the current
 * call stack, followed by the file name and line number of the first non-Spark stack frame.
 * This is used, for example, to tell users where in their code each RDD got created.
 *
 * Classes under spark.examples are treated as user code.
 *
 * @return a string of the form "<spark method> at <user file>:<user line>"; parts that cannot
 *         be determined are reported as "<unknown>" (or 0 for the line number)
 */
def getSparkCallSite: String = {
  // A frame is Spark-internal only if its class really lives in the `spark` package. A plain
  // substring test would also match user classes such as com.example.sparkjobs.Main and would
  // then skip past the real user call site.
  def isSparkInternal(el: StackTraceElement): Boolean = {
    val className = el.getClassName()
    className.startsWith("spark.") && !className.startsWith("spark.examples")
  }

  // Drop the frame of the stack-trace call itself so the trace starts at this method.
  val trace = Thread.currentThread().getStackTrace().filter(
    el => !el.getMethodName().contains("getStackTrace"))

  // The leading run of Spark frames ends at the last (shallowest) contiguous Spark method. This
  // might be an RDD transformation, a SparkContext function (such as parallelize), or anything
  // else that leads to instantiation of an RDD. The frame right after that run is the first
  // (deepest) user frame.
  val (sparkFrames, callerFrames) = trace.span(isSparkInternal)
  val firstUserFrame = callerFrames.headOption

  val lastSparkMethod = sparkFrames.lastOption.map(_.getMethodName()).getOrElse("<unknown>")
  // getFileName returns null when the source file name is unavailable.
  val firstUserFile =
    firstUserFrame.flatMap(el => Option(el.getFileName())).getOrElse("<unknown>")
  val firstUserLine = firstUserFrame.map(_.getLineNumber()).getOrElse(0)

  "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
}
}

Просмотреть файл

@ -10,6 +10,7 @@ class ActiveJob(
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
val callSite: String,
val listener: JobListener) {
val numPartitions = partitions.length

Просмотреть файл

@ -121,7 +121,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of splits is unknown
logInfo("Registering RDD " + rdd.id + ": " + rdd)
logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
cacheTracker.registerRDD(rdd.id, rdd.splits.size)
if (shuffleDep != None) {
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size)
@ -144,7 +144,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of splits is unknown
logInfo("Registering parent RDD " + r.id + ": " + r)
logInfo("Registering parent RDD " + r.id + " (" + r.origin + ")")
cacheTracker.registerRDD(r.id, r.splits.size)
for (dep <- r.dependencies) {
dep match {
@ -200,11 +200,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
val waiter = new JobWaiter(partitions.size)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, waiter))
val callSite = Utils.getSparkCallSite
eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter))
waiter.getResult() match {
case JobSucceeded(results: Seq[_]) =>
logInfo("Finished " + callSite)
return results.asInstanceOf[Seq[U]].toArray
case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite)
throw exception
}
}
@ -219,7 +222,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.splits.size).toArray
eventQueue.put(JobSubmitted(rdd, func2, partitions, false, listener))
eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener))
return listener.getResult() // Will throw an exception if the job fails
}
@ -239,13 +242,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, listener) =>
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, listener)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
updateCacheLocs()
logInfo("Got job " + job.runId + " with " + partitions.length + " output partitions")
logInfo("Final stage: " + finalStage)
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
@ -337,8 +341,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " from " + stage.rdd.origin +
", which has no missing parents")
logInfo("Submitting " + stage + " (" + stage.origin + "), which has no missing parents")
submitMissingTasks(stage)
running += stage
} else {
@ -425,7 +428,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, bmAddress)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
logInfo(stage + " finished; looking for newly runnable stages")
logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
running -= stage
logInfo("running: " + running)
logInfo("waiting: " + waiting)
@ -439,7 +442,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (stage.outputLocs.count(_ == Nil) != 0) {
// Some tasks had failed; let's resubmit this stage
// TODO: Lower-level scheduler should also deal with this
logInfo("Resubmitting " + stage + " because some of its tasks had failed: " +
logInfo("Resubmitting " + stage + " (" + stage.origin +
") because some of its tasks had failed: " +
stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
submitStage(stage)
} else {
@ -453,8 +457,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
waiting --= newlyRunnable
running ++= newlyRunnable
for (stage <- newlyRunnable.sortBy(_.id)) {
logInfo("Submitting " + stage + " from " + stage.rdd.origin +
" which is now runnable")
logInfo("Submitting " + stage + " (" + stage.origin + "), which is now runnable")
submitMissingTasks(stage)
}
}
@ -471,12 +474,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
running -= failedStage
failed += failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " for resubmision due to a fetch failure")
logInfo("Marking " + failedStage + " (" + failedStage.origin +
") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
logInfo("The failed fetch was from " + mapStage + "; marking it for resubmission")
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
"); marking it for resubmission")
failed += mapStage
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)

Просмотреть файл

@ -17,6 +17,7 @@ case class JobSubmitted(
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
callSite: String,
listener: JobListener)
extends DAGSchedulerEvent

Просмотреть файл

@ -80,6 +80,8 @@ class Stage(
return id
}
/** Call-site description for this stage; simply forwards to the `origin` of this stage's RDD. */
def origin: String = rdd.origin
// Short label made of the stage id only; the more verbose form (RDD id and shuffle flag) is
// left commented out at the end of the line.
override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]"
// The stage id is used directly as the hash code.
override def hashCode(): Int = id