diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6883fb70f9..b5f67d1253 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -62,7 +62,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial @transient val dependencies: List[Dependency[_]] // Record user function generating this RDD - val origin = getOriginDescription + val origin = Utils.getSparkCallSite // Optionally overridden by subclasses to specify how they are partitioned val partitioner: Option[Partitioner] = None @@ -127,38 +127,6 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } } - // Describe which spark and user functions generated this RDD. Only works if called from - // constructor. - def getOriginDescription : String = { - val trace = Thread.currentThread().getStackTrace().filter( el => - (!el.getMethodName().contains("getStackTrace"))) - - // Keep crawling up the stack trace until we find the first function not inside of the spark - // package. We track the last (shallowest) contiguous Spark method. This might be an RDD - // transformation, a SparkContext function (such as parallelize), or anything else that leads - // to instantiation of an RDD. We also track the first (deepest) user method, file, and line. - var lastSparkMethod = "" - var firstUserMethod = "" - var firstUserFile = "" - var firstUserLine = -1 - var finished = false - - for (el <- trace) { - if (!finished) { - if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { - lastSparkMethod = el.getMethodName() - } - else { - firstUserMethod = el.getMethodName() - firstUserLine = el.getLineNumber() - firstUserFile = el.getFileName() - finished = true - } - } - } - "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine) - } - // Transformations (return a new RDD) def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index eb7d69e816..802277a251 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -12,7 +12,7 @@ import scala.io.Source /** * Various utility methods used by Spark. */ -object Utils extends Logging { +private object Utils extends Logging { /** Serialize an object using Java serialization */ def serialize[T](o: T): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -115,10 +115,8 @@ object Utils extends Logging { val out = new FileOutputStream(dest) copyStream(in, out, true) } - - - - /* Download a file from a given URL to the local filesystem */ + + /** Download a file from a given URL to the local filesystem */ def downloadFile(url: URL, localPath: String) { val in = url.openStream() val out = new FileOutputStream(localPath) @@ -349,4 +347,38 @@ object Utils extends Logging { def execute(command: Seq[String]) { execute(command, new File(".")) } + + + /** + * When called inside a class in the spark package, returns the name of the user code class + * (outside the spark package) that called into Spark, as well as which Spark method they called. + * This is used, for example, to tell users where in their code each RDD got created. + */ + def getSparkCallSite: String = { + val trace = Thread.currentThread().getStackTrace().filter( el => + (!el.getMethodName().contains("getStackTrace"))) + + // Keep crawling up the stack trace until we find the first function not inside of the spark + // package. We track the last (shallowest) contiguous Spark method. This might be an RDD + // transformation, a SparkContext function (such as parallelize), or anything else that leads + // to instantiation of an RDD. We also track the first (deepest) user method, file, and line. + var lastSparkMethod = "" + var firstUserFile = "" + var firstUserLine = 0 + var finished = false + + for (el <- trace) { + if (!finished) { + if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { + lastSparkMethod = el.getMethodName() + } + else { + firstUserLine = el.getLineNumber() + firstUserFile = el.getFileName() + finished = true + } + } + } + "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine) + } } diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala index 0ecff9ce77..e09b92d667 100644 --- a/core/src/main/scala/spark/scheduler/ActiveJob.scala +++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala @@ -10,6 +10,7 @@ class ActiveJob( val finalStage: Stage, val func: (TaskContext, Iterator[_]) => _, val partitions: Array[Int], + val callSite: String, val listener: JobListener) { val numPartitions = partitions.length diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 4944f41e3a..0d9922766a 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -121,7 +121,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of splits is unknown - logInfo("Registering RDD " + rdd.id + ": " + rdd) + logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") cacheTracker.registerRDD(rdd.id, rdd.splits.size) if (shuffleDep != None) { mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size) @@ -144,7 +144,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of splits is unknown - logInfo("Registering parent RDD " + r.id + ": " + r) + logInfo("Registering parent RDD " + r.id + " (" + r.origin + ")") cacheTracker.registerRDD(r.id, r.splits.size) for (dep <- r.dependencies) { dep match { @@ -200,11 +200,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } val waiter = new JobWaiter(partitions.size) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] - eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, waiter)) + val callSite = Utils.getSparkCallSite + eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter)) waiter.getResult() match { case JobSucceeded(results: Seq[_]) => + logInfo("Finished " + callSite) return results.asInstanceOf[Seq[U]].toArray case JobFailed(exception: Exception) => + logInfo("Failed to run " + callSite) throw exception } } @@ -219,7 +222,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val listener = new ApproximateActionListener(rdd, func, evaluator, timeout) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.splits.size).toArray - eventQueue.put(JobSubmitted(rdd, func2, partitions, false, listener)) + eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener)) return listener.getResult() // Will throw an exception if the job fails } @@ -239,13 +242,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with } event match { - case JobSubmitted(finalRDD, func, partitions, allowLocal, listener) => + case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) => val runId = nextRunId.getAndIncrement() val finalStage = newStage(finalRDD, None, runId) - val job = new ActiveJob(runId, finalStage, func, partitions, listener) + val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener) updateCacheLocs() - logInfo("Got job " + job.runId + " with " + partitions.length + " output partitions") - logInfo("Final stage: " + finalStage) + logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length + + " output partitions") + logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { @@ -337,8 +341,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing == Nil) { - logInfo("Submitting " + stage + " from " + stage.rdd.origin + - ", which has no missing parents") + logInfo("Submitting " + stage + " (" + stage.origin + "), which has no missing parents") submitMissingTasks(stage) running += stage } else { @@ -425,7 +428,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with stage.addOutputLoc(smt.partition, bmAddress) } if (running.contains(stage) && pendingTasks(stage).isEmpty) { - logInfo(stage + " finished; looking for newly runnable stages") + logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages") running -= stage logInfo("running: " + running) logInfo("waiting: " + waiting) @@ -439,7 +442,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (stage.outputLocs.count(_ == Nil) != 0) { // Some tasks had failed; let's resubmit this stage // TODO: Lower-level scheduler should also deal with this - logInfo("Resubmitting " + stage + " because some of its tasks had failed: " + + logInfo("Resubmitting " + stage + " (" + stage.origin + + ") because some of its tasks had failed: " + stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", ")) submitStage(stage) } else { @@ -453,8 +457,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with waiting --= newlyRunnable running ++= newlyRunnable for (stage <- newlyRunnable.sortBy(_.id)) { - logInfo("Submitting " + stage + " from " + stage.rdd.origin + - " which is now runnable") + logInfo("Submitting " + stage + " (" + stage.origin + "), which is now runnable") submitMissingTasks(stage) } } @@ -471,12 +474,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with running -= failedStage failed += failedStage // TODO: Cancel running tasks in the stage - logInfo("Marking " + failedStage + " for resubmision due to a fetch failure") + logInfo("Marking " + failedStage + " (" + failedStage.origin + + ") for resubmision due to a fetch failure") // Mark the map whose fetch failed as broken in the map stage val mapStage = shuffleToMapStage(shuffleId) mapStage.removeOutputLoc(mapId, bmAddress) mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress) - logInfo("The failed fetch was from " + mapStage + "; marking it for resubmission") + logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin + + "); marking it for resubmission") failed += mapStage // Remember that a fetch failed now; this is used to resubmit the broken // stages later, after a small wait (to give other tasks the chance to fail) diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala index 1322aae3a3..11f0ef6245 100644 --- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala @@ -17,6 +17,7 @@ case class JobSubmitted( func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], allowLocal: Boolean, + callSite: String, listener: JobListener) extends DAGSchedulerEvent diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index cd660c9085..b3ef8ac565 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -80,6 +80,8 @@ class Stage( return id } + def origin: String = rdd.origin + override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]" override def hashCode(): Int = id