Add option to turn on/off local checkpoint for CSharpStateDStream

tawan0109 2016-07-08 17:51:13 +08:00
Родитель e75eb189e0
Коммит d3999ca43d
3 изменённых файлов: 320 добавлений и 22 удалений

@ -30,6 +30,10 @@ import scala.language.existentials
object CSharpDStream {
// Variables for debugging
var debugMode = false
var debugRDD: Option[RDD[_]] = None
* helper function for DStream.foreachRDD().
@ -43,6 +47,11 @@ object CSharpDStream {
def callCSharpTransform(rdds: List[Option[RDD[_]]], time: Time, cSharpfunc: Array[Byte],
serializationModeList: List[String]): Option[RDD[Array[Byte]]] = {
//debug mode enabled
if (debugMode) {
return debugRDD.asInstanceOf[Option[RDD[Array[Byte]]]]
var socket: Socket = null
try {
socket = CSharpBackend.callbackSockets.poll()
@ -229,13 +238,22 @@ class CSharpStateDStream(
serializationMode2: String)
extends DStream[Array[Byte]](parent.ssc) {
val localCheckpointEnabled ="spark.mobius.streaming.localCheckpoint.enabled", false)
logInfo("Local checkpoint is enabled: " + localCheckpointEnabled)
if (localCheckpointEnabled) {
val replicas ="spark.mobius.streaming.localCheckpoint.replicas", 3)
logInfo("spark.mobius.localCheckpoint.replicas is set to " + replicas)
super.persist(StorageLevel(true, true, false, false, replicas))
} else {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override val mustCheckpoint = true
override val mustCheckpoint = !localCheckpointEnabled
private val numParallelJobs ="spark.mobius.streaming.parallelJobs", 0)
@transient private[streaming] var jobExecutor : ThreadPoolExecutor = null
@ -275,8 +293,12 @@ class CSharpStateDStream(
val rdd = parent.getOrCompute(validTime)
if (rdd.isDefined) {
runParallelJob(validTime, rdd)
CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc,
val state = CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc,
List(serializationMode, serializationMode2))
if(localCheckpointEnabled && state.isDefined) {
} else {

@ -5,26 +5,29 @@
package org.apache.spark.streaming.kafka
import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.ClassTag
import java.util.concurrent.{ConcurrentHashMap, TimeUnit, ScheduledExecutorService}
import java.beans.Transient
import{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter}
import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.rdd.{EmptyRDD, UnionRDD, RDD}
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{EmptyRDD, RDD, UnionRDD}
import org.apache.spark.Logging
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.api.csharp.{CSharpDStream, RddPreComputeProcessor, RddPreComputeRecord}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.ThreadUtils
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.api.csharp.{RddPreComputeRecord, RddPreComputeProcessor, CSharpDStream}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
* A stream of the new DynamicPartitionKafkaRDD to support where
@ -36,7 +39,8 @@ import org.apache.spark.streaming.api.csharp.{RddPreComputeRecord, RddPreCompute
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
* see {@link org.apache.spark.streaming.kafka.KafkaCluster}
* @param kafkaParams Kafka <a href="">
* @param kafkaParams Kafka <a href="">
* configuration parameters</a>.
* Requires "" or "bootstrap.servers" to be set with Kafka broker(s),
* NOT zookeeper servers, specified in host1:port1,host2:port2 form.
@ -103,6 +107,26 @@ class DynamicPartitionKafkaInputDStream[
@transient private[streaming] var preComputeOutputMap: ConcurrentHashMap[Time, List[RddPreComputeRecord[R]]] = null
@transient private var rddPreComputeProcessor: RddPreComputeProcessor[R] = null
@transient private val dc = kafkaParams.getOrElse("", "unknown_cluster").replace(" ", "_")
@transient private val enableOffsetCheckpoint = context.sparkContext.getConf.getBoolean("spark.mobius.streaming.kafka.enableOffsetCheckpoint", false)
@transient private val offsetCheckpointDir: Path = if(enableOffsetCheckpoint && context.sparkContext.checkpointDir.isDefined) new Path(new Path(context.sparkContext.checkpointDir.get).getParent, "kafka-checkpoint") else null
@transient private lazy val fs: FileSystem = if(enableOffsetCheckpoint && offsetCheckpointDir != null) offsetCheckpointDir.getFileSystem(SparkHadoopUtil.get.conf) else null
@transient private val replayBatches = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.replayBatches", 0)
@transient private val maxRetainedOffsetCheckpoints = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.maxRetainedOffsetCheckpoints", 0) match {
case n if n > replayBatches => n
case _: Int => replayBatches
if (enableOffsetCheckpoint && fs != null) {
if (!fs.exists(offsetCheckpointDir)) {
} else if (!fs.isDirectory(offsetCheckpointDir)) {
throw new SparkException("Offset checkpoint dir: " + offsetCheckpointDir + " is not a directory.")
import OffsetRangeCheckpointHandler._
private def refreshPartitions(): Unit = {
if (!hasFetchedAllPartitions) {
kc.getPartitions2(topics) match {
@ -299,10 +323,22 @@ class DynamicPartitionKafkaInputDStream[
offsetsRangeForNextBatch = None
case (from, until) =>
val rdd = DynamicPartitionKafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, from, until, messageHandler, numPartitions)
if(enableOffsetCheckpoint) {
val offsetRanges = { case (tp, fo) =>
val uo = until(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
// Save offset ranges to staging directory
logInfo(s"Save offset range to HDFS, time: $validTime, topic: $topics, path: $offsetCheckpointDir")
offsetRanges.groupBy(_.topic).foreach{ case (topic, ranges) => writeCheckpoint(validTime, ranges.toList, topic, dc, offsetCheckpointDir, fs) }
Some(callCSharpTransform(rdd, validTime))
@ -369,10 +405,40 @@ class DynamicPartitionKafkaInputDStream[
if (enableOffsetCheckpoint && offsetCheckpointDir != null) {
commitCheckpoint(offsetCheckpointDir, time, new Duration(replayBatches * slideDuration.milliseconds), maxRetainedOffsetCheckpoints, fs)
override def start(): Unit = {
if (enableOffsetCheckpoint && offsetCheckpointDir != null) {
logInfo("Offset range checkpoint is enabled.")
val offsetRanges = topics.flatMap(readCheckpoint(offsetCheckpointDir, new Duration(replayBatches * slideDuration.milliseconds), _, dc, fs))
.flatten.groupBy(range => range.topicAndPartition())
.map { case (tp, ranges) => tp ->(ranges.minBy(_.fromOffset).fromOffset, ranges.maxBy(_.untilOffset).untilOffset) }
logInfo("Find " + offsetRanges.size + " offset ranges for topics:" + topics + ", dc:" + dc)
if (!offsetRanges.isEmpty) {
val leaders = if (kafkaParams("").isEmpty)
Map[TopicAndPartition, (String, Int)]()
// remove those topics which don't have leaders
val filteredOffsetRanges = offsetRanges.filterKeys(leaders.contains(_))
offsetsRangeForNextBatch = Some( { case (tp, (minFromOffset, maxUntilOffset)) => tp -> minFromOffset }, { case (tp, (minFromOffset, maxUntilOffset)) => tp -> new LeaderOffset(leaders(tp)._1, leaders(tp)._2, maxUntilOffset)
logInfo(s"Loaded offset range from checkpoint, topics: $topics, dc: $dc")
// remove all *staging directories
deleteUncommittedCheckpoint(offsetCheckpointDir, fs)
@ -426,3 +492,144 @@ class DynamicPartitionKafkaInputDStream[
private[streaming] object OffsetRangeCheckpointHandler extends Logging {
val LOCK = new Object()
val PREFIX = "offsets-"
val REGEX = (PREFIX + """([\d]+)$""").r
val STAGING_REGEX = (PREFIX + """([\d]+)\.staging$""").r
val SEP = "\t"
@Transient var lastCommittedBatchTim: Time = null
def checkpointDirForBatch(checkpointPath: Path, time: Time)(implicit staging: Boolean = false): Path = {
if (staging) new Path(checkpointPath, PREFIX + time.milliseconds + ".staging") else new Path(checkpointPath, PREFIX + time.milliseconds)
def checkpointPath(time: Time, topic: String, dc: String, checkpointDir: Path, staging: Boolean): Path = {
new Path(checkpointDirForBatch(checkpointDir, time)(staging), s"$dc-$topic")
def commitCheckpoint(checkpointDir: Path, batchTime: Time, replayDuration: Duration, maxRetainedCheckpoints: Int, fs: FileSystem): Unit = {
logInfo("Attempt to commit offsets checkpoint, batchTime:" + batchTime)
LOCK.synchronized {
Utils.tryWithSafeFinally {
// When offset ranges checkpoint enabled, the 1st batch is to replay the checkpointed offsets,
// in this case don't need to commit the offsets
if (lastCommittedBatchTim == null || batchTime == lastCommittedBatchTim) {
val offsetsCheckpointDir = checkpointDirForBatch(checkpointDir, batchTime)
if (fs.exists(offsetsCheckpointDir)) {
logInfo(s"Checkpoint directory for time: $batchTime already exists: $offsetsCheckpointDir")
} else {
val stagingDir = checkpointDirForBatch(checkpointDir, batchTime)(true)
if (!fs.exists(stagingDir)) {
logError(s"Checkpoint staging directory for time: $batchTime does not exists, path: $stagingDir.")
} else {
fs.rename(stagingDir, offsetsCheckpointDir)
logInfo(s"Successfully commit offset ranges for time: $batchTime, path: $offsetsCheckpointDir")
} {
lastCommittedBatchTim = batchTime
//remove expired offset checkpoint
fs.listStatus(checkpointDir, new PathFilter() {
def accept(path: Path): Boolean = {
}).sortBy(_.getPath.getName).reverse.drop(maxRetainedCheckpoints).foreach(s => {
fs.delete(s.getPath, true)
logWarning("Deleted offset checkpoint: " + s.getPath)
def writeCheckpoint(batchTime: Time, offsetRanges: List[OffsetRange], topic: String, dc: String, checkpointDir: Path, fs: FileSystem): Path = {
// Remove existing staging directory first
val offsetCheckpointPath = checkpointPath(batchTime, topic, dc, checkpointDir, true)
// Write checkpoint file
if (fs.exists(offsetCheckpointPath)) {
logError("Offset checkpoint file " + offsetCheckpointPath.toString + "already exists, this should not happen, delete it.")
fs.delete(offsetCheckpointPath, true)
val bo = new BufferedWriter(new OutputStreamWriter(fs.create(offsetCheckpointPath), "UTF-8"))
Utils.tryWithSafeFinally {
offsetRanges.sortBy(_.fromOffset).foreach { kv =>
offsetRanges.foreach { range =>
//schema: {topic}\t{patition}\t{fromOffset}\t{untilOffset}
logInfo(s"Checkpointed offset ranges for dc $dc time $batchTime offsetRange ${offsetRanges.size}")
} {
def readOffsets(path: Path, fs: FileSystem): Seq[OffsetRange] = {
logInfo("Trying to read offsets from " + path)
val br = new BufferedReader(new InputStreamReader(, "UTF-8"))
Utils.tryWithSafeFinally {
val offsetRanges = Iterator.continually(br.readLine()).takeWhile(_ != null).map(_.split(SEP)).map(r => OffsetRange(r(0), r(1).toInt, r(2).toLong, r(3).toLong)).toSeq
logInfo(s"Read offset ranges from $path partitions ${offsetRanges.length}")
} {
def readCheckpoint(checkpointPath: Path, replayDuration: Duration, topic: String, dc: String, fs: FileSystem): Option[Seq[OffsetRange]] = {
// Try to find offset checkpoint files
val checkpointFiles = fs.listStatus(checkpointPath, new PathFilter() {
def accept(path: Path): Boolean = {
if (checkpointFiles.isEmpty) {
logInfo("Could not find any offset checkpoint files.")
return None
val offsetRanges = new ListBuffer[OffsetRange]()
val latestReplayBatchTime = new Time(checkpointFiles.head.getName.substring(PREFIX.length).toLong)
logInfo("Batch time for the latest offset checkpoint file is: " + latestReplayBatchTime.milliseconds)
val earliestReplayBatchTime = latestReplayBatchTime - replayDuration
logInfo("Batch time for the earliest offset checkpoint file is: " + earliestReplayBatchTime.milliseconds)
checkpointFiles.filter(p => new Time(p.getName.substring(PREFIX.length).toLong) > earliestReplayBatchTime).foreach(path => {
logInfo("Attempting to load checkpoint from directory " + path)
val statues = fs.listStatus(path)
if (statues != null) {
val paths ="$dc-$topic"))
paths.foreach(p => offsetRanges.appendAll(readOffsets(p, fs)))
} else {
logWarning("Listing " + path + "returned null")
logInfo(s"Read offsets with topic: $topic, dc: $dc, ranges: " + offsetRanges)
def deleteUncommittedCheckpoint(checkpointPath: Path, fs: FileSystem): Unit = {
LOCK.synchronized {
fs.listStatus(checkpointPath, new PathFilter() {
def accept(path: Path): Boolean = {
}).foreach { p =>
fs.delete(p.getPath, true)
logInfo("Delete staging directory: " + p.getPath)

@ -4,13 +4,17 @@
package org.apache.spark.streaming.api.csharp
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListenerBatchCompleted}
import org.apache.spark.streaming.{StreamingContext, Duration, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.csharp.SparkCLRFunSuite
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.csharp.SparkCLRFunSuite
import org.apache.spark.rdd.{LocalRDDCheckpointData, RDD}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListenerBatchCompleted}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag
private class MockSparkContext(config: SparkConf) extends SparkContext(config) {
@ -32,7 +36,72 @@ private class MockCSharpStateDStream(
override def slideDuration: Duration = new Duration(1000)
class CSharpDStreamSuite extends SparkCLRFunSuite {
class CSharpDStreamSuite extends SparkCLRFunSuite with BeforeAndAfterAll with BeforeAndAfter {
private var conf: SparkConf = null
private var sc: SparkContext = null
before {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
if (sc != null && !sc.stopped.get) {
after {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
if (sc != null && !sc.stopped.get) {
test("CSharpStateDStream - local checkpoint enabled") {
val replicas = 2
conf = new SparkConf().setMaster("local[*]").setAppName("CSharpStateDStream")
conf.set("spark.mobius.streaming.localCheckpoint.enabled", "true")
conf.set("spark.mobius.streaming.localCheckpoint.replicas", replicas.toString)
// set reserved memory to 50M so the min system memory only needs 50M * 1.5 = 70.5M
conf.set("spark.testing.reservedMemory", "" + 50 * 1024 * 1024)
sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
// Set up CSharpDStream
val expectedRDD = sc.makeRDD(ArrayBuffer.fill(10)("a".getBytes))
CSharpDStream.debugMode = true
CSharpDStream.debugRDD = Some(expectedRDD)
// Construct CSharpStateDStream
val parentDStream = ssc.queueStream[Array[Byte]](Queue(sc.makeRDD(ArrayBuffer.fill(10)("a".getBytes))), true)
val stateDStream = new CSharpStateDStream(parentDStream, new Array[Byte](0), "byte", "byte")
val rddCount = new AtomicInteger(0) // counter to indicate how many batches have finished
stateDStream.foreachRDD { stateRDD =>
// asserts
assert(stateRDD.getStorageLevel.replication == replicas)
// check whether first batch finished
val startWaitingTimeInMills = System.currentTimeMillis()
val maxWaitingTimeInSecs = 20
while(rddCount.get() < 1) {
if((System.currentTimeMillis() - startWaitingTimeInMills) > maxWaitingTimeInSecs * 1000) {
// if waiting time exceeds `maxWaitingTimeInSecs` secs, will fail current test.
fail(s"Total running time exceeds $maxWaitingTimeInSecs, fail current test.")
assert(rddCount.get() >= 1)
test("runParallelJob in UpdateStateByKey") {