From d3999ca43d5073a743f94ea862c9ca3299cdd4b5 Mon Sep 17 00:00:00 2001 From: tawan0109 Date: Fri, 8 Jul 2016 17:51:13 +0800 Subject: [PATCH] Add option to turn on/off local checkpoint for CSharpStateDStream --- .../streaming/api/csharp/CSharpDStream.scala | 28 ++- .../DynamicPartitionKafkaInputDStream.scala | 231 +++++++++++++++++- .../api/csharp/CSharpDStreamSuite.scala | 83 ++++++- 3 files changed, 320 insertions(+), 22 deletions(-) diff --git a/scala/src/main/org/apache/spark/streaming/api/csharp/CSharpDStream.scala b/scala/src/main/org/apache/spark/streaming/api/csharp/CSharpDStream.scala index e522d7d..75f95ba 100644 --- a/scala/src/main/org/apache/spark/streaming/api/csharp/CSharpDStream.scala +++ b/scala/src/main/org/apache/spark/streaming/api/csharp/CSharpDStream.scala @@ -30,6 +30,10 @@ import scala.language.existentials object CSharpDStream { + // Variables for debugging + var debugMode = false + var debugRDD: Option[RDD[_]] = None + /** * helper function for DStream.foreachRDD(). */ @@ -43,6 +47,11 @@ object CSharpDStream { def callCSharpTransform(rdds: List[Option[RDD[_]]], time: Time, cSharpfunc: Array[Byte], serializationModeList: List[String]): Option[RDD[Array[Byte]]] = { + //debug mode enabled + if (debugMode) { + return debugRDD.asInstanceOf[Option[RDD[Array[Byte]]]] + } + var socket: Socket = null try { socket = CSharpBackend.callbackSockets.poll() @@ -229,13 +238,22 @@ class CSharpStateDStream( serializationMode2: String) extends DStream[Array[Byte]](parent.ssc) { - super.persist(StorageLevel.MEMORY_ONLY) + val localCheckpointEnabled = context.sc.conf.getBoolean("spark.mobius.streaming.localCheckpoint.enabled", false) + logInfo("Local checkpoint is enabled: " + localCheckpointEnabled) + + if (localCheckpointEnabled) { + val replicas = context.sc.conf.getInt("spark.mobius.streaming.localCheckpoint.replicas", 3) + logInfo("spark.mobius.localCheckpoint.replicas is set to " + replicas) + super.persist(StorageLevel(true, true, false, false, replicas)) + } else { + super.persist(StorageLevel.MEMORY_ONLY) + } override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration - override val mustCheckpoint = true + override val mustCheckpoint = !localCheckpointEnabled private val numParallelJobs = parent.ssc.sc.getConf.getInt("spark.mobius.streaming.parallelJobs", 0) @transient private[streaming] var jobExecutor : ThreadPoolExecutor = null @@ -275,8 +293,12 @@ class CSharpStateDStream( val rdd = parent.getOrCompute(validTime) if (rdd.isDefined) { runParallelJob(validTime, rdd) - CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc, + val state = CSharpDStream.callCSharpTransform(List(lastState, rdd), validTime, reduceFunc, List(serializationMode, serializationMode2)) + if(localCheckpointEnabled && state.isDefined) { + state.get.localCheckpoint() + } + state } else { lastState } diff --git a/scala/src/main/org/apache/spark/streaming/api/kafka/DynamicPartitionKafkaInputDStream.scala b/scala/src/main/org/apache/spark/streaming/api/kafka/DynamicPartitionKafkaInputDStream.scala index fb15256..792efaa 100644 --- a/scala/src/main/org/apache/spark/streaming/api/kafka/DynamicPartitionKafkaInputDStream.scala +++ b/scala/src/main/org/apache/spark/streaming/api/kafka/DynamicPartitionKafkaInputDStream.scala @@ -5,26 +5,29 @@ package org.apache.spark.streaming.kafka -import scala.annotation.tailrec -import scala.collection.mutable -import scala.reflect.ClassTag - -import java.util.concurrent.{ConcurrentHashMap, TimeUnit, ScheduledExecutorService} +import java.beans.Transient +import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter} +import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit} import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.Decoder - -import org.apache.spark.rdd.{EmptyRDD, UnionRDD, RDD} +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.{EmptyRDD, RDD, UnionRDD} import org.apache.spark.storage.StorageLevel -import org.apache.spark.Logging -import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.api.csharp.{CSharpDStream, RddPreComputeProcessor, RddPreComputeRecord} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler.StreamInputInfo -import org.apache.spark.util.ThreadUtils +import org.apache.spark.streaming.{Duration, StreamingContext, Time} +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.{Logging, SparkException} -import org.apache.spark.streaming.api.csharp.{RddPreComputeRecord, RddPreComputeProcessor, CSharpDStream} +import scala.annotation.tailrec +import scala.collection.mutable +import scala.collection.mutable.ListBuffer +import scala.reflect.ClassTag /** * A stream of the new DynamicPartitionKafkaRDD to support where @@ -36,7 +39,8 @@ import org.apache.spark.streaming.api.csharp.{RddPreComputeRecord, RddPreCompute * so that you can control exactly-once semantics. * For an easy interface to Kafka-managed offsets, * see {@link org.apache.spark.streaming.kafka.KafkaCluster} - * @param kafkaParams Kafka + * + * @param kafkaParams Kafka * configuration parameters. * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), * NOT zookeeper servers, specified in host1:port1,host2:port2 form. @@ -103,6 +107,26 @@ class DynamicPartitionKafkaInputDStream[ @transient private[streaming] var preComputeOutputMap: ConcurrentHashMap[Time, List[RddPreComputeRecord[R]]] = null @transient private var rddPreComputeProcessor: RddPreComputeProcessor[R] = null + @transient private val dc = kafkaParams.getOrElse("cluster.id", "unknown_cluster").replace(" ", "_") + @transient private val enableOffsetCheckpoint = context.sparkContext.getConf.getBoolean("spark.mobius.streaming.kafka.enableOffsetCheckpoint", false) + @transient private val offsetCheckpointDir: Path = if(enableOffsetCheckpoint && context.sparkContext.checkpointDir.isDefined) new Path(new Path(context.sparkContext.checkpointDir.get).getParent, "kafka-checkpoint") else null + @transient private lazy val fs: FileSystem = if(enableOffsetCheckpoint && offsetCheckpointDir != null) offsetCheckpointDir.getFileSystem(SparkHadoopUtil.get.conf) else null + @transient private val replayBatches = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.replayBatches", 0) + @transient private val maxRetainedOffsetCheckpoints = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.maxRetainedOffsetCheckpoints", 0) match { + case n if n > replayBatches => n + case _: Int => replayBatches + } + + if (enableOffsetCheckpoint && fs != null) { + if (!fs.exists(offsetCheckpointDir)) { + fs.mkdirs(offsetCheckpointDir) + } else if (!fs.isDirectory(offsetCheckpointDir)) { + throw new SparkException("Offset checkpoint dir: " + offsetCheckpointDir + " is not a directory.") + } + } + + import OffsetRangeCheckpointHandler._ + private def refreshPartitions(): Unit = { if (!hasFetchedAllPartitions) { kc.getPartitions2(topics) match { @@ -299,10 +323,22 @@ class DynamicPartitionKafkaInputDStream[ offsetsRangeForNextBatch = None tmpOffsetsRange } + offsetsRange.flatMap({ case (from, until) => val rdd = DynamicPartitionKafkaRDD[K, V, U, T, R]( context.sparkContext, kafkaParams, from, until, messageHandler, numPartitions) + + if(enableOffsetCheckpoint) { + val offsetRanges = from.map { case (tp, fo) => + val uo = until(tp) + OffsetRange(tp.topic, tp.partition, fo, uo.offset) + } + // Save offset ranges to staging directory + logInfo(s"Save offset range to HDFS, time: $validTime, topic: $topics, path: $offsetCheckpointDir") + offsetRanges.groupBy(_.topic).foreach{ case (topic, ranges) => writeCheckpoint(validTime, ranges.toList, topic, dc, offsetCheckpointDir, fs) } + } + Some(callCSharpTransform(rdd, validTime)) }) } @@ -369,10 +405,40 @@ class DynamicPartitionKafkaInputDStream[ } } + if (enableOffsetCheckpoint && offsetCheckpointDir != null) { + commitCheckpoint(offsetCheckpointDir, time, new Duration(replayBatches * slideDuration.milliseconds), maxRetainedOffsetCheckpoints, fs) + } + super.clearMetadata(time) } override def start(): Unit = { + if (enableOffsetCheckpoint && offsetCheckpointDir != null) { + logInfo("Offset range checkpoint is enabled.") + val offsetRanges = topics.flatMap(readCheckpoint(offsetCheckpointDir, new Duration(replayBatches * slideDuration.milliseconds), _, dc, fs)) + .flatten.groupBy(range => range.topicAndPartition()) + .map { case (tp, ranges) => tp ->(ranges.minBy(_.fromOffset).fromOffset, ranges.maxBy(_.untilOffset).untilOffset) } + logInfo("Find " + offsetRanges.size + " offset ranges for topics:" + topics + ", dc:" + dc) + + if (!offsetRanges.isEmpty) { + val leaders = if (kafkaParams("metadata.broker.list").isEmpty) + Map[TopicAndPartition, (String, Int)]() + else + KafkaCluster.checkErrors(kc.findLeaders(offsetRanges.keySet)) + + // remove those topics which don't have leaders + val filteredOffsetRanges = offsetRanges.filterKeys(leaders.contains(_)) + offsetsRangeForNextBatch = Some( + filteredOffsetRanges.map { case (tp, (minFromOffset, maxUntilOffset)) => tp -> minFromOffset }, + filteredOffsetRanges.map { case (tp, (minFromOffset, maxUntilOffset)) => tp -> new LeaderOffset(leaders(tp)._1, leaders(tp)._2, maxUntilOffset) + } + ) + logInfo(s"Loaded offset range from checkpoint, topics: $topics, dc: $dc") + } + // remove all *staging directories + deleteUncommittedCheckpoint(offsetCheckpointDir, fs) + } + initializeReceiver() instantiateAndStartRefreshOffsetsScheduler() } @@ -426,3 +492,144 @@ class DynamicPartitionKafkaInputDStream[ } } } + +private[streaming] object OffsetRangeCheckpointHandler extends Logging { + val LOCK = new Object() + val PREFIX = "offsets-" + val REGEX = (PREFIX + """([\d]+)$""").r + val STAGING_REGEX = (PREFIX + """([\d]+)\.staging$""").r + val SEP = "\t" + + @Transient var lastCommittedBatchTim: Time = null + + def checkpointDirForBatch(checkpointPath: Path, time: Time)(implicit staging: Boolean = false): Path = { + if (staging) new Path(checkpointPath, PREFIX + time.milliseconds + ".staging") else new Path(checkpointPath, PREFIX + time.milliseconds) + } + + def checkpointPath(time: Time, topic: String, dc: String, checkpointDir: Path, staging: Boolean): Path = { + new Path(checkpointDirForBatch(checkpointDir, time)(staging), s"$dc-$topic") + } + + def commitCheckpoint(checkpointDir: Path, batchTime: Time, replayDuration: Duration, maxRetainedCheckpoints: Int, fs: FileSystem): Unit = { + logInfo("Attempt to commit offsets checkpoint, batchTime:" + batchTime) + LOCK.synchronized { + Utils.tryWithSafeFinally { + // When offset ranges checkpoint enabled, the 1st batch is to replay the checkpointed offsets, + // in this case don't need to commit the offsets + if (lastCommittedBatchTim == null || batchTime == lastCommittedBatchTim) { + return + } + + val offsetsCheckpointDir = checkpointDirForBatch(checkpointDir, batchTime) + if (fs.exists(offsetsCheckpointDir)) { + logInfo(s"Checkpoint directory for time: $batchTime already exists: $offsetsCheckpointDir") + } else { + val stagingDir = checkpointDirForBatch(checkpointDir, batchTime)(true) + if (!fs.exists(stagingDir)) { + logError(s"Checkpoint staging directory for time: $batchTime does not exists, path: $stagingDir.") + } else { + fs.rename(stagingDir, offsetsCheckpointDir) + logInfo(s"Successfully commit offset ranges for time: $batchTime, path: $offsetsCheckpointDir") + } + } + } { + lastCommittedBatchTim = batchTime + } + + //remove expired offset checkpoint + fs.listStatus(checkpointDir, new PathFilter() { + def accept(path: Path): Boolean = { + REGEX.findFirstIn(path.getName).nonEmpty + } + }).sortBy(_.getPath.getName).reverse.drop(maxRetainedCheckpoints).foreach(s => { + fs.delete(s.getPath, true) + logWarning("Deleted offset checkpoint: " + s.getPath) + } + ) + } + } + + def writeCheckpoint(batchTime: Time, offsetRanges: List[OffsetRange], topic: String, dc: String, checkpointDir: Path, fs: FileSystem): Path = { + // Remove existing staging directory first + val offsetCheckpointPath = checkpointPath(batchTime, topic, dc, checkpointDir, true) + // Write checkpoint file + if (fs.exists(offsetCheckpointPath)) { + logError("Offset checkpoint file " + offsetCheckpointPath.toString + "already exists, this should not happen, delete it.") + fs.delete(offsetCheckpointPath, true) + } + + val bo = new BufferedWriter(new OutputStreamWriter(fs.create(offsetCheckpointPath), "UTF-8")) + Utils.tryWithSafeFinally { + offsetRanges.sortBy(_.fromOffset).foreach { kv => + offsetRanges.foreach { range => + //schema: {topic}\t{patition}\t{fromOffset}\t{untilOffset} + bo.write(s"$topic$SEP${range.partition}$SEP${range.fromOffset}$SEP${range.untilOffset}") + bo.newLine + } + } + logInfo(s"Checkpointed offset ranges for dc $dc time $batchTime offsetRange ${offsetRanges.size}") + } { + bo.close + } + offsetCheckpointPath + } + + def readOffsets(path: Path, fs: FileSystem): Seq[OffsetRange] = { + logInfo("Trying to read offsets from " + path) + val br = new BufferedReader(new InputStreamReader(fs.open(path), "UTF-8")) + Utils.tryWithSafeFinally { + val offsetRanges = Iterator.continually(br.readLine()).takeWhile(_ != null).map(_.split(SEP)).map(r => OffsetRange(r(0), r(1).toInt, r(2).toLong, r(3).toLong)).toSeq + logInfo(s"Read offset ranges from $path partitions ${offsetRanges.length}") + offsetRanges + } { + br.close + } + } + + def readCheckpoint(checkpointPath: Path, replayDuration: Duration, topic: String, dc: String, fs: FileSystem): Option[Seq[OffsetRange]] = { + // Try to find offset checkpoint files + val checkpointFiles = fs.listStatus(checkpointPath, new PathFilter() { + def accept(path: Path): Boolean = { + REGEX.findFirstIn(path.getName).nonEmpty + } + }).map(_.getPath).sortBy(_.getName).reverse + + if (checkpointFiles.isEmpty) { + logInfo("Could not find any offset checkpoint files.") + return None + } + + val offsetRanges = new ListBuffer[OffsetRange]() + val latestReplayBatchTime = new Time(checkpointFiles.head.getName.substring(PREFIX.length).toLong) + logInfo("Batch time for the latest offset checkpoint file is: " + latestReplayBatchTime.milliseconds) + val earliestReplayBatchTime = latestReplayBatchTime - replayDuration + logInfo("Batch time for the earliest offset checkpoint file is: " + earliestReplayBatchTime.milliseconds) + + checkpointFiles.filter(p => new Time(p.getName.substring(PREFIX.length).toLong) > earliestReplayBatchTime).foreach(path => { + logInfo("Attempting to load checkpoint from directory " + path) + val statues = fs.listStatus(path) + if (statues != null) { + val paths = statues.map(_.getPath).filter(_.getName.endsWith(s"$dc-$topic")) + paths.foreach(p => offsetRanges.appendAll(readOffsets(p, fs))) + } else { + logWarning("Listing " + path + "returned null") + } + }) + logInfo(s"Read offsets with topic: $topic, dc: $dc, ranges: " + offsetRanges) + Some(offsetRanges) + } + + def deleteUncommittedCheckpoint(checkpointPath: Path, fs: FileSystem): Unit = { + LOCK.synchronized { + fs.listStatus(checkpointPath, new PathFilter() { + def accept(path: Path): Boolean = { + STAGING_REGEX.findFirstIn(path.getName).nonEmpty + } + }).foreach { p => + fs.delete(p.getPath, true) + logInfo("Delete staging directory: " + p.getPath) + } + } + } +} + diff --git a/scala/src/test/scala/org/apache/spark/streaming/api/csharp/CSharpDStreamSuite.scala b/scala/src/test/scala/org/apache/spark/streaming/api/csharp/CSharpDStreamSuite.scala index f9079f6..d8505d1 100644 --- a/scala/src/test/scala/org/apache/spark/streaming/api/csharp/CSharpDStreamSuite.scala +++ b/scala/src/test/scala/org/apache/spark/streaming/api/csharp/CSharpDStreamSuite.scala @@ -4,13 +4,17 @@ */ package org.apache.spark.streaming.api.csharp -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListenerBatchCompleted} -import org.apache.spark.streaming.{StreamingContext, Duration, Time} -import org.apache.spark.streaming.dstream.DStream -import org.apache.spark.{SparkContext, SparkConf} -import org.apache.spark.csharp.SparkCLRFunSuite +import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.csharp.SparkCLRFunSuite +import org.apache.spark.rdd.{LocalRDDCheckpointData, RDD} +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.scheduler.{BatchInfo, StreamingListenerBatchCompleted} +import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, Time} +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} + +import scala.collection.mutable.{ArrayBuffer, Queue} import scala.reflect.ClassTag private class MockSparkContext(config: SparkConf) extends SparkContext(config) { @@ -32,7 +36,72 @@ private class MockCSharpStateDStream( override def slideDuration: Duration = new Duration(1000) } -class CSharpDStreamSuite extends SparkCLRFunSuite { +class CSharpDStreamSuite extends SparkCLRFunSuite with BeforeAndAfterAll with BeforeAndAfter { + private var conf: SparkConf = null + private var sc: SparkContext = null + + before { + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + if (sc != null && !sc.stopped.get) { + sc.stop() + } + } + + after { + StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } + if (sc != null && !sc.stopped.get) { + sc.stop() + } + } + + test("CSharpStateDStream - local checkpoint enabled") { + val replicas = 2 + conf = new SparkConf().setMaster("local[*]").setAppName("CSharpStateDStream") + conf.set("spark.mobius.streaming.localCheckpoint.enabled", "true") + conf.set("spark.mobius.streaming.localCheckpoint.replicas", replicas.toString) + + // set reserved memory to 50M so the min system memory only needs 50M * 1.5 = 70.5M + conf.set("spark.testing.reservedMemory", "" + 50 * 1024 * 1024) + + sc = new SparkContext(conf) + val ssc = new StreamingContext(sc, Seconds(1)) + + // Set up CSharpDStream + val expectedRDD = sc.makeRDD(ArrayBuffer.fill(10)("a".getBytes)) + CSharpDStream.debugMode = true + CSharpDStream.debugRDD = Some(expectedRDD) + + // Construct CSharpStateDStream + val parentDStream = ssc.queueStream[Array[Byte]](Queue(sc.makeRDD(ArrayBuffer.fill(10)("a".getBytes))), true) + val stateDStream = new CSharpStateDStream(parentDStream, new Array[Byte](0), "byte", "byte") + stateDStream.register() + + val rddCount = new AtomicInteger(0) // counter to indicate how many batches have finished + + stateDStream.foreachRDD { stateRDD => + rddCount.incrementAndGet() + println(stateRDD.count()) + // asserts + assert(stateRDD.checkpointData.get.isInstanceOf[LocalRDDCheckpointData[Array[Byte]]]) + assert(stateRDD.getStorageLevel.replication == replicas) + } + ssc.start() + + // check whether first batch finished + val startWaitingTimeInMills = System.currentTimeMillis() + val maxWaitingTimeInSecs = 20 + while(rddCount.get() < 1) { + Thread.sleep(100) + if((System.currentTimeMillis() - startWaitingTimeInMills) > maxWaitingTimeInSecs * 1000) { + // if waiting time exceeds `maxWaitingTimeInSecs` secs, will fail current test. + fail(s"Total running time exceeds $maxWaitingTimeInSecs, fail current test.") + } + } + + ssc.stop(false) + sc.stop + assert(rddCount.get() >= 1) + } test("runParallelJob in UpdateStateByKey") {