Merge pull request #507 from xiongrenyi/DirectStream

unifying KafkaUtils.CreateDirectStream API
Renyi Xiong 2016-07-20 18:58:53 -07:00 committed by GitHub
Parents 36e8fbbf2c 66f6f60571
Commit 9dc4674e93
6 changed files with 235 additions and 99 deletions

View file

@@ -10,6 +10,7 @@ using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
namespace Microsoft.Spark.CSharp.Streaming
{
@@ -83,6 +84,14 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
{
int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
if (numPartitions >= 0 ||
ssc.SparkContext.SparkConf.SparkConfProxy.Get("spark.mobius.streaming.kafka.CSharpReader.enabled", "false").ToLower() == "true" ||
ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) > 0 ||
topics.Any(topic => ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.maxMessagesPerTask." + topic, 0) > 0))
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
}
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), ssc, SerializedMode.Pair);
}
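For reference, a minimal usage sketch of the unified entry point (an assumption-laden example, not part of the diff: `ssc` is an existing StreamingContext, and the broker and topic names are placeholders):

```csharp
// Hedged sketch: assumes an existing StreamingContext `ssc`; broker/topic names are hypothetical.
var kafkaParams = new Dictionary<string, string>
{
    { "metadata.broker.list", "broker1:9092,broker2:9092" }
};

// With none of the Mobius Kafka configs set, this call goes through the plain DirectKafkaStream proxy.
// Setting any of the following routes the same call through DirectKafkaStreamWithRepartition instead:
//   spark.mobius.streaming.kafka.numPartitions.<topic>
//   spark.mobius.streaming.kafka.CSharpReader.enabled = "true"
//   spark.mobius.streaming.kafka.numReceivers > 0
//   spark.mobius.streaming.kafka.maxMessagesPerTask.<topic> > 0
var stream = KafkaUtils.CreateDirectStream(
    ssc,
    new List<string> { "testTopic" },
    kafkaParams,
    new Dictionary<string, long>());   // empty fromOffsets, as in the adapter tests
```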
@@ -109,57 +118,18 @@ namespace Microsoft.Spark.CSharp.Streaming
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <param name="numPartitions">
/// A user hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
/// unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
/// a probably larger number of RDD partitions.
/// If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if that config is not defined.
/// If numPartitions = 0, repartition using the original Kafka partition count.
/// If numPartitions > 0, repartition using this parameter.
/// </param>
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStreamWithRepartition(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions = -1)
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
}
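Since this overload is removed, existing callers migrate by moving the partition hint into configuration, mirroring the updated sample and adapter test later in this PR (topic name and `kafkaParams`/`fromOffsets` variables are placeholders):

```csharp
// Before (removed overload):
// var dstream = KafkaUtils.CreateDirectStreamWithRepartition(
//     ssc, new List<string> { "testTopic" }, kafkaParams, fromOffsets, 10);

// After (unified API; the numPartitions hint moves into SparkConf):
ssc.SparkContext.SparkConf.Set("spark.mobius.streaming.kafka.numPartitions.testTopic", "10");
var dstream = KafkaUtils.CreateDirectStream(
    ssc, new List<string> { "testTopic" }, kafkaParams, fromOffsets);
```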
/// <summary>
/// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
///
/// This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
/// in each batch duration and processes them without storing them.
///
/// This does not use Zookeeper to store offsets. The consumed offsets are tracked
/// by the stream itself. For interoperability with Kafka monitoring tools that depend on
/// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
/// You can access the offsets used in each batch from the generated RDDs (see
/// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
/// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
/// The information on consumed offset can be recovered from the checkpoint.
/// See the programming guide for details (constraints, etc.).
///
/// </summary>
/// <param name="ssc">Spark Streaming Context</param>
/// <param name="topics">list of topic_name to consume.</param>
/// <param name="kafkaParams">
/// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <param name="numPartitions">
/// A user hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
/// unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
/// a probably larger number of RDD partitions.
/// If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if that config is not defined.
/// If numPartitions = 0, repartition using the original Kafka partition count.
/// If numPartitions > 0, repartition using this parameter.
/// </param>
/// <param name="readFunc">user function to process the kafka data.</param>
/// <returns>A DStream object</returns>
public static DStream<T> CreateDirectStreamWithRepartitionAndReadFunc<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets,
int numPartitions, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
public static DStream<T> CreateDirectStream<T>(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, Func<int, IEnumerable<KeyValuePair<byte[], byte[]>>, IEnumerable<T>> readFunc)
{
var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
int numPartitions = GetNumPartitionsFromConfig(ssc, topics, kafkaParams);
if (ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numReceivers", 0) <= 0)
{
var dstream = new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, null, null), ssc, SerializedMode.Pair);
return dstream.MapPartitionsWithIndex(readFunc, true);
}
var mapPartitionsWithIndexHelper = new MapPartitionsWithIndexHelper<KeyValuePair<byte[], byte[]>, T>(readFunc, true);
var transformHelper = new TransformHelper<KeyValuePair<byte[], byte[]>, T>(mapPartitionsWithIndexHelper.Execute);
var transformDynamicHelper = new TransformDynamicHelper<KeyValuePair<byte[], byte[]>, T>(transformHelper.Execute);
Func<double, RDD<dynamic>, RDD<dynamic>> func = transformDynamicHelper.Execute;
@@ -170,5 +140,105 @@ namespace Microsoft.Spark.CSharp.Streaming
string serializationMode = SerializedMode.Pair.ToString();
return new DStream<T>(ssc.streamingContextProxy.DirectKafkaStreamWithRepartition(topics, kafkaParams, fromOffsets, numPartitions, readFuncBytes, serializationMode), ssc);
}
/// <summary>
/// Create an OffsetRange from Kafka messages when the CSharpReader is enabled
/// </summary>
/// <param name="input"></param>
/// <returns></returns>
public static OffsetRange GetOffsetRange(IEnumerable<KeyValuePair<byte[], byte[]>> input)
{
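// The input is expected to carry exactly two metadata pairs:
//   [0] key = "topic,clusterId" (UTF-8), value = partition (int)
//   [1] key = fromOffset (long),         value = untilOffset (long)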
int count = 2;
int i = 0;
var offsetRange = new KeyValuePair<byte[], byte[]>[count];
foreach (var message in input)
{
offsetRange[i++ % count] = message;
if (i > count)
break;
}
if (i != count)
{
throw new ArgumentException("Expecting kafka OffsetRange metadata.");
}
var topicAndClusterId = SerDe.ToString(offsetRange[0].Key);
var topic = topicAndClusterId.Split(',')[0];
var clusterId = topicAndClusterId.Split(',')[1];
var partition = SerDe.ToInt(offsetRange[0].Value);
var fromOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Key));
var untilOffset = SerDe.ReadLong(new MemoryStream(offsetRange[1].Value));
return new OffsetRange(topic, clusterId, partition, fromOffset, untilOffset);
}
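As a worked sketch of the expected metadata layout (mirroring the unit test added in this PR; all values are made up), the numeric fields are reversed from BitConverter's native little-endian output so that SerDe reads them back correctly:

```csharp
// Encode the two metadata pairs and decode them back into an OffsetRange.
Func<byte[], byte[]> toBigEndian = bytes => { Array.Reverse(bytes); return bytes; };

var metadata = new List<KeyValuePair<byte[], byte[]>>
{
    new KeyValuePair<byte[], byte[]>(
        Encoding.UTF8.GetBytes("testTopic,testClusterId"),   // "topic,clusterId"
        toBigEndian(BitConverter.GetBytes(1))),              // partition
    new KeyValuePair<byte[], byte[]>(
        toBigEndian(BitConverter.GetBytes(2L)),              // fromOffset
        toBigEndian(BitConverter.GetBytes(3L)))              // untilOffset
};

var range = KafkaUtils.GetOffsetRange(metadata);
// range.ToString() => "Kafka OffsetRange: topic testTopic cluster testClusterId partition 1 from 2 until 3"
```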
/// <summary>
/// topics should contain only one topic when choosing to repartition to a configured numPartitions
/// TODO: move to Scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove the above limitation
/// </summary>
/// <param name="ssc"></param>
/// <param name="topics"></param>
/// <param name="kafkaParams"></param>
/// <returns></returns>
private static int GetNumPartitionsFromConfig(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams)
{
if (topics == null || topics.Count == 0)
return -1;
string clusterId = kafkaParams.ContainsKey("cluster.id") ? "." + kafkaParams["cluster.id"] : null;
return ssc.SparkContext.SparkConf.SparkConfProxy.GetInt("spark.mobius.streaming.kafka.numPartitions." + topics[0] + clusterId, -1);
}
}
/// <summary>
/// Kafka offset range
/// </summary>
public class OffsetRange
{
private readonly string topic;
private readonly string clusterId;
private readonly int partition;
private readonly long fromOffset;
private readonly long untilOffset;
/// <summary>
/// Topic
/// </summary>
public string Topic { get { return topic; } }
/// <summary>
/// ClusterId
/// </summary>
public string ClusterId { get { return clusterId; } }
/// <summary>
/// Partition
/// </summary>
public int Partition { get { return partition; } }
/// <summary>
/// FromOffset
/// </summary>
public long FromOffset { get { return fromOffset; } }
/// <summary>
/// Until Offset
/// </summary>
public long UntilOffset { get { return untilOffset; } }
internal OffsetRange(string topic, string clusterId, int partition, long fromOffset, long untilOffset)
{
this.topic = topic;
this.clusterId = clusterId;
this.partition = partition;
this.fromOffset = fromOffset;
this.untilOffset = untilOffset;
}
/// <summary>
/// OffsetRange string format
/// </summary>
/// <returns></returns>
public override string ToString()
{
return string.Format("Kafka OffsetRange: topic {0} cluster {1} partition {2} from {3} until {4}", topic, clusterId, partition, fromOffset, untilOffset);
}
}
}

View file

@@ -7455,7 +7455,7 @@
<param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
<returns>A DStream object</returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32)">
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStream``1(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Func{System.Int32,System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}},System.Collections.Generic.IEnumerable{``0}})">
<summary>
Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
@@ -7479,51 +7479,62 @@
with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
</param>
<param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
<param name="numPartitions">
A user hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
a probably larger number of RDD partitions.
If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if that config is not defined.
If numPartitions = 0, repartition using the original Kafka partition count.
If numPartitions > 0, repartition using this parameter.
</param>
<returns>A DStream object</returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartitionAndReadFunc``1(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32,System.Func{System.Int32,System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}},System.Collections.Generic.IEnumerable{``0}})">
<summary>
Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
in each batch duration and processes them without storing them.
This does not use Zookeeper to store offsets. The consumed offsets are tracked
by the stream itself. For interoperability with Kafka monitoring tools that depend on
Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
You can access the offsets used in each batch from the generated RDDs (see
[[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
To recover from driver failures, you have to enable checkpointing in the StreamingContext.
The information on consumed offset can be recovered from the checkpoint.
See the programming guide for details (constraints, etc.).
</summary>
<param name="ssc">Spark Streaming Context</param>
<param name="topics">list of topic_name to consume.</param>
<param name="kafkaParams">
Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
</param>
<param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
<param name="numPartitions">
A user hint on how many Kafka RDD partitions to create instead of aligning with the Kafka partitions;
unbalanced Kafka partitions and/or under-distributed data will be redistributed evenly across
a probably larger number of RDD partitions.
If numPartitions = -1, either repartition based on spark.streaming.kafka.maxRatePerTask or do nothing if that config is not defined.
If numPartitions = 0, repartition using the original Kafka partition count.
If numPartitions > 0, repartition using this parameter.
</param>
<param name="readFunc">user function to process the kafka data.</param>
<returns>A DStream object</returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.GetOffsetRange(System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}})">
<summary>
Create an OffsetRange from Kafka messages when the CSharpReader is enabled
</summary>
<param name="input"></param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.GetNumPartitionsFromConfig(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String})">
<summary>
topics should contain only one topic when choosing to repartition to a configured numPartitions
TODO: move to Scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove the above limitation
</summary>
<param name="ssc"></param>
<param name="topics"></param>
<param name="kafkaParams"></param>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Streaming.OffsetRange">
<summary>
Kafka offset range
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Streaming.OffsetRange.Topic">
<summary>
Topic
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Streaming.OffsetRange.ClusterId">
<summary>
ClusterId
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Streaming.OffsetRange.Partition">
<summary>
Partition
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Streaming.OffsetRange.FromOffset">
<summary>
FromOffset
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Streaming.OffsetRange.UntilOffset">
<summary>
Until Offset
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.OffsetRange.ToString">
<summary>
OffsetRange string format
</summary>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Streaming.MapWithStateDStream`4">
<summary>
DStream representing the stream of data generated by `mapWithState` operation on a pair DStream.

View file

@@ -985,7 +985,21 @@
####Methods
<table><tr><th>Name</th><th>Description</th></tr><tr><td><font color="blue">CreateStream</font></td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td><font color="blue">CreateStream</font></td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td><font color="blue">CreateDirectStream</font></td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td><font color="blue">CreateDirectStreamWithRepartition</font></td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td><font color="blue">CreateDirectStreamWithRepartitionAndReadFunc``1</font></td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr></table>
<table><tr><th>Name</th><th>Description</th></tr><tr><td><font color="blue">CreateStream</font></td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td><font color="blue">CreateStream</font></td><td>Create an input stream that pulls messages from a Kafka Broker.</td></tr><tr><td><font color="blue">CreateDirectStream</font></td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td><font color="blue">CreateDirectStream``1</font></td><td>Create an input stream that directly pulls messages from a Kafka Broker and specific offset. This is not a receiver based Kafka input stream, it directly pulls the message from Kafka in each batch duration and processed without storing. This does not use Zookeeper to store offsets. The consumed offsets are tracked by the stream itself. For interoperability with Kafka monitoring tools that depend on Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. You can access the offsets used in each batch from the generated RDDs (see [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). To recover from driver failures, you have to enable checkpointing in the StreamingContext. The information on consumed offset can be recovered from the checkpoint. See the programming guide for details (constraints, etc.).</td></tr><tr><td><font color="blue">GetOffsetRange</font></td><td>create offset range from kafka messages when CSharpReader is enabled</td></tr><tr><td><font color="blue">GetNumPartitionsFromConfig</font></td><td>topics should contain only one topic if choose to repartitions to a configured numPartitions TODO: move to scala and merge into DynamicPartitionKafkaRDD.getPartitions to remove above limitation</td></tr></table>
---
###<font color="#68228B">Microsoft.Spark.CSharp.Streaming.OffsetRange</font>
####Summary
Kafka offset range
####Methods
<table><tr><th>Name</th><th>Description</th></tr><tr><td><font color="blue">ToString</font></td><td>OffsetRange string format</td></tr></table>
---

View file

@@ -36,6 +36,11 @@ namespace AdapterTest.Mocks
public void Set(string key, string value)
{
stringConfDictionary[key] = value;
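// Also mirror numeric values into the int dictionary so GetInt(key, defaultValue) sees values applied via Set().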
int i;
if (int.TryParse(value, out i))
{
intConfDictionary[key] = i;
}
}
public int GetInt(string key, int defaultValue)

View file

@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Streaming;
@@ -37,16 +38,26 @@ namespace AdapterTest
var directKafkaStream = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
Assert.IsNotNull(directKafkaStream.DStreamProxy);
var directKafkaStreamWithRepartition = KafkaUtils.CreateDirectStreamWithRepartition(ssc, new List<string> { "testTopic3" }, new Dictionary<string, string>(), new Dictionary<string, long>(), 10);
ssc.SparkContext.SparkConf.Set("spark.mobius.streaming.kafka.numPartitions.testTopic3", "10");
var directKafkaStreamWithRepartition = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic3" }, new Dictionary<string, string>(), new Dictionary<string, long>());
Assert.IsNotNull(directKafkaStreamWithRepartition.DStreamProxy);
var directKafkaStreamWithRepartitionAndReadFunc = KafkaUtils.CreateDirectStreamWithRepartitionAndReadFunc(
var directKafkaStreamWithRepartitionAndReadFunc = KafkaUtils.CreateDirectStream(
ssc,
new List<string> { "testTopic3" },
new Dictionary<string, string>(), new Dictionary<string, long>(),
10,
(int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input;});
Assert.IsNotNull(directKafkaStreamWithRepartitionAndReadFunc.DStreamProxy);
(int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input; });
Assert.IsNotNull(directKafkaStreamWithRepartitionAndReadFunc);
ssc.SparkContext.SparkConf.Set("spark.mobius.streaming.kafka.numReceivers", "10");
var directKafkaReceiver = KafkaUtils.CreateDirectStream(
ssc,
new List<string> { "testTopic3" },
new Dictionary<string, string>(), new Dictionary<string, long>(),
(int pid, IEnumerable<KeyValuePair<byte[], byte[]>> input) => { return input; });
Assert.IsNotNull(directKafkaReceiver.DStreamProxy);
var union = ssc.Union(textFile, socketStream);
Assert.IsNotNull(union.DStreamProxy);
@@ -77,5 +88,28 @@ namespace AdapterTest
ssc.AwaitTerminationOrTimeout(3000);
ssc.Stop();
}
[Test]
public void TestStreamingOffsetRange()
{
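// Reverse BitConverter's native little-endian output so SerDe can read the values back in big-endian order.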
byte[] partition = BitConverter.GetBytes(1);
Array.Reverse(partition);
byte[] fromOffset = BitConverter.GetBytes(2L);
Array.Reverse(fromOffset);
byte[] untilOffset = BitConverter.GetBytes(3L);
Array.Reverse(untilOffset);
var offsetRange = KafkaUtils.GetOffsetRange(new List<KeyValuePair<byte[], byte[]>>
{
new KeyValuePair<byte[], byte[]>(Encoding.UTF8.GetBytes("testTopic,testClusterId"), partition),
new KeyValuePair<byte[], byte[]>(fromOffset, untilOffset)
});
Assert.AreEqual(offsetRange.Topic, "testTopic");
Assert.AreEqual(offsetRange.ClusterId, "testClusterId");
Assert.AreEqual(offsetRange.Partition, 1);
Assert.AreEqual(offsetRange.FromOffset, 2);
Assert.AreEqual(offsetRange.UntilOffset, 3);
}
}
}

View file

@@ -140,7 +140,8 @@ namespace Microsoft.Spark.CSharp
StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath,
() =>
{
SparkContext sc = SparkCLRSamples.SparkContext;
var conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
StreamingContext context = new StreamingContext(sc, 2000L);
context.Checkpoint(checkpointPath);
@@ -149,7 +150,8 @@
{"auto.offset.reset", "smallest"}
};
var dstream = KafkaUtils.CreateDirectStreamWithRepartition(context, new List<string> { topic }, kafkaParams, new Dictionary<string, long>(), partitions);
conf.Set("spark.mobius.streaming.kafka.numPartitions." + topic, partitions.ToString());
var dstream = KafkaUtils.CreateDirectStream(context, new List<string> { topic }, kafkaParams, new Dictionary<string, long>());
dstream.ForeachRDD((time, rdd) =>
{