Avoid RDD checkpoint for DynamicPartitionKafkaInputDStream by replaying last N batches

tawan0109 2016-07-08 17:52:47 +08:00
Parent d3999ca43d
Commit 3b67b2a6a7
3 changed files with 117 additions and 30 deletions
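The idea, in brief: instead of relying on Spark's RDD checkpoint for recovery, the DStream persists only its Kafka offset ranges and, on restart, replays the last N batches from that small checkpoint. Below is a minimal sketch of that scheme with simplified stand-in types; it models the diff's needToReplayOffsetRanges flag and checkpoint-loading path and is not the Mobius implementation.

// Minimal model (not Mobius code) of checkpoint-free recovery by offset replay.
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

final class OffsetReplayModel {
  // Stands in for needToReplayOffsetRanges: set when checkpointed ranges are
  // loaded at startup, cleared once the first (replayed) batch completes.
  var needToReplay: Boolean = false
  var offsetsForNextBatch: Option[Seq[OffsetRange]] = None

  // On restart: seed the next batch from the checkpoint and mark it a replay.
  def restoreFromCheckpoint(ranges: Seq[OffsetRange]): Unit = {
    offsetsForNextBatch = Some(ranges)
    needToReplay = true
  }

  // After each batch: a replayed batch must not be committed again, otherwise
  // the checkpoint would advance without any fresh data having been consumed.
  def afterBatch(ranges: Seq[OffsetRange])(commit: Seq[OffsetRange] => Unit): Unit =
    if (needToReplay) needToReplay = false
    else commit(ranges)
}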

View file

@@ -195,6 +195,27 @@ function Untar-File($tarFile, $targetDir)
Write-Output "[downloadtools.Untar-File] Extraction completed. Time taken: $howlong"
}
function Download-Winutils
{
$winutilsBin = "$toolsDir\winutils\bin"
if (!(test-path "$winutilsBin"))
{
New-Item -ItemType Directory -Force -Path $winutilsBin | Out-Null
}
$winutilsExe = "$winutilsBin\winutils.exe"
if (!(test-path $winutilsExe))
{
$url = "https://github.com/MobiusForSpark/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true"
$output=$winutilsExe
Download-File $url $output
}
else
{
Write-Output "[downloadtools.Download-RuntimeDependencies] $winutilsExe exists already. No download and extraction needed"
}
}
function Download-BuildTools
{
# Create a cmd file to update environment variable
@@ -286,6 +307,10 @@ function Download-BuildTools
}
}
# Download winutils.exe
Download-Winutils
$envStream.WriteLine("set HADOOP_HOME=$toolsDir\winutils");
$envStream.close()
}
@@ -372,26 +397,8 @@ function Download-RuntimeDependencies
}
# Download winutils.exe
$H_HOME = "$toolsDir\winutils"
$winutilsBin = "$H_HOME\bin"
if (!(test-path "$winutilsBin"))
{
New-Item -ItemType Directory -Force -Path $winutilsBin | Out-Null
}
$winutilsExe = "$winutilsBin\winutils.exe"
if (!(test-path $winutilsExe))
{
$url = "https://github.com/MobiusForSpark/winutils/blob/master/hadoop-2.6.0/bin/winutils.exe?raw=true"
$output=$winutilsExe
Download-File $url $output
}
else
{
Write-Output "[downloadtools.Download-RuntimeDependencies] $winutilsExe exists already. No download and extraction needed"
}
$envStream.WriteLine("set HADOOP_HOME=$H_HOME");
Download-Winutils
$envStream.WriteLine("set HADOOP_HOME=$toolsDir\winutils");
$envStream.close()

View file

@@ -116,6 +116,7 @@ class DynamicPartitionKafkaInputDStream[
case n if n > replayBatches => n
case _: Int => replayBatches
}
@transient private[streaming] var needToReplayOffsetRanges = false
if (enableOffsetCheckpoint && fs != null) {
if (!fs.exists(offsetCheckpointDir)) {
@@ -406,7 +407,13 @@ class DynamicPartitionKafkaInputDStream[
}
if (enableOffsetCheckpoint && offsetCheckpointDir != null) {
commitCheckpoint(offsetCheckpointDir, time, new Duration(replayBatches * slideDuration.milliseconds), maxRetainedOffsetCheckpoints, fs)
// When the offset ranges checkpoint is enabled, the first batch replays the checkpointed offsets;
// in that case the offsets don't need to be committed again
if (needToReplayOffsetRanges) {
needToReplayOffsetRanges = false
} else {
commitCheckpoint(offsetCheckpointDir, time, new Duration(replayBatches * slideDuration.milliseconds), maxRetainedOffsetCheckpoints, fs)
}
}
super.clearMetadata(time)
@@ -423,6 +430,11 @@ class DynamicPartitionKafkaInputDStream[
if (!offsetRanges.isEmpty) {
val leaders = if (kafkaParams("metadata.broker.list").isEmpty)
Map[TopicAndPartition, (String, Int)]()
else if (kafkaParams.contains("metadata.leader.list")) // this piece of code is for test purposes only
kafkaParams("metadata.leader.list").split("\\|").map { l =>
val s = l.split(",")
TopicAndPartition(s(0), s(1).toInt) -> (s(2), s(3).toInt)
}.toMap
else
KafkaCluster.checkErrors(kc.findLeaders(offsetRanges.keySet))
@@ -434,6 +446,7 @@ class DynamicPartitionKafkaInputDStream[
}
)
logInfo(s"Loaded offset range from checkpoint, topics: $topics, dc: $dc")
needToReplayOffsetRanges = true
}
// remove all *staging directories
deleteUncommittedCheckpoint(offsetCheckpointDir, fs)
@@ -514,9 +527,7 @@ private[streaming] object OffsetRangeCheckpointHandler extends Logging {
logInfo("Attempt to commit offsets checkpoint, batchTime:" + batchTime)
LOCK.synchronized {
Utils.tryWithSafeFinally {
// When the offset ranges checkpoint is enabled, the first batch replays the checkpointed offsets;
// in that case the offsets don't need to be committed again
if (lastCommittedBatchTim == null || batchTime == lastCommittedBatchTim) {
if (batchTime == lastCommittedBatchTim) {
return
}
@@ -560,17 +571,16 @@ private[streaming] object OffsetRangeCheckpointHandler extends Logging {
val bo = new BufferedWriter(new OutputStreamWriter(fs.create(offsetCheckpointPath), "UTF-8"))
Utils.tryWithSafeFinally {
offsetRanges.sortBy(_.fromOffset).foreach { kv =>
offsetRanges.foreach { range =>
//schema: {topic}\t{partition}\t{fromOffset}\t{untilOffset}
bo.write(s"$topic$SEP${range.partition}$SEP${range.fromOffset}$SEP${range.untilOffset}")
bo.newLine
}
offsetRanges.sortBy(_.fromOffset).foreach { range =>
//schema: {topic}\t{partition}\t{fromOffset}\t{untilOffset}
bo.write(s"$topic$SEP${range.partition}$SEP${range.fromOffset}$SEP${range.untilOffset}")
bo.newLine
}
logInfo(s"Checkpointed offset ranges for dc $dc time $batchTime offsetRange ${offsetRanges.size}")
} {
bo.close
}
offsetCheckpointPath
}
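The checkpoint file written above is plain tab-separated text, one offset range per line. A round-trip sketch of that schema, with an illustrative Range case class standing in for the Mobius types:

// Sketch of the checkpoint line schema from the diff:
// {topic}\t{partition}\t{fromOffset}\t{untilOffset}
object OffsetCheckpointFormatSketch {
  final case class Range(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
  private val SEP = "\t"

  def format(r: Range): String =
    s"${r.topic}$SEP${r.partition}$SEP${r.fromOffset}$SEP${r.untilOffset}"

  def parse(line: String): Range = {
    val f = line.split(SEP)
    Range(f(0), f(1).toInt, f(2).toLong, f(3).toLong)
  }

  def main(args: Array[String]): Unit = {
    val r = Range("testTopic", 0, 100L, 1000L)
    assert(parse(format(r)) == r) // a line survives a write/read round trip
  }
}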

View file

@@ -4,10 +4,16 @@
*/
package org.apache.spark.streaming.kafka
import java.io.File
import java.nio.file.Files
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{LocalFileSystem, Path}
import org.apache.spark.csharp.SparkCLRFunSuite
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.streaming.api.csharp.CSharpDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.InputInfoTracker
@@ -85,4 +91,68 @@ class DynamicPartitionKafkaInputDStreamSuite extends SparkCLRFunSuite {
sc.stop()
}
}
test("Kafka offset checkpoint and replay") {
val port = 5000
val host0 = "host0"
val host1 = "host1"
val topic = "testTopic"
val dc = "testDC"
val tp0 = TopicAndPartition(topic, 0)
val tp1 = TopicAndPartition(topic, 1)
val offsetRange = Some((Map(tp0 -> 100L, tp1 -> 200L), Map(tp0 -> LeaderOffset(host0, port, 1000), tp1 -> LeaderOffset(host1, port, 300))))
val leaders = Map(tp0 -> (host0, port), tp1 -> (host1, port))
val kafkaParams = Map("metadata.broker.list" -> "broker1",
"metadata.leader.list" -> leaders.map { case (tp, hostAndPort) =>
s"${tp.topic},${tp.partition},${hostAndPort._1},${hostAndPort._2}"
}.mkString("|"),
"auto.offset.reset" -> "largest",
"cluster.id" -> dc
)
val conf = new SparkConf()
.setAppName("test")
.setMaster("local")
.set("spark.testing", "true")
.set("spark.mobius.streaming.kafka.enableOffsetCheckpoint", "true")
.set("spark.mobius.streaming.kafka.replayBatches", "1")
.set("spark.mobius.streaming.kafka.maxRetainedOffsetCheckpoints", "1")
val sc = new SparkContext(conf)
val checkpointDirPath = new Path(FileUtils.getTempDirectoryPath, "test-checkpoint")
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (fs.exists(checkpointDirPath)) fs.delete(checkpointDirPath, true)
sc.setCheckpointDir(checkpointDirPath.toString)
val ssc = new StreamingContext(sc, new Duration(1000))
ssc.scheduler.inputInfoTracker = new InputInfoTracker(ssc)
var ds: DynamicPartitionKafkaInputDStream[_, _, _, _, _] = null
try {
val messageHandler =
(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
ds = new DynamicPartitionKafkaInputDStream
[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](
ssc, kafkaParams, Set(topic), Map(), sc.clean(messageHandler), 1)
ds.zeroTime = new Time(0)
ds.offsetsRangeForNextBatch = offsetRange
ds.compute(new Time(1000)) // fresh start
ds.clearMetadata(new Time(1000)) // this will commit the offset checkpoint for this batch
assert(!ds.needToReplayOffsetRanges)
ds.start()
assert(ds.offsetsRangeForNextBatch.isDefined)
assert(ds.needToReplayOffsetRanges)
assertResult(offsetRange)(ds.offsetsRangeForNextBatch)
} finally {
if(ds != null) ds.stop
sc.stop()
fs.delete(checkpointDirPath, true)
fs.close()
}
}
}
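For reference, the test above avoids a live broker by passing leaders through the test-only metadata.leader.list parameter: pipe-separated topic,partition,host,port entries. A self-contained sketch of that encoding, with a local TopicAndPartition case class standing in for kafka.common.TopicAndPartition:

// Round-trip sketch of the test-only "metadata.leader.list" encoding:
// pipe-separated entries of the form "topic,partition,host,port".
object LeaderListSketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  def encode(leaders: Map[TopicAndPartition, (String, Int)]): String =
    leaders.map { case (tp, (host, port)) => s"${tp.topic},${tp.partition},$host,$port" }
      .mkString("|")

  def decode(s: String): Map[TopicAndPartition, (String, Int)] =
    s.split("\\|").map { entry =>
      val f = entry.split(",")
      (TopicAndPartition(f(0), f(1).toInt), (f(2), f(3).toInt))
    }.toMap

  def main(args: Array[String]): Unit = {
    val leaders = Map(
      (TopicAndPartition("testTopic", 0), ("host0", 5000)),
      (TopicAndPartition("testTopic", 1), ("host1", 5000)))
    assert(decode(encode(leaders)) == leaders) // encoding round-trips cleanly
  }
}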