support parallel pre-fetching of data for DynamicPartitionKafkaInputDStream

This commit is contained in:
tqin 2016-06-22 22:40:00 -07:00
Parent 47723f34f2
Commit 0359d66548
10 changed files with 390 additions and 62 deletions

View file

@ -22,7 +22,8 @@ namespace Microsoft.Spark.CSharp.Proxy
IDStreamProxy SocketTextStream(string hostname, int port, StorageLevelType storageLevelType);
IDStreamProxy KafkaStream(Dictionary<string, int> topics, Dictionary<string, string> kafkaParams, StorageLevelType storageLevelType);
IDStreamProxy DirectKafkaStream(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets);
IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions);
IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
int numPartitions, byte[] readFunc, string serializationMode);
IDStreamProxy Union(IDStreamProxy firstDStreams, IDStreamProxy[] otherDStreams);
void AwaitTermination();
void AwaitTerminationOrTimeout(long timeout);

View file

@ -212,7 +212,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
return new DStreamIpcProxy(jstream);
}
public IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions)
public IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams,
Dictionary<string, long> fromOffsets, int numPartitions, byte[] readFunc, string serializationMode)
{
JvmObjectReference jtopics = JvmBridgeUtils.GetJavaSet<string>(topics);
JvmObjectReference jkafkaParams = JvmBridgeUtils.GetJavaMap<string, string>(kafkaParams);
@ -228,7 +229,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
JvmObjectReference jfromOffsets = JvmBridgeUtils.GetJavaMap<JvmObjectReference, long>(jTopicAndPartitions);
// SparkCLR\scala\src\main\org\apache\spark\streaming\api\kafka\KafkaUtilsCSharpHelper.scala
JvmObjectReference jhelper = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.kafka.KafkaUtilsCSharpHelper", new object[] { });
var jstream = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jhelper, "createDirectStreamWithoutMessageHandler", new object[] { jvmJavaStreamingReference, jkafkaParams, jtopics, jfromOffsets, (int)numPartitions }).ToString());
var jstream = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jhelper, "createDirectStreamWithoutMessageHandler",
new object[] { jvmJavaStreamingReference, jkafkaParams, jtopics, jfromOffsets, (int)numPartitions, readFunc, serializationMode }).ToString());
return new DStreamIpcProxy(jstream);
}

View file

@ -6,6 +6,8 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using Microsoft.Spark.CSharp.Core;
@ -118,7 +120,55 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStreamWithRepartition(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions = -1)
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions), ssc, SerializedMode.Pair);
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
}
/// <summary>
/// Create an input stream that directly pulls messages from Kafka brokers, starting from specific offsets.
///
/// This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
/// in each batch duration and processes them without storing them.
///
/// This does not use Zookeeper to store offsets. The consumed offsets are tracked
/// by the stream itself. For interoperability with Kafka monitoring tools that depend on
/// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
/// You can access the offsets used in each batch from the generated RDDs (see
/// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
/// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
/// The information on consumed offset can be recovered from the checkpoint.
/// See the programming guide for details (constraints, etc.).
///
/// </summary>
/// <param name="ssc">Spark Streaming Context</param>
/// <param name="topics">list of topic_name to consume.</param>
/// <param name="kafkaParams">
/// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <param name="numPartitions">
/// User hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
/// unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
/// a (probably larger) number of RDD partitions.
/// If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if that config is not defined
/// If numPartitions = 0, repartition using the original Kafka partition count
/// If numPartitions > 0, repartition into this number of partitions
/// </param>
/// <param name="readFunc">User function applied to the Kafka data of each partition.</param>
/// <returns>A DStream object</returns>
public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
int numPartitions, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
{
var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
var transformHelper = new TransformHelper<KeyValuePair<byte[], byte[]>, T>(mapPartitionsWithIndexHelper.Execute);
var transformDynamicHelper = new TransformDynamicHelper<KeyValuePair<byte[], byte[]>, T>(transformHelper.Execute);
Func<double, RDD<dynamic>, RDD<dynamic>> func = transformDynamicHelper.Execute;
var formatter = new BinaryFormatter();
var stream = new MemoryStream();
formatter.Serialize(stream, func);
byte[] readFuncBytes = stream.ToArray();
string serializationMode = SerializedMode.Pair.ToString();
return new DStream<T>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, readFuncBytes, serializationMode), ssc);
}
}
}
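For orientation, here is a minimal usage sketch of the CreateDirectStreamWithRepartitionAndReadFunc API added above. The broker address, topic name, and the pre-existing StreamingContext variable ssc are illustrative assumptions, not part of this commit; note that the read function (and anything it captures) is serialized with BinaryFormatter before being handed to the JVM side, as the implementation above shows.

// Sketch only: "broker1:9092", "sampleTopic" and the existing StreamingContext ssc
// are placeholders, not values from this change.
// Assumes: using System.Text; using System.Linq; using Microsoft.Spark.CSharp.Streaming;
var kafkaParams = new Dictionary<string, string>
{
    { "metadata.broker.list", "broker1:9092" },   // Kafka broker(s), NOT zookeeper
    { "auto.offset.reset", "largest" }
};
// The read function runs once per RDD partition; here it decodes message values to UTF-8 strings.
DStream<string> decoded = KafkaUtils.CreateDirectStreamWithRepartitionAndReadFunc(
    ssc,
    new List<string> { "sampleTopic" },
    kafkaParams,
    new Dictionary<string, long>(),               // empty fromOffsets: start per auto.offset.reset
    32,                                           // numPartitions: redistribute into 32 RDD partitions
    (int partitionId, IEnumerable<KeyValuePair<byte[], byte[]>> messages) =>
        messages.Select(kv => Encoding.UTF8.GetString(kv.Value)));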

View file

@ -49,7 +49,8 @@ namespace AdapterTest.Mocks
return new MockDStreamProxy();
}
public IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions)
public IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
int numPartitions, byte[] readFunc, string serializationMode)
{
return new MockDStreamProxy();
}

View file

@ -40,6 +40,14 @@ namespace AdapterTest
var directKafkaStreamWithRepartition = KafkaUtils.CreateDirectStreamWithRepartition(ssc, new List<string> { "testTopic3" }, new Dictionary<string, string>(), new Dictionary<string, long>(), 10);
Assert.IsNotNull(directKafkaStreamWithRepartition.DStreamProxy);
var directKafkaStreamWithRepartitionAndReadFunc = KafkaUtils.CreateDirectStreamWithRepartitionAndReadFunc(
ssc,
new List<string> { "testTopic3" },
new Dictionary<string, string>(), new Dictionary<string, long>(),
10,
(int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input;});
Assert.IsNotNull(directKafkaStreamWithRepartitionAndReadFunc.DStreamProxy);
var union = ssc.Union(textFile, socketStream);
Assert.IsNotNull(union.DStreamProxy);

View file

@ -4,6 +4,8 @@
|Worker |spark.mobius.CSharpWorker.maxProcessCount |Sets max number of C# worker processes in Spark executors |
|Streaming (Kafka) |spark.mobius.streaming.kafka.CSharpReader.enabled |Enables use of C# Kafka reader in Mobius streaming applications |
|Streaming (Kafka) |spark.mobius.streaming.kafka.maxMessagesPerTask.&lt;topicName&gt; |Sets the max number of messages per RDD partition created from specified Kafka topic to uniformly spread load across tasks that process them |
|Streaming (Kafka) |spark.mobius.streaming.kafka.fetchRate |Sets the number of Kafka metadata fetch operations per batch (default: 2) |
|Streaming (Kafka) |spark.mobius.streaming.kafka.numReceivers |Sets the number of threads used to materialize the RDDs created by applying the user read function to the original KafkaRDD (default: 0, which disables pre-fetch); see the configuration sketch below this table |
|Streaming (UpdateStateByKey) |spark.mobius.streaming.parallelJobs |Sets the 0-based max number of parallel jobs for UpdateStateByKey so that the next N batches can start their tasks on time even if the previous batch has not completed yet. Default: 0, recommended: 1. It is a special version of spark.streaming.concurrentJobs which does not properly observe UpdateStateByKey's state ordering |
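A hedged sketch (not part of this commit) of how these settings might be supplied from a Mobius driver, assuming the usual SparkConf.Set(key, value) API; the values are illustrative, and spark.mobius.streaming.kafka.maxPendingJobs is the related cap read by DynamicPartitionKafkaInputDStream (default: 5 * fetchRate).

// Sketch: enable parallel pre-fetch for the direct Kafka stream; values are illustrative.
var conf = new SparkConf()
    .SetAppName("KafkaPreFetchSample")                          // hypothetical app name
    .Set("spark.mobius.streaming.kafka.fetchRate", "4")         // 4 metadata fetch operations per batch
    .Set("spark.mobius.streaming.kafka.numReceivers", "8")      // 8 threads materializing pre-fetched RDDs
    .Set("spark.mobius.streaming.kafka.maxPendingJobs", "20");  // cap on un-acked pre-compute jobs (default: 5 * fetchRate)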

View file

@ -5,16 +5,19 @@
package org.apache.spark.streaming.api.csharp
import org.apache.spark.api.csharp._
import org.apache.spark.api.csharp.SerDe._
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.language.existentials
import java.io.DataInputStream
import java.io.DataOutputStream
import java.net.Socket
import java.util.{ArrayList => JArrayList}
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.{LinkedBlockingQueue, ConcurrentHashMap, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.language.existentials
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.csharp._
import org.apache.spark.api.csharp.SerDe._
import org.apache.spark.api.java._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel
@ -343,3 +346,147 @@ class CSharpConstantInputDStream(ssc_ : StreamingContext, rdd: RDD[Array[Byte]])
val asJavaDStream: JavaDStream[Array[Byte]] = JavaDStream.fromDStream(this)
}
case class RddPreComputeRecord[T] (
rddSeqNum: Long,
rdd: RDD[T])
/**
* Used to pre-compute and materialize the input RDDs.
*/
class RddPreComputeProcessor[T](
sc: SparkContext,
val id: String,
val maxPendingJobs: Int,
val numThreads: Int,
val outputLimit: Int, // maximum number of records for each output
val storageLevel: StorageLevel) extends Logging {
// A thread pool is used to pre-compute RDDs. To preserve order, a sequence number is assigned to each RDD
// at input so that we can reassemble them in order at output.
@volatile private var inputRddSeqNum: Long = 0L
@volatile private var outputRddSeqNum: Long = 0L
// ackedSeqNum is used for backpressure
@volatile private var ackedSeqNum: Long = -1L
private val precomputedRddMap = new ConcurrentHashMap[Long, RddPreComputeRecord[T]]()
// If an RDD is not pre-computed because of backpressure, it is put in this map
private val bypassRddMap = new ConcurrentHashMap[Long, RddPreComputeRecord[T]]()
// If an RDD is not pre-computed because of backpressure, it is also put in this queue for later processing
private val pendingRddQueue = new LinkedBlockingQueue[RddPreComputeRecord[T]]()
private var jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numThreads, s"mobius-precompute-job-executor-$id")
private val pendingQueueScheduler = ThreadUtils.newDaemonSingleThreadExecutor(s"mobius-precompute-pending-scheduler-$id")
private def isPending(rddSeqNum: Long): Boolean = {
rddSeqNum > ackedSeqNum + maxPendingJobs
}
private def logStatus(): Unit = {
logInfo(s"log status for [$id], inputRddSeqNum: $inputRddSeqNum, ackedSeqNum: $ackedSeqNum, outputRddSeqNum: $outputRddSeqNum")
}
private def processPendingQueue(): Unit = {
while (true) {
val record = pendingRddQueue.take()
logInfo(s"process pending queue [$id], rddSeqNum: ${record.rddSeqNum}")
logStatus()
while (isPending(record.rddSeqNum)) {
Thread.sleep(100)
}
logStatus()
logInfo(s"submit job from pending queue [$id], rddSeqNum: ${record.rddSeqNum}")
jobExecutor.execute(new Runnable {
override def run(): Unit = {
record.rdd.persist(storageLevel)
sc.setCallSite(s"materialize pending RDD for [$id]")
sc.runJob(record.rdd, (iterator: Iterator[T]) => {})
}
})
}
}
// put input RDD to processor
def put(rdd: RDD[T]): Unit = {
val rddSeqNum = inputRddSeqNum
inputRddSeqNum += 1
val record = RddPreComputeRecord[T](rddSeqNum, rdd)
logInfo(s"put input record [$id], rddSeqNum: ${record.rddSeqNum}")
logStatus()
if (isPending(record.rddSeqNum)) {
bypassRddMap.put(record.rddSeqNum, record)
pendingRddQueue.put(record)
} else {
logInfo(s"submit job [$id], rddSeqNum: ${record.rddSeqNum}")
jobExecutor.execute(new Runnable {
override def run(): Unit = {
record.rdd.persist(storageLevel)
sc.setCallSite(s"materialize RDD for [$id]")
sc.runJob(record.rdd, (iterator: Iterator[T]) => {})
precomputedRddMap.put(record.rddSeqNum, record)
}
})
}
}
// get output RDDs from processor
def get(): List[RddPreComputeRecord[T]] = {
val result = new ListBuffer[RddPreComputeRecord[T]]()
var stop = false
var outputNum = 0
while (!stop) {
val rddSeqNum = outputRddSeqNum
if (precomputedRddMap.containsKey(rddSeqNum)) {
result += precomputedRddMap.remove(rddSeqNum)
outputRddSeqNum += 1
outputNum += 1
if (outputNum > outputLimit) {
stop = true
}
} else {
stop = true
}
}
if (result.isEmpty) {
stop = false
outputNum = 0
while (!stop) {
val rddSeqNum = outputRddSeqNum
if (bypassRddMap.containsKey(rddSeqNum)) {
result += bypassRddMap.remove(rddSeqNum)
outputRddSeqNum += 1
outputNum += 1
if (outputNum > outputLimit) {
stop = true
}
} else {
stop = true
}
}
}
result.toList
}
// ack consumed RDDs
def ackRdd(rddSeqNum: Long): Unit = {
if (ackedSeqNum < rddSeqNum) {
ackedSeqNum = rddSeqNum
}
}
def start(): Unit = {
pendingQueueScheduler.execute(new Runnable {
override def run(): Unit = {
processPendingQueue()
}
})
}
def stop(): Unit = {
jobExecutor.shutdown()
pendingQueueScheduler.shutdown()
}
}

View file

@ -5,27 +5,27 @@
package org.apache.spark.streaming.kafka
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.api.csharp.CSharpDStream
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.ClassTag
import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit, ScheduledExecutorService}
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.rdd.{EmptyRDD, UnionRDD, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.StreamInputInfo
import org.apache.spark.util.ThreadUtils
import org.apache.spark.streaming.api.csharp.{RddPreComputeRecord, RddPreComputeProcessor, CSharpDStream}
/**
* A stream of the new DynamicPartitionKafkaRDD to support where
* each given Kafka topic/partition can correspond to multiple RDD partitions
@ -82,19 +82,26 @@ class DynamicPartitionKafkaInputDStream[
@transient private var refreshOffsetsScheduler: ScheduledExecutorService = null
// reading metadata of mutilple topics from across multiple data centers takes long time to complete,
// reading metadata of multiple topics from across multiple data centers takes long time to complete,
// which impacts DStream performance and causes the UI streaming tab to become unresponsive due to a mutex held by DStream
// so a separate thread is introduced to refresh metadata (current offsets) asynchronously at below interval
// this unblocks DStream in above described situation but not quite in sync with batch timestamp,
// which is OK since batches are still generated at the same interval
// the interval is set to half of the batch interval to make sure they're not in sync to block each other
// TODO: configurable as performance tuning option
private val refreshOffsetsInterval = Math.max(slideDuration.milliseconds / 2, 50)
// the default interval is set to half of the batch interval to make sure they're not in sync to block each other
private val fetchRate = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.fetchRate", 2)
private val refreshOffsetsInterval = Math.max(slideDuration.milliseconds / fetchRate, 1)
// fromOffsets and untilOffsets for next batch
@transient @volatile var offsetsRangeForNextBatch:
Option[(Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset])] = None
private val maxPendingJobs = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.maxPendingJobs", 5 * fetchRate)
private val numReceivers = context.sparkContext.getConf.getInt("spark.mobius.streaming.kafka.numReceivers", 0)
// The pre-computed RDDs need to be un-persisted to release memory, so we need to remember the pre-computed RDDs
// according to the time at which they are referenced.
@transient private[streaming] var preComputeOutputMap: ConcurrentHashMap[Time, List[RddPreComputeRecord[R]]] = null
@transient private var rddPreComputeProcessor: RddPreComputeProcessor[R] = null
private def refreshPartitions(): Unit = {
if (!hasFetchedAllPartitions) {
@ -224,8 +231,7 @@ class DynamicPartitionKafkaInputDStream[
}
}
val leaderOffsets = getLatestLeaderOffsets(topicAndPartitions, maxRetries)
val leaderOffsetCount = leaderOffsets.count(_ => true)
logInfo(s"leaderOffsets count for stream $id: $leaderOffsetCount")
logInfo(s"topicAndPartitions and leaderOffsets size for stream $id: ${topicAndPartitions.size} ${leaderOffsets.size}")
val fromOffsets = resetFromOffsets(leaderOffsets)
val untilOffsets = leaderOffsets
synchronized {
@ -239,11 +245,14 @@ class DynamicPartitionKafkaInputDStream[
if (refreshOffsetsScheduler == null) {
refreshOffsetsScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"refresh-offsets-$id")
logInfo(s"Instantiated refreshOffsetsScheduler successfully for stream $id.")
logInfo(s"Instantiated refreshOffsetsScheduler successfully for stream $id at rate of $refreshOffsetsInterval ms.")
}
refreshOffsetsScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
logInfo(s"starting setOffsetsRangeForNextBatch for stream$id")
setOffsetsRangeForNextBatch()
logInfo(s"finished setOffsetsRangeForNextBatch for stream$id")
doReceive()
}
}, 0, refreshOffsetsInterval, TimeUnit.MILLISECONDS)
}
@ -255,44 +264,69 @@ class DynamicPartitionKafkaInputDStream[
topicAndPartitions = fromOffsets.keySet
currentOffsets = fromOffsets
} else {
if (autoOffsetReset == None) {
if (autoOffsetReset.isEmpty) {
throw new IllegalArgumentException("Must set auto.offset.reset if fromOffsets is empty")
}
}
private def callCSharpTransform(rdd: DynamicPartitionKafkaRDD[K, V, U, T, R], validTime: Time): Option[RDD[R]] = {
if (cSharpFunc == null)
Some(rdd)
else
CSharpDStream.callCSharpTransform(List(Some(rdd)), validTime, cSharpFunc, List(serializationMode)).asInstanceOf[Option[RDD[R]]]
private def callCSharpTransform(rdd: DynamicPartitionKafkaRDD[K, V, U, T, R], validTime: Time): RDD[R] = {
if (cSharpFunc == null) {
rdd
} else {
val csharpRdd = CSharpDStream.callCSharpTransform(List(Some(rdd)), validTime, cSharpFunc, List(serializationMode)).asInstanceOf[Option[RDD[R]]]
if (csharpRdd.isEmpty) {
throw new RuntimeException(s"callCSharpTransform should return some rdd but got None!")
}
csharpRdd.get
}
}
override def compute(validTime: Time): Option[RDD[R]] = {
var offsetsRange:
Option[(Map[TopicAndPartition, Long], Map[TopicAndPartition, LeaderOffset])] = None
synchronized {
offsetsRange = offsetsRangeForNextBatch
private def getOffsetRanges(rdd: RDD[R]): Array[OffsetRange] = {
val rdds = rdd match {
case emptyRdd: EmptyRDD[R] => Seq[RDD[R]]()
case unionRdd: UnionRDD[R] => unionRdd.rdds
case _ => Seq(rdd)
}
rdds.flatMap { rdd =>
val kafkaRdd: RDD[R] = { if (cSharpFunc == null) rdd else rdd.firstParent[R] }
kafkaRdd.asInstanceOf[DynamicPartitionKafkaRDD[K, V, U, T, R]].offsetRanges
}.toArray
}
private def doCompute(validTime: Time): Option[RDD[R]] = {
val offsetsRange = synchronized {
val tmpOffsetsRange = offsetsRangeForNextBatch
offsetsRangeForNextBatch = None
tmpOffsetsRange
}
val (fromOffsets, untilOffsets) = {
offsetsRange match {
case None => // return empty offsets
(Map[TopicAndPartition, Long](), Map[TopicAndPartition, LeaderOffset]())
case Some((from, until)) =>
(from, until)
}
offsetsRange.flatMap({
case (from, until) =>
val rdd = DynamicPartitionKafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, from, until, messageHandler, numPartitions)
Some(callCSharpTransform(rdd, validTime))
})
}
private def initializeReceiver(): Unit = {
if (numReceivers > 0) {
preComputeOutputMap = new ConcurrentHashMap()
rddPreComputeProcessor = new RddPreComputeProcessor[R](
context.sparkContext, s"dstream-$id", maxPendingJobs, numReceivers, fetchRate * 2,
StorageLevel(useDisk = false, useMemory = true, deserialized = true, 3))
rddPreComputeProcessor.start()
}
}
val rdd = DynamicPartitionKafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, fromOffsets, untilOffsets, messageHandler, numPartitions)
val csharpRdd = callCSharpTransform(rdd, validTime)
private def doReceive(): Unit = {
if (numReceivers > 0) {
logInfo(s"starting doReceive for stream$id")
val rddOption = doCompute(new Time(0))
rddOption.foreach(rdd => rddPreComputeProcessor.put(rdd))
}
}
private def reportInputInfo(validTime: Time, offsetRanges: Array[OffsetRange]): Unit = {
// Report the record number and metadata of this batch interval to InputInfoTracker.
val offsetRanges = fromOffsets.map { case (tp, fo) =>
val uo = untilOffsets(tp)
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
@ -304,14 +338,43 @@ class DynamicPartitionKafkaInputDStream[
val metadata = Map(
"offsets" -> offsetRanges.toList,
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
val count = offsetRanges.map(_.count()).sum
val inputInfo = StreamInputInfo(id, count, metadata)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
logInfo(s"finished reportInputInfo for stream$id of record count: $count")
}
csharpRdd
override def compute(validTime: Time): Option[RDD[R]] = {
val rddOption = if (numReceivers > 0) {
val recordList = rddPreComputeProcessor.get()
preComputeOutputMap.put(validTime, recordList)
val rddList = recordList.map { case RddPreComputeRecord(_, rdd) => rdd }
if (rddList.nonEmpty) Some(context.sparkContext.union(rddList)) else None
} else {
doCompute(validTime)
}
rddOption.foreach(rdd => reportInputInfo(validTime, getOffsetRanges(rdd)))
Some(rddOption.getOrElse(context.sparkContext.emptyRDD))
}
// override this method to un-persist precomputed RDDs that are no longer needed
override private[streaming] def clearMetadata(time: Time): Unit = {
if (numReceivers > 0) {
if (preComputeOutputMap.containsKey(time)) {
preComputeOutputMap.remove(time).foreach {
case RddPreComputeRecord(rddSeqNum, rdd) =>
rdd.unpersist(false)
rddPreComputeProcessor.ackRdd(rddSeqNum)
}
}
}
super.clearMetadata(time)
}
override def start(): Unit = {
instantiateAndStartRefreshOffsetsScheduler
initializeReceiver()
instantiateAndStartRefreshOffsetsScheduler()
}
override def stop(): Unit = {
@ -328,19 +391,23 @@ class DynamicPartitionKafkaInputDStream[
override def update(time: Time) {
batchForTime.clear()
generatedRDDs.foreach { kv =>
val a = (if (cSharpFunc == null) kv._2 else kv._2.firstParent[R]).asInstanceOf[DynamicPartitionKafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
batchForTime += kv._1 -> a
generatedRDDs.foreach { case (k, v) =>
batchForTime += (k -> getOffsetRanges(v).map(_.toTuple))
}
}
override def cleanup(time: Time) { }
override def restore() {
currentOffsets = batchForTime(batchForTime.keys.max).map(o => (TopicAndPartition(o._1, o._2), o._4)).toMap
currentOffsets = batchForTime(batchForTime.keys.max).groupBy {
case (topic, partition, _, _) => (topic, partition)
}.map {
case (k, vIter) => (TopicAndPartition(k._1, k._2), vIter.maxBy(_._4)._4)
}
// this is assuming that the topics don't change during execution, which is true currently
topicAndPartitions = currentOffsets.keySet
offsetsRangeForNextBatch = None
initializeReceiver()
instantiateAndStartRefreshOffsetsScheduler()
// for unit test purpose only, it will not get here in prod if broker list is empty
val leaders = if (kafkaParams("metadata.broker.list").isEmpty)
@ -352,8 +419,9 @@ class DynamicPartitionKafkaInputDStream[
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
logInfo(s"Restoring DynamicPartitionKafkaRDD for id $id time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> callCSharpTransform(new DynamicPartitionKafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler, numPartitions), t).get
val generatedRdd = callCSharpTransform(new DynamicPartitionKafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler, numPartitions), t)
generatedRDDs += t -> generatedRdd
}
}
}

View file

@ -39,10 +39,12 @@ private[kafka] class KafkaUtilsCSharpHelper extends KafkaUtilsPythonHelper{
kafkaParams: JMap[String, String],
topics: JSet[String],
fromOffsets: JMap[TopicAndPartition, JLong],
numPartitions: Int): JavaDStream[(Array[Byte], Array[Byte])] = {
numPartitions: Int,
cSharpFunc: Array[Byte],
serializationMode: String): JavaDStream[(Array[Byte], Array[Byte])] = {
val messageHandler =
(mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler, numPartitions))
new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler, numPartitions, cSharpFunc, serializationMode))
}
def createDirectStream[V: ClassTag](
@ -51,7 +53,9 @@ private[kafka] class KafkaUtilsCSharpHelper extends KafkaUtilsPythonHelper{
topics: JSet[String],
fromOffsets: JMap[TopicAndPartition, JLong],
messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V,
numPartitions: Int): DStream[V] = {
numPartitions: Int,
cSharpFunc: Array[Byte],
serializationMode: String): DStream[V] = {
val currentFromOffsets: Map[TopicAndPartition, Long] = if (!fromOffsets.isEmpty) {
val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
@ -76,6 +80,8 @@ private[kafka] class KafkaUtilsCSharpHelper extends KafkaUtilsPythonHelper{
Set(topics.asScala.toSeq: _*),
Map(currentFromOffsets.toSeq: _*),
jssc.ssc.sc.clean(messageHandler),
numPartitions)
numPartitions,
cSharpFunc,
serializationMode)
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.streaming.api.csharp
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.csharp.SparkCLRFunSuite
class RddPreComputeProcessorSuite extends SparkCLRFunSuite {
test("RddPreComputeProcessor") {
val conf = new SparkConf().setAppName("test").setMaster("local").set("spark.testing", "true")
val sc = new SparkContext(conf)
val preComputeProcessor = new RddPreComputeProcessor[Long](
sc, "RddPreComputeProcessor-test", 1, 1, 1, StorageLevel.MEMORY_ONLY)
try {
val rdd1 = sc.range(1L, 10L, 1L)
preComputeProcessor.put(rdd1)
var stop = false
while (!stop) {
val preComputedResult1 = preComputeProcessor.get()
if (preComputedResult1.isEmpty) {
Thread.sleep(100)
} else {
stop = true
assert(preComputedResult1.size == 1)
}
}
// test bypass scenario because ackRdd() is not called
val rdd2 = sc.range(1L, 5L, 1L)
preComputeProcessor.put(rdd2)
val preComputedResult2 = preComputeProcessor.get()
assert(preComputedResult2.size == 1)
} finally {
preComputeProcessor.stop()
sc.stop()
}
}
}