Merge pull request #511 from xiongrenyi/directstream

Add a try/finally block to the parallel job: without it, the driver does not exit if the job fails (see the sketch below the file stats)
This commit is contained in:
Renyi Xiong 2016-07-28 13:19:49 -07:00 committed by GitHub
parent c753dbb75c fe89bf75e7
commit d20d2812b6
2 changed files with 58 additions and 11 deletions
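For context, a minimal sketch of the hang this PR fixes (standalone names, not the Mobius classes): the parallel-job runnable signalled completion only after runJob returned, so a failed job never released the slot the driver was blocked on.

import java.util.concurrent.{Executors, LinkedBlockingQueue}

object HangSketch {
  // stand-in for a Spark job that fails on the cluster
  def runJob(): Unit = throw new RuntimeException("job failed")

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Int]()
    val pool = Executors.newFixedThreadPool(1)

    pool.execute(new Runnable {
      override def run(): Unit = {
        runJob()     // throws, so the completion signal below is skipped
        queue.put(1) // never reached
      }
    })

    queue.take() // the "driver" blocks here forever
  }
}

Wrapping the runnable's body in try { ... } finally { queue.put(...) }, as the second hunk below does, guarantees the completion signal on both the success and failure paths.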

View file

@@ -260,12 +260,9 @@ class CSharpStateDStream(
   private[streaming] def runParallelJob(validTime: Time, rdd: Option[RDD[Array[Byte]]]): Unit = {
     if (numParallelJobs > 0) {
-      val lastCompletedBatch = parent.ssc.progressListener.lastCompletedBatch
-      val lastBatchCompleted = validTime - slideDuration == zeroTime ||
-        lastCompletedBatch.isDefined && lastCompletedBatch.get.batchTime >= validTime - slideDuration
-      logInfo(s"Last batch completed: $lastBatchCompleted")
-      // if last batch already completed, no need to submit a parallel job
-      if (!lastBatchCompleted) {
+      if (context.progressListener.numUnprocessedBatches > 0) {
+        logInfo(s"numUnprocessedBatches: ${context.progressListener.numUnprocessedBatches}")
         if (jobExecutor == null) {
           jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numParallelJobs, "mobius-parallel-job-executor")
         }
@@ -274,10 +271,13 @@ class CSharpStateDStream(
         val rddid = rdd.get.id
         val runnable = new Runnable {
           override def run(): Unit = {
-            logInfo(s"Starting rdd: $rddid $validTime")
-            parent.ssc.sc.runJob(rdd.get, (iterator: Iterator[Array[Byte]]) => {})
-            logInfo(s"Finished rdd: $rddid $validTime")
-            queue.put(rddid)
+            try {
+              logInfo(s"Starting rdd: $rddid $validTime")
+              parent.ssc.sc.runJob(rdd.get, (iterator: Iterator[Array[Byte]]) => {})
+              logInfo(s"Finished rdd: $rddid $validTime")
+            } finally {
+              queue.put(rddid)
+            }
           }
         }
         jobExecutor.execute(runnable)
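Beyond the try/finally, the first hunk also replaces the inferred lastBatchCompleted flag with a direct query of the streaming progress listener; side by side, the old and new guards as they appear above:

// before: infer that the previous batch finished from lastCompletedBatch
val lastBatchCompleted = validTime - slideDuration == zeroTime ||
  lastCompletedBatch.isDefined && lastCompletedBatch.get.batchTime >= validTime - slideDuration
if (!lastBatchCompleted) { /* submit a parallel job */ }

// after: ask the listener directly whether any batch is still unprocessed
if (context.progressListener.numUnprocessedBatches > 0) { /* submit a parallel job */ }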

View file

@@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.spark.csharp.SparkCLRFunSuite
 import org.apache.spark.rdd.{LocalRDDCheckpointData, RDD}
 import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListenerBatchCompleted}
+import org.apache.spark.streaming.scheduler.{StreamingListenerBatchSubmitted, BatchInfo, StreamingListenerBatchCompleted}
 import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -17,9 +17,12 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
 
-private class MockSparkContext(config: SparkConf) extends SparkContext(config) {
+private class MockSparkContext(config: SparkConf, throwException: Boolean = false) extends SparkContext(config) {
   var numParallelJobs = 0
   override def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
+    if (throwException) {
+      throw new Exception("test")
+    }
     numParallelJobs = numParallelJobs + 1
     Array.empty
   }
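MockSparkContext turns SparkContext.runJob into a test probe: each call increments numParallelJobs, and with throwException = true it throws instead, so the suite can drive both the success and failure branches of the new try/finally without a real cluster.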
@@ -133,6 +136,16 @@ class CSharpDStreamSuite extends SparkCLRFunSuite with BeforeAndAfterAll with Be
       actual = sc.numParallelJobs
       assert(sc.numParallelJobs == 0, s", first batch expected parallel jobs: 0; actual: $actual")
 
+      val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+        batchTime = Time(1000L),
+        streamIdToInputInfo = Map(),
+        submissionTime = 0L,
+        null,
+        null,
+        outputOperationInfos = Map()
+      ))
+      // mark previous batch as submitted
+      ssc.progressListener.onBatchSubmitted(batchSubmitted)
       // unblock current batch by running a parallel job if previous batch not completed yet
       sc.numParallelJobs = 0
       ds.runParallelJob(new Time(2000), Some(sc.emptyRDD[Array[Byte]]))
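A note on the mechanics here: the two null positional arguments to BatchInfo are its processingStartTime and processingEndTime options, so the batch fed through onBatchSubmitted registers as submitted but unprocessed. That makes progressListener.numUnprocessedBatches positive, which is exactly the guard the rewritten runParallelJob checks before submitting a parallel job.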
@@ -162,6 +175,40 @@ class CSharpDStreamSuite extends SparkCLRFunSuite with BeforeAndAfterAll with Be
       sc.stop()
     }
   }
 
+  test("runParallelJob in UpdateStateByKey with exception") {
+    val conf = new SparkConf().setAppName("test").setMaster("local").set("spark.testing", "true")
+    val sc = new MockSparkContext(conf, true)
+    val ssc = new StreamingContext(sc, new Duration(1000))
+    val parent = ssc.binaryRecordsStream("test", 0)
+    try {
+      ssc.conf.set("spark.mobius.streaming.parallelJobs", "1")
+      var ds = new MockCSharpStateDStream(parent, null, null, null)
+      ds.zeroTime = new Time(0)
+      val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
+        batchTime = Time(1000L),
+        streamIdToInputInfo = Map(),
+        submissionTime = 0L,
+        null,
+        null,
+        outputOperationInfos = Map()
+      ))
+      // mark previous batch as submitted
+      ssc.progressListener.onBatchSubmitted(batchSubmitted)
+      // unblock current batch by running a parallel job if previous batch not completed yet
+      sc.numParallelJobs = 0
+      ds.runParallelJob(new Time(2000), Some(sc.emptyRDD[Array[Byte]]))
+      var actual = sc.numParallelJobs
+      assert(sc.numParallelJobs == 0, s", parallel job failed, expected completed parallel jobs: 0; actual: $actual")
+    } finally {
+      sc.stop()
+    }
+  }
 }
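The new test drives the failure branch: with throwException = true the mock's runJob throws before incrementing numParallelJobs, so asserting that the counter stays 0 confirms the job body failed while runParallelJob itself returned normally. As a standalone illustration of the property the finally buys (hypothetical names, not part of this suite), the completion signal still arrives when the job throws:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object FinallySketch {
  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[java.lang.Integer]()
    val pool = Executors.newFixedThreadPool(1)

    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          throw new RuntimeException("job failed")
        } finally {
          queue.put(1) // released even though the job threw
        }
      }
    })

    // with the fix the signal arrives; poll with a timeout so a
    // regression fails fast instead of hanging
    assert(queue.poll(5, TimeUnit.SECONDS) != null)
    pool.shutdown()
  }
}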
class CSharpInputDStreamSuite extends SparkCLRFunSuite {