This commit is contained in:
Renyi Xiong 2016-05-12 12:18:31 -07:00
Родитель 82ea61c8d9
Коммит 0e42f223cf
1 изменённых файлов: 32 добавлений и 0 удалений

Просмотреть файл

@ -11,12 +11,14 @@ import java.io.DataInputStream
import java.io.DataOutputStream
import java.net.Socket
import java.util.{ArrayList => JArrayList}
import java.util.concurrent.ThreadPoolExecutor
import scala.collection.JavaConversions._
import scala.language.existentials
import org.apache.spark.api.java._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ThreadUtils
import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.api.java._
@ -232,10 +234,40 @@ class CSharpStateDStream(
override val mustCheckpoint = true
private val numParallelJobs = parent.ssc.sc.getConf.getInt("spark.mobius.streaming.parallelJobs", 1)
@transient private var jobExecutor : ThreadPoolExecutor = null
override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val lastState = getOrCompute(validTime - slideDuration)
val rdd = parent.getOrCompute(validTime)
if (rdd.isDefined) {
if (numParallelJobs > 0) {
val lastCompletedBatch = parent.ssc.progressListener.lastCompletedBatch
val lastBatchCompleted = validTime - slideDuration == zeroTime ||
lastCompletedBatch.isDefined && lastCompletedBatch.get.batchTime >= validTime - slideDuration
logInfo(s"Last batch completed: $lastBatchCompleted")
// if last batch already completed, no need to submit a parallel job
if (!lastBatchCompleted) {
if (jobExecutor == null) {
jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numParallelJobs, "mobius-parallel-job-executor")
}
rdd.get.cache()
val queue = new java.util.concurrent.LinkedBlockingQueue[Int]
val rddid = rdd.get.id
val runnable = new Runnable {
override def run(): Unit = {
logInfo(s"Starting rdd: $rddid $validTime")
parent.ssc.sc.runJob(rdd.get, (iterator: Iterator[Array[Byte]]) => {})
logInfo(s"Finished rdd: $rddid $validTime")
queue.put(rddid)
}
}
jobExecutor.execute(runnable)
logInfo(s"Waiting rdd: $rddid $validTime")
queue.take()
logInfo(s"Taken rdd: $rddid $validTime")
}
}
CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc,
List(serializationMode, serializationMode2))
} else {