fast initialization of progress tracker (#162)
* [spark-2.1.x] making EventHub API call every batch (#156)
* making EventHub API call every batch
* remove unused import
* style fix
* Bug fix
* same bug fix but for DStreams
* [2.1.x] fast initialization of progress tracker (#158)
* metadata path setup and fall back to list-all mechanism
* delete progress file directory after each test
* fix the failed test
* add test for metadata exist
* add test for read progress file
* revise mockito test
* update pom
* pass in all params with mockito
* change the return value of getLatestFile
* add test for checking if really queried metadata
* test: When metadata presents, we should respect it even the more recent progress file is there
* update import
* add writing logic of metadata
* update tests
* add log
* add logging
* revise logging
* add logging in commit() of structured streaming
* fix the test
* revise test for metadata not committed
* revise tests
* fix the failed test
* add partially committed progress file test
* add test of upgrade
* update upgrade test
* add test for upgrading structured streaming
* add test for no metadata
* update test name
* enabling cleanup thread
* cancel cleanup task
* add tracing
* test for fishy change
* test for fishy change
* fix the path
* code clean
* address the comments
* missed one
This commit is contained in:
Parent
c06e0fbc69
Commit
ca3223964e
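The change pairs every committed progress file with an empty, timestamp-named marker file in a sibling "_metadata" directory, so the tracker can locate the latest progress file by reading the small metadata directory instead of listing every progress file. Below is a minimal, self-contained Scala sketch of that lookup idea, using only the Hadoop FileSystem API and the progress-<timestamp> naming that appears in the diff; the object and method names here are illustrative, not the connector's actual code.

import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch: find the newest progress file via the timestamp-named
// metadata markers (fast path) and fall back to listing the whole progress
// directory when no marker exists yet (e.g. right after an upgrade).
object LatestProgressFileSketch {
  def latestProgressFile(fs: FileSystem, progressDir: Path, metadataDir: Path): Option[Path] = {
    val fromMetadata =
      if (fs.exists(metadataDir)) {
        fs.listStatus(metadataDir).filter(_.isFile)
          .map(_.getPath.getName.toLong)     // marker names are commit timestamps
          .reduceOption(_ max _)
          .map(ts => new Path(progressDir, s"progress-$ts"))
      } else None
    fromMetadata.orElse {
      // slow path: list every progress file and pick the largest timestamp
      if (!fs.exists(progressDir)) None
      else fs.listStatus(progressDir).map(_.getPath)
        .filter(_.getName.startsWith("progress-"))
        .sortBy(_.getName.stripPrefix("progress-").toLong)
        .lastOption
    }
  }
}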
@@ -73,10 +73,10 @@ private[spark] object RateControlUtils extends Logging {
private[spark] def fetchLatestOffset(
eventHubClient: EventHubClient,
retryIfFail: Boolean,
fetchedHighestOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)],
currentOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)]):
fetchedHighestOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)]):
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
val r = eventHubClient.endPointOfPartition(retryIfFail, currentOffsetsAndSeqNums.keySet.toList)
val r = eventHubClient.endPointOfPartition(
retryIfFail, fetchedHighestOffsetsAndSeqNums.keySet.toList)
if (r.isDefined) {
// merge results
val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {

@@ -25,12 +25,16 @@ private[spark] object PathTools extends Serializable {
subDirs.mkString("/")
}

def progressDirPathStr(checkpointDir: String, subDirNames: String*): String = {
s"$checkpointDir/${fromSubDirNamesToString(subDirNames)}"
def progressDirPathStr(progressDir: String, subDirNames: String*): String = {
s"$progressDir/${fromSubDirNamesToString(subDirNames)}"
}

def progressTempDirPathStr(checkpointDir: String, subDirNames: String*): String = {
s"$checkpointDir/${fromSubDirNamesToString(subDirNames)}_temp"
def progressTempDirPathStr(progressDir: String, subDirNames: String*): String = {
s"$progressDir/${fromSubDirNamesToString(subDirNames)}_temp"
}

def progressMetadataDirPathStr(progressDir: String, subDirNames: String*): String = {
s"$progressDir/${fromSubDirNamesToString(subDirNames)}_metadata"
}

def progressTempFileNamePattern(
@@ -45,6 +49,8 @@ private[spark] object PathTools extends Serializable {
s"progress-$timestamp"
}

def progressMetadataNamePattern(timestamp: Long): String = timestamp.toString

def progressTempFileStr(
basePath: String,
streamId: Int,

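With these helpers, each application now gets three sibling locations under the progress root: the progress directory itself, a "_temp" directory, and a "_metadata" directory whose files are named by commit timestamp. A hypothetical usage sketch, assuming package-internal access to PathTools; the root path and application name are made up for illustration:

val progressDir = PathTools.progressDirPathStr("/streaming/progress", "myApp")          // "/streaming/progress/myApp"
val tempDir     = PathTools.progressTempDirPathStr("/streaming/progress", "myApp")      // "/streaming/progress/myApp_temp"
val metadataDir = PathTools.progressMetadataDirPathStr("/streaming/progress", "myApp")  // "/streaming/progress/myApp_metadata"
val markerName  = PathTools.progressMetadataNamePattern(3000L)                          // "3000"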
@@ -18,6 +18,7 @@
package org.apache.spark.eventhubscommon.progress

import java.io.{BufferedReader, InputStreamReader, IOException}
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -32,18 +33,31 @@ import org.apache.spark.internal.Logging
private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
progressDir: String, appName: String, hadoopConfiguration: Configuration) extends Logging {

protected lazy val progressDirStr: String = PathTools.progressDirPathStr(progressDir, appName)
protected lazy val progressTempDirStr: String = PathTools.progressTempDirPathStr(progressDir,
appName)
protected lazy val progressMetadataDirStr: String = PathTools.progressMetadataDirPathStr(
progressDir, appName)

protected lazy val progressDirPath = new Path(progressDirStr)
protected lazy val progressTempDirPath = new Path(progressTempDirStr)
protected lazy val progressMetadataDirPath = new Path(progressMetadataDirStr)

def eventHubNameAndPartitions: Map[String, List[EventHubNameAndPartition]]

private[spark] def progressDirectoryPath = progressDirPath
private[spark] def progressTempDirectoryPath = progressTempDirPath
private[spark] def progressMetadataDirectoryPath = progressMetadataDirPath

// metadata cleaning is different with progress file and temporary file clean up in that, metadata
// is developed for fast initialization of progress tracker and it should (can) be independent
// with Spark Streaming checkpoint (the others have to be coordinated with Spark Streaming
// checkpoint cleanup to ensure that we have enough support for recovery). By cleaning metadata
// in progress tracker, we can ensure that the ProgressTracker can still be quickly initialized
// even the user does not enable Spark Streaming checkpoint or the cleanup of checkpoint is
// lagging behind
private val threadPoolForMetadataClean = new ScheduledThreadPoolExecutor(1)
protected val metadataCleanupFuture: ScheduledFuture[_] = scheduleMetadataCleanTask()

// getModificationTime is not reliable for unit test and some extreme case in distributed
// file system so that we have to derive timestamp from the file names. The timestamp can be the
@@ -61,13 +75,10 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
}
}

/**
* get the latest progress file saved under directory
*/
protected def getLatestFile(
directory: Path, fs: FileSystem, timestamp: Long = Long.MaxValue): Option[Path] = {
require(fs.isDirectory(directory), s"$directory is not a directory")
val allFiles = fs.listStatus(directory)
// no metadata (for backward compatiblity
private def getLatestFileWithoutMetadata(fs: FileSystem, timestamp: Long = Long.MaxValue):
Option[Path] = {
val allFiles = fs.listStatus(progressDirPath)
if (allFiles.length < 1) {
None
} else {
@@ -77,16 +88,45 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
}
}

private def getLatestFileWithMetadata(metadataFiles: Array[FileStatus]): Option[Path] = {
val latestMetadata = metadataFiles.sortWith((f1, f2) => f1.getPath.getName.toLong >
f2.getPath.getName.toLong).head
logInfo(s"locate latest timestamp in metadata as ${latestMetadata.getPath.getName}")
Some(new Path(progressDirStr + "/progress-" + latestMetadata.getPath.getName))
}

/**
* get the latest progress file saved under directory
*
* NOTE: the additional integer in return value is to simplify the test (could be improved)
*/
private[spark] def getLatestFile(fs: FileSystem, timestamp: Long = Long.MaxValue):
(Int, Option[Path]) = {
// first check metadata directory if exists
if (fs.exists(progressMetadataDirPath)) {
val metadataFiles = fs.listStatus(progressMetadataDirPath).filter(
file => file.isFile && file.getPath.getName.toLong <= timestamp)
if (metadataFiles.nonEmpty) {
// metadata files exists
(0, getLatestFileWithMetadata(metadataFiles))
} else {
(1, getLatestFileWithoutMetadata(fs, timestamp))
}
} else {
(1, getLatestFileWithoutMetadata(fs, timestamp))
}
}

/**
* this method is called when ProgressTracker is started for the first time (including recovering
* from checkpoint). This method validate the latest progress file by checking whether it
* contains progress of all partitions we subscribe to. If not, we will delete the corrupt
* progress file
* from Spark Streaming checkpoint). This method validate the latest progress file by checking
* whether it contains progress of all partitions we subscribe to. If not, we will delete the
* corrupt progress file
* @return (whether the latest file pass the validation, option to the file path,
* the latest timestamp)
*/
protected def validateProgressFile(fs: FileSystem): (Boolean, Option[Path]) = {
val latestFileOpt = getLatestFile(progressDirPath, fs)
val (_, latestFileOpt) = getLatestFile(fs)
val allProgressFiles = new mutable.HashMap[String, List[EventHubNameAndPartition]]
var br: BufferedReader = null
try {
@@ -168,7 +208,7 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
private[spark] def pinPointProgressFile(fs: FileSystem, timestamp: Long): Option[Path] = {
try {
require(fs.isDirectory(progressDirPath), s"$progressDirPath is not a directory")
val targetFilePath = new Path(progressDirPath.toString + s"/progress-$timestamp")
val targetFilePath = new Path(s"$progressDirStr/progress-$timestamp")
val targetFileExists = fs.exists(targetFilePath)
if (targetFileExists) Some(targetFilePath) else None
} catch {
@@ -200,7 +240,7 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
if (!fallBack) {
pinPointProgressFile(fs, timestamp)
} else {
getLatestFile(progressDirPath, fs, timestamp)
getLatestFile(fs, timestamp)._2
}
}
if (progressFileOption.isEmpty) {
@@ -234,15 +274,15 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
OffsetRecord(readTimestamp, recordToReturn)
}

// write offsetToCommit to a progress tracking file
private def transaction(
offsetToCommit: Map[String, Map[EventHubNameAndPartition, (Long, Long)]],
fs: FileSystem,
commitTime: Long): Unit = {
private def createProgressFile(
offsetToCommit: Map[String, Map[EventHubNameAndPartition, (Long, Long)]],
fs: FileSystem,
commitTime: Long): Boolean = {
var oos: FSDataOutputStream = null
try {
oos = fs.create(new Path(progressDirStr +
s"/${PathTools.progressFileNamePattern(commitTime)}"), true)
// write progress file
oos = fs.create(new Path(s"$progressDirStr/${PathTools.progressFileNamePattern(commitTime)}"),
true)
offsetToCommit.foreach {
case (namespace, ehNameAndPartitionToOffsetAndSeq) =>
ehNameAndPartitionToOffsetAndSeq.foreach {
@@ -254,6 +294,11 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
)
}
}
true
} catch {
case e: Exception =>
e.printStackTrace()
false
} finally {
if (oos != null) {
oos.close()
@@ -261,6 +306,41 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
}
}

private def createMetadata(fs: FileSystem, commitTime: Long): Boolean = {
var oos: FSDataOutputStream = null
try {
oos = fs.create(new Path(s"$progressMetadataDirStr/" +
s"${PathTools.progressMetadataNamePattern(commitTime)}"), true)
true
} catch {
case e: Exception =>
e.printStackTrace()
false
} finally {
if (oos != null) {
oos.close()
}
}
}

// write offsetToCommit to a progress tracking file
private def transaction(
offsetToCommit: Map[String, Map[EventHubNameAndPartition, (Long, Long)]],
fs: FileSystem,
commitTime: Long): Unit = {
if (createProgressFile(offsetToCommit, fs, commitTime)) {
if (!createMetadata(fs, commitTime)) {
logError(s"cannot create progress file at $commitTime")
throw new IOException(s"cannot create metadata file at $commitTime," +
s" check the previous exception for the root cause")
}
} else {
logError(s"cannot create progress file at $commitTime")
throw new IOException(s"cannot create progress file at $commitTime," +
s" check the previous exception for the root cause")
}
}

/**
* commit offsetToCommit to a new progress tracking file
*/
@@ -363,5 +443,22 @@ private[spark] abstract class ProgressTrackerBase[T <: EventHubsConnector](
}
}

private def scheduleMetadataCleanTask(): ScheduledFuture[_] = {
val metadataCleanTask = new Runnable {
override def run() = {
val fs = progressMetadataDirectoryPath.getFileSystem(new Configuration())
val allMetadataFiles = fs.listStatus(progressMetadataDirPath)
val sortedMetadataFiles = allMetadataFiles.sortWith((f1, f2) => f1.getPath.getName.toLong <
f2.getPath.getName.toLong)
sortedMetadataFiles.take(math.max(sortedMetadataFiles.length - 1, 0)).map{
file =>
fs.delete(file.getPath, true)
}
}
}
// do not need to expose internals to users so hardcoded
threadPoolForMetadataClean.scheduleAtFixedRate(metadataCleanTask, 0, 30, TimeUnit.SECONDS)
}

def init(): Unit
}

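The transaction() path above writes the progress file first and only then the metadata marker, so a marker always refers to a fully written progress file; if the process dies between the two steps, recovery simply falls back to listing the progress directory, and the scheduled cleanup keeps only the newest marker. A simplified sketch of that two-step commit order (illustrative helper, not the tracker's actual method):

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the commit ordering: progress file first, then the empty
// timestamp-named marker in the metadata directory. Names are illustrative.
object CommitOrderSketch {
  def commit(fs: FileSystem, progressDir: Path, metadataDir: Path,
             commitTime: Long, records: Seq[String]): Unit = {
    val out = fs.create(new Path(progressDir, s"progress-$commitTime"), true)
    try records.foreach(r => out.writeBytes(r + "\n")) finally out.close()
    // the marker carries no payload; its file name (the timestamp) is the information
    fs.create(new Path(metadataDir, commitTime.toString), true).close()
  }
}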
@@ -121,11 +121,10 @@ private[spark] class EventHubsSource(
RateControlUtils.fetchLatestOffset(eventHubClient,
retryIfFail = retryIfFail,
if (fetchedHighestOffsetsAndSeqNums == null) {
null
committedOffsetsAndSeqNums.offsets
} else {
fetchedHighestOffsetsAndSeqNums.offsets
},
committedOffsetsAndSeqNums.offsets) match {
}) match {
case Some(highestOffsets) =>
fetchedHighestOffsetsAndSeqNums = EventHubsOffset(committedOffsetsAndSeqNums.batchId,
highestOffsets)
@@ -296,7 +295,7 @@ private[spark] class EventHubsSource(
val nextBatchId = batchId + 1
val progress = progressTracker.read(uid, nextBatchId, fallBack = false)
if (progress.timestamp == -1 || !validateReadResults(progress)) {
// next batch hasn't been committed
// next batch hasn't been committed successfully
val lastCommittedOffset = progressTracker.read(uid, batchId, fallBack = false)
EventHubsOffset(batchId, lastCommittedOffset.offsets)
} else {

@@ -37,23 +37,38 @@ class StructuredStreamingProgressTracker(
progressDir, appName, uid)
protected override lazy val progressTempDirStr: String = PathTools.progressTempDirPathStr(
progressDir, appName, uid)
protected override lazy val progressMetadataDirStr: String = PathTools.progressMetadataDirPathStr(
progressDir, appName, uid)

override def eventHubNameAndPartitions: Map[String, List[EventHubNameAndPartition]] = {
val connector = StructuredStreamingProgressTracker.registeredConnectors(uid)
Map(connector.uid -> connector.connectedInstances)
}

override def init(): Unit = {
// recover from partially executed checkpoint commit
private def initMetadataDirectory(): Unit = {
try {
val fs = progressMetadataDirPath.getFileSystem(hadoopConfiguration)
val checkpointMetadaDirExisted = fs.exists(progressTempDirPath)
if (!checkpointMetadaDirExisted) {
fs.mkdirs(progressMetadataDirPath)
}
} catch {
case ex: Exception =>
ex.printStackTrace()
throw ex
}
}

private def initProgressFileDirectory(): Unit = {
val fs = progressDirPath.getFileSystem(hadoopConfiguration)
try {
val checkpointDirExisted = fs.exists(progressDirPath)
if (checkpointDirExisted) {
val progressDirExist = fs.exists(progressDirPath)
if (progressDirExist) {
val (validationPass, latestFile) = validateProgressFile(fs)
println(s"${latestFile}")
if (!validationPass) {
if (latestFile.isDefined) {
logWarning(s"latest progress file ${latestFile.get} corrupt, rebuild file...")
println(s"latest progress file ${latestFile.get} corrupt, rebuild file...")
val latestFileTimestamp = fromPathToTimestamp(latestFile.get)
val progressRecords = collectProgressRecordsForBatch(latestFileTimestamp,
List(StructuredStreamingProgressTracker.registeredConnectors(uid)))
@@ -67,10 +82,13 @@ class StructuredStreamingProgressTracker(
case ex: Exception =>
ex.printStackTrace()
throw ex
} finally {
// EMPTY
}
}

override def init(): Unit = {
initProgressFileDirectory()
initMetadataDirectory()
}
}

object StructuredStreamingProgressTracker {
@@ -81,6 +99,7 @@ object StructuredStreamingProgressTracker {

private[spark] def reset(): Unit = {
registeredConnectors.clear()
_progressTrackers.values.map(pt => pt.metadataCleanupFuture.cancel(true))
_progressTrackers.clear()
}

@@ -24,7 +24,7 @@ import scala.collection.mutable
import com.microsoft.azure.eventhubs.EventData

import org.apache.spark.eventhubscommon._
import org.apache.spark.eventhubscommon.client.{AMQPEventHubsClient, EventHubClient, EventHubsClientWrapper, RestfulEventHubClient}
import org.apache.spark.eventhubscommon.client.{AMQPEventHubsClient, EventHubClient, EventHubsClientWrapper}
import org.apache.spark.eventhubscommon.client.EventHubsOffsetTypes.EventHubsOffsetType
import org.apache.spark.eventhubscommon.rdd.{EventHubsRDD, OffsetRange, OffsetStoreParams}
import org.apache.spark.internal.Logging
@@ -295,11 +295,10 @@ private[eventhubs] class EventHubDirectDStream private[eventhubs] (
eventHubClient,
retryIfFail,
if (fetchedHighestOffsetsAndSeqNums == null) {
null
currentOffsetsAndSeqNums.offsets
} else {
fetchedHighestOffsetsAndSeqNums.offsets
},
currentOffsetsAndSeqNums.offsets)
})
match {
case Some(highestOffsets) =>
fetchedHighestOffsetsAndSeqNums = OffsetRecord(validTime.milliseconds, highestOffsets)

@@ -52,14 +52,10 @@ private[spark] class DirectDStreamProgressTracker private[spark](
}.toMap
}

/**
* called when ProgressTracker is called for the first time, including recovering from the
* checkpoint
*/
override def init(): Unit = {
// recover from partially executed checkpoint commit
val fs = progressDirPath.getFileSystem(hadoopConfiguration)

private def initProgressFileDirectory(): Unit = {
try {
val fs = progressDirPath.getFileSystem(hadoopConfiguration)
val checkpointDirExisted = fs.exists(progressDirPath)
if (checkpointDirExisted) {
val (validationPass, latestFile) = validateProgressFile(fs)
@@ -72,6 +68,16 @@ private[spark] class DirectDStreamProgressTracker private[spark](
} else {
fs.mkdirs(progressDirPath)
}
} catch {
case ex: Exception =>
ex.printStackTrace()
throw ex
}
}

private def initTempProgressFileDirectory(): Unit = {
try {
val fs = progressTempDirPath.getFileSystem(hadoopConfiguration)
val checkpointTempDirExisted = fs.exists(progressTempDirPath)
if (checkpointTempDirExisted) {
fs.delete(progressTempDirPath, true)
@@ -82,11 +88,33 @@ private[spark] class DirectDStreamProgressTracker private[spark](
case ex: Exception =>
ex.printStackTrace()
throw ex
} finally {
// EMPTY
}
}

private def initMetadataDirectory(): Unit = {
try {
val fs = progressMetadataDirPath.getFileSystem(hadoopConfiguration)
val checkpointMetadaDirExisted = fs.exists(progressTempDirPath)
if (!checkpointMetadaDirExisted) {
fs.mkdirs(progressMetadataDirPath)
}
} catch {
case ex: Exception =>
ex.printStackTrace()
throw ex
}
}

/**
* called when ProgressTracker is referred for the first time, including recovering from the
* Spark Streaming checkpoint
*/
override def init(): Unit = {
initProgressFileDirectory()
initTempProgressFileDirectory()
initMetadataDirectory()
}

/**
* read the progress record for the specified namespace, streamId and timestamp
*/
@@ -147,11 +175,17 @@ object DirectDStreamProgressTracker {

private[spark] def reset(): Unit = {
registeredConnectors.clear()
_progressTracker.metadataCleanupFuture.cancel(true)
_progressTracker = null
}

def getInstance: ProgressTrackerBase[_ <: EventHubsConnector] = _progressTracker

// should only be used for testing
private[streaming] def setProgressTracker(progressTracker: DirectDStreamProgressTracker): Unit = {
_progressTracker = progressTracker
}

private[spark] def initInstance(
progressDirStr: String,
appName: String,

@@ -530,7 +530,7 @@ class EventHubsSourceSuite extends EventHubsStreamTest {
EventHubsTestUtilities.simulateEventHubs(eventHubsParameters,
eventPayloadsAndProperties.take(30 * 10 * 2))
val sourceQuery = generateInputQuery(eventHubsParameters, spark)
val manualClock = new StreamManualClock(0)
val manualClock = new StreamManualClock
val firstBatch = Seq(
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock),
AddEventHubsData(eventHubsParameters, 9))
@@ -570,7 +570,7 @@ class EventHubsSourceSuite extends EventHubsStreamTest {
testStream(sourceQuery)(firstBatch ++ clockMove ++ secondBatch ++ clockMove2 ++ thirdBatch: _*)
}

test("Verify expected dataframe can be retrieved when the partial committed results exist") {
test("Verify expected dataframe can be retrieved when the progress file is partially committed") {
import testImplicits._
val eventHubsParameters = buildEventHubsParamters("ns1", "eh1", 2, 30)
val eventPayloadsAndProperties = generateIntKeyedData(1000)
@@ -584,11 +584,67 @@ class EventHubsSourceSuite extends EventHubsStreamTest {
val clockMove = Array.fill(9)(AdvanceManualClock(10)).toSeq
val secondBatch = Seq(
CheckAnswer(1 to 600: _*),
StopStream(recoverStreamId = true, commitPartialOffset = true, partialType = "partial"),
StopStream(recoverStreamId = true, commitPartialOffset = true,
partialType = "partial"),
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock,
additionalConfs = Map("eventhubs.test.newSink" -> "true")),
AddEventHubsData(eventHubsParameters, 17, eventPayloadsAndProperties.takeRight(400)))
val clockMove2 = Array.fill(8)(AdvanceManualClock(10)).toSeq
// in structured streaming, even metadata is not committed, we will be able to skip the
// processed data, since we will pinpoint progress file with the recovered batch id
val thirdBatch = Seq(CheckAnswer(541 to 1000: _*))
testStream(sourceQuery)(firstBatch ++ clockMove ++ secondBatch ++ clockMove2 ++ thirdBatch: _*)
}

test("Verify expected dataframe can be retrieved when upgrading from a directory without" +
" metadata") {
import testImplicits._
val eventHubsParameters = buildEventHubsParamters("ns1", "eh1", 2, 30)
val eventPayloadsAndProperties = generateIntKeyedData(1000)
EventHubsTestUtilities.simulateEventHubs(eventHubsParameters,
eventPayloadsAndProperties.take(30 * 10 * 2))
val sourceQuery = generateInputQuery(eventHubsParameters, spark)
val manualClock = new StreamManualClock
val firstBatch = Seq(
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock),
AddEventHubsData(eventHubsParameters, 9))
val clockMove = Array.fill(9)(AdvanceManualClock(10)).toSeq
val secondBatch = Seq(
CheckAnswer(1 to 600: _*),
StopStream(recoverStreamId = true, commitPartialOffset = true,
partialType = "nometadata"),
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock,
additionalConfs = Map("eventhubs.test.newSink" -> "true")),
AddEventHubsData(eventHubsParameters, 17, eventPayloadsAndProperties.takeRight(400)))
val clockMove2 = Array.fill(8)(AdvanceManualClock(10)).toSeq
// in structured streaming, even metadata is not committed, we will be able to skip the
// processed data, since we will pinpoint progress file with the recovered batch id
val thirdBatch = Seq(CheckAnswer(601 to 1000: _*))
testStream(sourceQuery)(firstBatch ++ clockMove ++ secondBatch ++ clockMove2 ++ thirdBatch: _*)
}

test("Verify expected dataframe can be retrieved when metadata is not committed") {
import testImplicits._
val eventHubsParameters = buildEventHubsParamters("ns1", "eh1", 2, 30)
val eventPayloadsAndProperties = generateIntKeyedData(1000)
EventHubsTestUtilities.simulateEventHubs(eventHubsParameters,
eventPayloadsAndProperties.take(30 * 10 * 2))
val sourceQuery = generateInputQuery(eventHubsParameters, spark)
val manualClock = new StreamManualClock
val firstBatch = Seq(
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock),
AddEventHubsData(eventHubsParameters, 9))
val clockMove = Array.fill(9)(AdvanceManualClock(10)).toSeq
val secondBatch = Seq(
CheckAnswer(1 to 600: _*),
StopStream(recoverStreamId = true, commitPartialOffset = true,
partialType = "deletemetadata"),
StartStream(trigger = ProcessingTime(10), triggerClock = manualClock,
additionalConfs = Map("eventhubs.test.newSink" -> "true")),
AddEventHubsData(eventHubsParameters, 17, eventPayloadsAndProperties.takeRight(400)))
val clockMove2 = Array.fill(8)(AdvanceManualClock(10)).toSeq
// in structured streaming, even metadata is not committed, we will be able to skip the
// processed data, since we will pinpoint progress file with the recovered batch id
val thirdBatch = Seq(CheckAnswer(601 to 1000: _*))
testStream(sourceQuery)(firstBatch ++ clockMove ++ secondBatch ++ clockMove2 ++ thirdBatch: _*)
}

@@ -37,7 +37,9 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.DebugFilesystem
import org.apache.spark.eventhubscommon.EventHubsConnector
import org.apache.spark.eventhubscommon.client.EventHubsOffsetTypes.EventHubsOffsetType
import org.apache.spark.eventhubscommon.progress.ProgressTrackerBase
import org.apache.spark.eventhubscommon.utils._
import org.apache.spark.sql.{Dataset, Encoder, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
@@ -406,18 +408,30 @@ trait EventHubsStreamTest extends QueryTest with BeforeAndAfter
sources.head
}

def createBrokenProgressFile(pathStr: String, timestamp: Long, brokenType: String): Unit = {
val path = new Path(pathStr)
val fs = path.getFileSystem(new Configuration())
if (brokenType == "partial") {
val fsos = fs.create(new Path(pathStr + s"/progress-$timestamp"), true)
// TODO: separate createBrokenProgressFile from StopStream action
def createBrokenProgressFile(
progressTracker: ProgressTrackerBase[_ <: EventHubsConnector],
timestamp: Long,
brokenType: String): Unit = {
val progressDir = progressTracker.progressDirectoryPath.toString
val metadataDir = progressTracker.progressMetadataDirectoryPath.toString
val progressFilePath = new Path(s"$progressDir/progress-$timestamp")
val metadataFilePath = new Path(s"$metadataDir/$timestamp")
val fs = progressFilePath.getFileSystem(new Configuration())
if (brokenType == "delete") {
fs.delete(progressFilePath, true)
fs.delete(metadataFilePath, true)
} else if (brokenType == "deletemetadata") {
fs.delete(metadataFilePath, true)
} else if (brokenType == "partial" ) {
fs.delete(progressFilePath, true)
fs.delete(metadataFilePath, true)
val fsos = fs.create(progressFilePath)
fsos.writeBytes(s"$timestamp ns1_eh1_23 eh1 1 499 499")
fsos.close()
} else if (brokenType == "delete") {
fs.delete(new Path(pathStr + s"/progress-$timestamp"), true)
} else if (brokenType == "nometadata") {
fs.delete(new Path(metadataDir), true)
} else {
throw new Exception(s"unrecognizable partial type $brokenType")
throw new Exception(s"unrecognizable broken type $brokenType")
}
}

@@ -530,8 +544,10 @@ trait EventHubsStreamTest extends QueryTest with BeforeAndAfter
if (commitPartialOffset) {
val source = searchCurrentSource()
val progressTracker = StructuredStreamingProgressTracker.getInstance(source.uid)
createBrokenProgressFile(progressTracker.progressDirectoryPath.toString,
source.committedOffsetsAndSeqNums.batchId + 1, partialType)
source.collectFinishedBatchOffsetsAndCommit(
source.committedOffsetsAndSeqNums.batchId + 1)
createBrokenProgressFile(progressTracker,
source.committedOffsetsAndSeqNums.batchId, partialType)
}
verify(!currentStream.microBatchThread.isAlive,
s"microbatch thread not stopped")

@@ -200,7 +200,8 @@ trait CheckpointAndProgressTrackerTestSuiteBase extends EventHubTestSuiteBase {
operation: EventHubDirectDStream => DStream[V],
expectedOutputBeforeRestart: Seq[Seq[V]],
expectedOutputAfterRestart: Seq[Seq[V]],
useSetFlag: Boolean = false) {
useSetFlag: Boolean = false,
directoryToClean: Option[Path] = None) {

require(ssc.conf.get("spark.streaming.clock") === classOf[ManualClock].getName,
"Cannot run test without manual clock in the conf")
@@ -208,6 +209,10 @@ trait CheckpointAndProgressTrackerTestSuiteBase extends EventHubTestSuiteBase {
runStopAndRecover(input, eventhubsParams, expectedStartingOffsetsAndSeqs,
expectedOffsetsAndSeqs, operation, expectedOutputBeforeRestart, useSetFlag = useSetFlag)

if (directoryToClean.isDefined) {
fs.delete(directoryToClean.get, true)
}

// Restart and complete the computation from checkpoint file
logInfo(
"\n-------------------------------------------\n" +

@@ -252,6 +252,40 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
expectedOutputAfterRestart)
}

test("recover from a progress directory where has no metadata record") {
val input = Seq(
Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
Seq(4, 5, 6, 7, 8, 9, 10, 1, 2, 3),
Seq(7, 8, 9, 1, 2, 3, 4, 5, 6, 7))
val expectedOutputBeforeRestart = Seq(
Seq(2, 3, 5, 6, 8, 9), Seq(4, 5, 7, 8, 10, 2), Seq(6, 7, 9, 10, 3, 4))
val expectedOutputAfterRestart = Seq(
Seq(6, 7, 9, 10, 3, 4), Seq(8, 9, 11, 2, 5, 6), Seq(10, 11, 3, 4, 7, 8), Seq())

testCheckpointedOperation(
input,
eventhubsParams = Map[String, Map[String, String]](
"eh1" -> Map(
"eventhubs.partition.count" -> "3",
"eventhubs.maxRate" -> "2",
"eventhubs.name" -> "eh1")
),
expectedStartingOffsetsAndSeqs = Map(eventhubNamespace ->
OffsetRecord(2000L, Map(EventHubNameAndPartition("eh1", 0) -> (3L, 3L),
EventHubNameAndPartition("eh1", 1) -> (3L, 3L),
EventHubNameAndPartition("eh1", 2) -> (3L, 3L))
)),
expectedOffsetsAndSeqs = OffsetRecord(3000L,
Map(EventHubNameAndPartition("eh1", 0) -> (5L, 5L),
EventHubNameAndPartition("eh1", 1) -> (5L, 5L),
EventHubNameAndPartition("eh1", 2) -> (5L, 5L))),
operation = (inputDStream: EventHubDirectDStream) =>
inputDStream.map(eventData => eventData.getProperties.get("output").asInstanceOf[Int] + 1),
expectedOutputBeforeRestart,
expectedOutputAfterRestart,
directoryToClean = Some(progressTracker.progressMetadataDirectoryPath))
}

test("recover from progress after updating code (no checkpoint provided)") {
val input = Seq(
@@ -432,6 +466,7 @@ class ProgressTrackingAndCheckpointSuite extends CheckpointAndProgressTrackerTes
// simulate commit fail
val fs = FileSystem.get(new Configuration())
fs.delete(new Path(progressRootPath.toString + s"/$appName/progress-3000"), true)
fs.delete(new Path(progressRootPath.toString + s"/${appName}_metadata/3000"), true)

ssc = StreamingContext.getOrCreate(currentCheckpointDirectory,
() => createContextForCheckpointOperation(batchDuration, checkpointDirectory))

@@ -49,6 +49,9 @@ private[spark] trait SharedUtils extends FunSuite with BeforeAndAfterEach {

override def afterEach(): Unit = {
reset()
// cannot do it in reset() for test like "recover from progress after updating code
// (no checkpoint provided) "
fs.delete(progressRootPath, true)
}

def batchDuration: Duration = Seconds(5)

@@ -18,7 +18,6 @@
package org.apache.spark.streaming.eventhubs.checkpoint

import java.nio.file.{Files, Paths, StandardOpenOption}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -52,6 +51,35 @@ class ProgressTrackerSuite extends SharedUtils {
DirectDStreamProgressTracker.reset()
}

private def writeProgressFile(
progressPath: String,
streamId: Int,
fs: FileSystem,
timestamp: Long,
namespace: String,
ehName: String,
partitionRange: Range,
offset: Int,
seq: Int): Unit = {
for (partitionId <- partitionRange) {
Files.write(
Paths.get(progressPath + s"/${PathTools.progressFileNamePattern(timestamp)}"),
(ProgressRecord(timestamp, namespace, ehName, partitionId, offset,
seq).toString + "\n").getBytes, {
if (Files.exists(Paths.get(progressPath +
s"/${PathTools.progressFileNamePattern(timestamp)}"))) {
StandardOpenOption.APPEND
} else {
StandardOpenOption.CREATE
}
})
}
}

private def createMetadataFile(fs: FileSystem, metadataPath: String, timestamp: Long): Unit = {
fs.create(new Path(s"$metadataPath/${PathTools.progressMetadataNamePattern(timestamp)}"))
}

test("progress temp directory is created properly when progress and progress temp" +
" directory do not exist") {
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString, appName,
@@ -84,30 +112,6 @@ class ProgressTrackerSuite extends SharedUtils {
assert(filesAfter.size === 0)
}

private def writeProgressFile(
progressPath: String,
streamId: Int,
fs: FileSystem,
timestamp: Long,
namespace: String,
ehName: String,
partitionRange: Range,
offset: Int,
seq: Int): Unit = {
for (partitionId <- partitionRange) {
Files.write(
Paths.get(progressPath + s"/progress-$timestamp"),
(ProgressRecord(timestamp, namespace, ehName, partitionId, offset,
seq).toString + "\n").getBytes, {
if (Files.exists(Paths.get(progressPath + s"/progress-$timestamp"))) {
StandardOpenOption.APPEND
} else {
StandardOpenOption.CREATE
}
})
}
}

test("incomplete progress would be discarded") {
createDirectStreams(ssc, "namespace1", progressRootPath.toString,
Map("eh1" -> Map("eventhubs.partition.count" -> "1"),
@@ -115,11 +119,16 @@ class ProgressTrackerSuite extends SharedUtils {
"eh3" -> Map("eventhubs.partition.count" -> "3")))
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1", 0 to 0, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2", 0 to 1, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3", 0 to 2, 0, 0)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh1", 0 to 0, 1, 1)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh2", 0 to 1, 1, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1",
"eh1", 0 to 0, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1",
"eh2", 0 to 1, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1",
"eh3", 0 to 2, 0, 0)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1",
"eh1", 0 to 0, 1, 1)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1",
"eh2", 0 to 1, 1, 1)
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString, appName,
new Configuration())
assert(!fs.exists(new Path(progressPath.toString + "/progress-2000")))
@@ -130,9 +139,12 @@ class ProgressTrackerSuite extends SharedUtils {
// create direct streams, generate 6 EventHubAndPartitions
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1", 0 to 0, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2", 0 to 1, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3", 0 to 2, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2",
0 to 1, 0, 0)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3",
0 to 2, 0, 0)
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString, appName,
new Configuration())
assert(fs.exists(new Path(progressPath.toString + "/progress-1000")))
@@ -181,12 +193,18 @@ class ProgressTrackerSuite extends SharedUtils {
// generate 6 EventHubAndPartitions
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1", 0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2", 0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3", 0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11", 0 to 0, 1, 2)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12", 0 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13", 0 to 2, 3, 4)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2",
0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3",
0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11",
0 to 0, 1, 2)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12",
0 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13",
0 to 2, 3, 4)

progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString, appName,
new Configuration())
@@ -206,17 +224,23 @@ class ProgressTrackerSuite extends SharedUtils {
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString, appName,
new Configuration())

writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1", 0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2", 0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3", 0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11", 0 to 0, 1, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2",
0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3",
0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11",
0 to 0, 1, 2)
// write wrong record
Files.write(
Paths.get(progressPath + s"/progress-1000"),
(ProgressRecord(2000L, "namespace2", "eh12", 0, 2, 3).toString + "\n").getBytes,
StandardOpenOption.APPEND)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12", 1 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13", 0 to 2, 3, 4)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12",
1 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13",
0 to 2, 3, 4)

intercept[IllegalArgumentException] {
progressTracker.asInstanceOf[DirectDStreamProgressTracker].read("namespace2", 1000L,
@@ -332,28 +356,46 @@ class ProgressTrackerSuite extends SharedUtils {
fs.mkdirs(new Path(progressPath))

// 1000
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1", 0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2", 0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3", 0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11", 0 to 0, 1, 2)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12", 0 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13", 0 to 2, 3, 4)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh2",
0 to 1, 0, 2)
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh3",
0 to 2, 0, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh11",
0 to 0, 1, 2)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh12",
0 to 1, 2, 3)
writeProgressFile(progressPath, 1, fs, 1000L, "namespace2", "eh13",
0 to 2, 3, 4)

// 2000
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh1", 0 to 0, 1, 2)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh2", 0 to 1, 1, 3)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh3", 0 to 2, 1, 4)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh11", 0 to 0, 2, 3)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh12", 0 to 1, 3, 4)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh13", 0 to 2, 4, 5)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh1",
0 to 0, 1, 2)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh2",
0 to 1, 1, 3)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh3",
0 to 2, 1, 4)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh11",
0 to 0, 2, 3)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh12",
0 to 1, 3, 4)
writeProgressFile(progressPath, 1, fs, 2000L, "namespace2", "eh13",
0 to 2, 4, 5)

// 3000
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh1", 0 to 0, 2, 3)
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh2", 0 to 1, 2, 4)
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh3", 0 to 2, 2, 5)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh11", 0 to 0, 3, 4)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh12", 0 to 1, 4, 5)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh13", 0 to 2, 5, 6)
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh1",
0 to 0, 2, 3)
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh2",
0 to 1, 2, 4)
writeProgressFile(progressPath, 0, fs, 3000L, "namespace1", "eh3",
0 to 2, 2, 5)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh11",
0 to 0, 3, 4)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh12",
0 to 1, 4, 5)
writeProgressFile(progressPath, 1, fs, 3000L, "namespace2", "eh13",
0 to 2, 5, 6)

// if latest timestamp is earlier than the specified timestamp, we shall return the latest
// offsets
@@ -379,6 +421,54 @@ class ProgressTrackerSuite extends SharedUtils {
verifyProgressFile("namespace2", "eh11", 0 to 0, 2000L, Seq((1, 2)))
verifyProgressFile("namespace2", "eh12", 0 to 1, 2000L, Seq((2, 3), (2, 3)))
verifyProgressFile("namespace2", "eh13", 0 to 2, 2000L, Seq((3, 4), (3, 4), (3, 4)))
}

test("read progress file correctly when metadata exists") {
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString,
appName, new Configuration())
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
val metadataPath = PathTools.progressMetadataDirPathStr(progressRootPath.toString, appName)
createMetadataFile(fs, metadataPath, 1000L)
val result = progressTracker.read("namespace1", 1000, fallBack = true)
assert(result.timestamp == 1000L)
assert(result.offsets == Map(EventHubNameAndPartition("eh1", 0) -> (0, 1)))
}

test("ProgressTracker does query metadata when metadata exists") {
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString,
appName, new Configuration())
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh1",
0 to 0, 0, 1)
val metadataPath = PathTools.progressMetadataDirPathStr(progressRootPath.toString, appName)
createMetadataFile(fs, metadataPath, 1000L)
createMetadataFile(fs, metadataPath, 2000L)
val (sourceOfLatestFile, result) = progressTracker.getLatestFile(fs)
assert(sourceOfLatestFile === 0)
assert(result.isDefined)
assert(result.get.getName === "progress-2000")
}

test("When metadata presents, we should respect it even the more recent progress file is there") {
progressTracker = DirectDStreamProgressTracker.initInstance(progressRootPath.toString,
appName, new Configuration())
val progressPath = PathTools.progressDirPathStr(progressRootPath.toString, appName)
fs.mkdirs(new Path(progressPath))
writeProgressFile(progressPath, 0, fs, 1000L, "namespace1", "eh1",
0 to 0, 0, 1)
writeProgressFile(progressPath, 0, fs, 2000L, "namespace1", "eh1",
0 to 0, 0, 1)
val metadataPath = PathTools.progressMetadataDirPathStr(progressRootPath.toString, appName)
createMetadataFile(fs, metadataPath, 1000L)
val (sourceOfLatestFile, result) = progressTracker.getLatestFile(fs)
assert(sourceOfLatestFile === 0)
assert(result.isDefined)
assert(result.get.getName === "progress-1000")
}
}