Merge pull request #201 from xiongrenyi/master
Add an overload of the UpdateStateByKey API and move the Kafka DStream creation methods to the new KafkaUtils class
Commit
aa798fe3be
@@ -128,6 +128,7 @@
     <Compile Include="Sql\Types.cs" />
     <Compile Include="Sql\UserDefinedFunction.cs" />
     <Compile Include="Streaming\DStream.cs" />
+    <Compile Include="Streaming\Kafka.cs" />
     <Compile Include="Streaming\PairDStreamFunctions.cs" />
     <Compile Include="Streaming\StreamingContext.cs" />
     <Compile Include="Streaming\TransformedDStream.cs" />
@@ -7,10 +7,75 @@ using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
 
+using Microsoft.Spark.CSharp.Core;
+
 namespace Microsoft.Spark.CSharp.Streaming
 {
-    //TODO - complete the impl
-    public class Kafka
+    public class KafkaUtils
     {
+        /// <summary>
+        /// Create an input stream that pulls messages from a Kafka Broker.
+        /// </summary>
+        /// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
+        /// <param name="groupId">The group id for this consumer.</param>
+        /// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
+        /// <param name="kafkaParams">Additional params for Kafka</param>
+        /// <returns>A DStream object</returns>
+        public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContext ssc, string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams)
+        {
+            return CreateStream(ssc, zkQuorum, groupId, topics, kafkaParams, StorageLevelType.MEMORY_AND_DISK_SER_2);
+        }
+
+        /// <summary>
+        /// Create an input stream that pulls messages from a Kafka Broker.
+        /// </summary>
+        /// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
+        /// <param name="groupId">The group id for this consumer.</param>
+        /// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
+        /// <param name="kafkaParams">Additional params for Kafka</param>
+        /// <param name="storageLevelType">RDD storage level.</param>
+        /// <returns>A DStream object</returns>
+        public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContext ssc, string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams, StorageLevelType storageLevelType)
+        {
+            if (kafkaParams == null)
+                kafkaParams = new Dictionary<string, string>();
+
+            if (!string.IsNullOrEmpty(zkQuorum))
+                kafkaParams["zookeeper.connect"] = zkQuorum;
+            if (groupId != null)
+                kafkaParams["group.id"] = groupId;
+            if (kafkaParams.ContainsKey("zookeeper.connection.timeout.ms"))
+                kafkaParams["zookeeper.connection.timeout.ms"] = "10000";
+
+            return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.KafkaStream(topics, kafkaParams, storageLevelType), ssc);
+        }
+
+        /// <summary>
+        /// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
+        ///
+        /// This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
+        /// in each batch duration and processed without storing.
+        ///
+        /// This does not use Zookeeper to store offsets. The consumed offsets are tracked
+        /// by the stream itself. For interoperability with Kafka monitoring tools that depend on
+        /// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
+        /// You can access the offsets used in each batch from the generated RDDs (see
+        /// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
+        /// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
+        /// The information on consumed offset can be recovered from the checkpoint.
+        /// See the programming guide for details (constraints, etc.).
+        ///
+        /// </summary>
+        /// <param name="topics">list of topic_name to consume.</param>
+        /// <param name="kafkaParams">
+        /// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
+        /// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
+        /// </param>
+        /// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
+        /// <returns>A DStream object</returns>
+        public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
+        {
+            return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), ssc, SerializedMode.Pair);
+        }
     }
 }
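For reference, here is a minimal usage sketch of the new KafkaUtils API added in Streaming\Kafka.cs above. The topic name, group id, and addresses are placeholders and not part of this change; the method signatures are the ones shown in the diff, and the calling pattern matches the updated AdapterTest at the bottom of this PR.

```csharp
// Illustrative sketch only; broker/Zookeeper addresses and topic names are placeholders.
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Streaming;

class KafkaUtilsUsageSketch
{
    static void Sketch(StreamingContext ssc)
    {
        // Receiver-based stream: consumes the topic through the given Zookeeper quorum,
        // defaulting to MEMORY_AND_DISK_SER_2 storage (see the two CreateStream overloads above).
        DStream<KeyValuePair<byte[], byte[]>> receiverStream = KafkaUtils.CreateStream(
            ssc,
            "localhost:2181",                                      // zkQuorum (placeholder)
            "sampleGroup",                                         // groupId (placeholder)
            new Dictionary<string, int> { { "sampleTopic", 1 } },  // topic -> numPartitions
            new Dictionary<string, string>());                     // additional Kafka params

        // Direct (receiverless) stream: brokers are supplied via kafkaParams and the
        // consumed offsets are tracked by the stream itself, as the XML docs describe.
        DStream<KeyValuePair<byte[], byte[]>> directStream = KafkaUtils.CreateDirectStream(
            ssc,
            new List<string> { "sampleTopic" },
            new Dictionary<string, string> { { "metadata.broker.list", "localhost:9092" } },
            new Dictionary<string, long>());                       // fromOffsets (empty, as in the test)
    }
}
```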
@@ -315,19 +315,37 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// <typeparam name="V"></typeparam>
         /// <typeparam name="S"></typeparam>
         /// <param name="self"></param>
-        /// <param name="updateFunc">State update function. If this function returns None, then corresponding state key-value pair will be eliminated.</param>
+        /// <param name="updateFunc"></param>
         /// <param name="numPartitions"></param>
         /// <returns></returns>
         public static DStream<KeyValuePair<K, S>> UpdateStateByKey<K, V, S>(this DStream<KeyValuePair<K, V>> self,
             Func<IEnumerable<V>, S, S> updateFunc,
             int numPartitions = 0)
         {
+            return UpdateStateByKey<K, V, S>(self, new UpdateStateByKeyHelper<K, V, S>(updateFunc).Execute, numPartitions);
+        }
+
+        /// <summary>
+        /// Return a new "state" DStream where the state for each key is updated by applying
+        /// the given function on the previous state of the key and the new values of the key.
+        /// </summary>
+        /// <typeparam name="K"></typeparam>
+        /// <typeparam name="V"></typeparam>
+        /// <typeparam name="S"></typeparam>
+        /// <param name="self"></param>
+        /// <param name="updateFunc">State update function. If this function returns None, then corresponding state key-value pair will be eliminated.</param>
+        /// <param name="numPartitions"></param>
+        /// <returns></returns>
+        public static DStream<KeyValuePair<K, S>> UpdateStateByKey<K, V, S>(this DStream<KeyValuePair<K, V>> self,
+            Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> updateFunc,
+            int numPartitions = 0)
+        {
             if (numPartitions <= 0)
                 numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
 
             Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc = self.Piplinable ? (self as TransformedDStream<KeyValuePair<K, V>>).func : null;
 
-            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeyHelper<K, V, S>(updateFunc, prevFunc, numPartitions).Execute;
+            Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeysHelper<K, V, S>(updateFunc, prevFunc, numPartitions).Execute;
 
             var formatter = new BinaryFormatter();
             var stream = new MemoryStream();
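To make the difference between the two overloads concrete, here is a hedged usage sketch of a running count per key. The pairs DStream and the counting scenario are assumed for illustration; the two signatures are exactly the ones in the hunk above — the per-key Func<IEnumerable<V>, S, S> form, which now simply delegates, and the new form that receives the whole sequence of (key, (values, state)) pairs.

```csharp
// Illustrative sketch only; the "pairs" DStream and the counting scenario are assumed.
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.CSharp.Streaming;

class UpdateStateByKeyUsageSketch
{
    static void Sketch(DStream<KeyValuePair<string, int>> pairs)
    {
        // Per-key form (pre-existing overload): previous state + new values -> new state.
        DStream<KeyValuePair<string, int>> counts =
            pairs.UpdateStateByKey<string, int, int>(
                (newValues, prevCount) => prevCount + newValues.Sum());

        // Whole-batch form (the overload added here): the function receives every
        // (key, (new values, previous state)) pair and returns (key, new state) pairs.
        DStream<KeyValuePair<string, int>> countsFromBatch =
            pairs.UpdateStateByKey<string, int, int>(
                input => input.Select(kv => new KeyValuePair<string, int>(
                    kv.Key, kv.Value.Item2 + kv.Value.Item1.Sum())));
    }
}
```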
@@ -565,9 +583,27 @@ namespace Microsoft.Spark.CSharp.Streaming
     internal class UpdateStateByKeyHelper<K, V, S>
     {
         private readonly Func<IEnumerable<V>, S, S> func;
+
+        internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f)
+        {
+            func = f;
+        }
+
+        internal IEnumerable<KeyValuePair<K, S>> Execute(IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>> input)
+        {
+            return input.Select(x => new KeyValuePair<K, S>(x.Key, func(x.Value.Item1, x.Value.Item2)));
+        }
+    }
+
+    [Serializable]
+    internal class UpdateStateByKeysHelper<K, V, S>
+    {
+        private readonly Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> func;
         private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;
         private readonly int numPartitions;
-        internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f, Func<double, RDD<dynamic>, RDD<dynamic>> prevF, int numPartitions)
+        internal UpdateStateByKeysHelper(
+            Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> f,
+            Func<double, RDD<dynamic>, RDD<dynamic>> prevF, int numPartitions)
         {
             func = f;
             prevFunc = prevF;
@@ -577,7 +613,7 @@ namespace Microsoft.Spark.CSharp.Streaming
         internal RDD<dynamic> Execute(double t, RDD<dynamic> stateRDD, RDD<dynamic> valuesRDD)
         {
             RDD<KeyValuePair<K, S>> state = null;
-            RDD<KeyValuePair<K, Tuple<List<V>, S>>> g = null;
+            RDD<KeyValuePair<K, Tuple<IEnumerable<V>, S>>> g = null;
 
             if (prevFunc != null)
                 valuesRDD = prevFunc(t, valuesRDD);
@@ -586,17 +622,17 @@ namespace Microsoft.Spark.CSharp.Streaming
 
             if (stateRDD == null)
             {
-                g = values.GroupByKey(numPartitions).MapValues(x => new Tuple<List<V>, S>(new List<V>(x), default(S)));
+                g = values.GroupByKey(numPartitions).MapValues(x => new Tuple<IEnumerable<V>, S>(new List<V>(x), default(S)));
             }
             else
             {
                 state = stateRDD.ConvertTo<KeyValuePair<K, S>>();
                 values = values.PartitionBy(numPartitions);
                 state.partitioner = values.partitioner;
-                g = state.GroupWith(values, numPartitions).MapValues(x => new Tuple<List<V>, S>(new List<V>(x.Item2), x.Item1.Count > 0 ? x.Item1[0] : default(S)));
+                g = state.GroupWith(values, numPartitions).MapValues(x => new Tuple<IEnumerable<V>, S>(new List<V>(x.Item2), x.Item1.Count > 0 ? x.Item1[0] : default(S)));
             }
 
-            state = g.MapValues(x => func(x.Item1, x.Item2)).Filter(x => x.Value != null);
+            state = g.MapPartitionsWithIndex((pid, iter) => func(iter), true).Filter(x => x.Value != null);
 
             return state.ConvertTo<dynamic>();
         }
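The switch from MapValues to MapPartitionsWithIndex above is what lets the user-supplied function see an entire partition of (key, (values, state)) pairs rather than one tuple at a time; UpdateStateByKeyHelper then lifts the old per-key function into that shape. A plain-C# sketch of that lifting, with no Spark types and illustrative names only:

```csharp
// Illustrative, Spark-free sketch of how the per-key update function relates to the
// whole-partition function that UpdateStateByKeysHelper now applies.
using System;
using System.Collections.Generic;
using System.Linq;

static class StateUpdateShapes
{
    // Per-key shape: (new values, previous state) -> new state.
    static int PerKey(IEnumerable<int> newValues, int prevState) => prevState + newValues.Sum();

    // Lifted shape: applied once over all (key, (values, state)) pairs in a partition,
    // mirroring UpdateStateByKeyHelper.Execute in the diff above.
    static IEnumerable<KeyValuePair<string, int>> Lifted(
        IEnumerable<KeyValuePair<string, Tuple<IEnumerable<int>, int>>> partition)
    {
        return partition.Select(x =>
            new KeyValuePair<string, int>(x.Key, PerKey(x.Value.Item1, x.Value.Item2)));
    }

    static void Demo()
    {
        var partition = new[]
        {
            new KeyValuePair<string, Tuple<IEnumerable<int>, int>>(
                "a", Tuple.Create((IEnumerable<int>)new[] { 1, 1 }, 3)), // "a": 3 + 2 -> 5
            new KeyValuePair<string, Tuple<IEnumerable<int>, int>>(
                "b", Tuple.Create((IEnumerable<int>)new[] { 1 }, 0)),    // "b": 0 + 1 -> 1
        };
        foreach (var kv in Lifted(partition))
            Console.WriteLine($"{kv.Key} -> {kv.Value}");
    }
}
```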
@@ -141,71 +141,6 @@ namespace Microsoft.Spark.CSharp.Streaming
             return new DStream<string>(streamingContextProxy.TextFileStream(directory), this, SerializedMode.String);
         }
 
-        /// <summary>
-        /// Create an input stream that pulls messages from a Kafka Broker.
-        /// </summary>
-        /// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
-        /// <param name="groupId">The group id for this consumer.</param>
-        /// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
-        /// <param name="kafkaParams">Additional params for Kafka</param>
-        /// <returns>A DStream object</returns>
-        public DStream<KeyValuePair<byte[], byte[]>> KafkaStream(string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams)
-        {
-            return this.KafkaStream(zkQuorum, groupId, topics, kafkaParams, StorageLevelType.MEMORY_AND_DISK_SER_2);
-        }
-
-        /// <summary>
-        /// Create an input stream that pulls messages from a Kafka Broker.
-        /// </summary>
-        /// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
-        /// <param name="groupId">The group id for this consumer.</param>
-        /// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
-        /// <param name="kafkaParams">Additional params for Kafka</param>
-        /// <param name="storageLevelType">RDD storage level.</param>
-        /// <returns>A DStream object</returns>
-        public DStream<KeyValuePair<byte[], byte[]>> KafkaStream(string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams, StorageLevelType storageLevelType)
-        {
-            if (kafkaParams == null)
-                kafkaParams = new Dictionary<string, string>();
-
-            if (!string.IsNullOrEmpty(zkQuorum))
-                kafkaParams["zookeeper.connect"] = zkQuorum;
-            if (groupId != null)
-                kafkaParams["group.id"] = groupId;
-            if (kafkaParams.ContainsKey("zookeeper.connection.timeout.ms"))
-                kafkaParams["zookeeper.connection.timeout.ms"] = "10000";
-
-            return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.KafkaStream(topics, kafkaParams, storageLevelType), this);
-        }
-
-        /// <summary>
-        /// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
-        ///
-        /// This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
-        /// in each batch duration and processed without storing.
-        ///
-        /// This does not use Zookeeper to store offsets. The consumed offsets are tracked
-        /// by the stream itself. For interoperability with Kafka monitoring tools that depend on
-        /// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
-        /// You can access the offsets used in each batch from the generated RDDs (see
-        ///
-        /// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
-        /// The information on consumed offset can be recovered from the checkpoint.
-        /// See the programming guide for details (constraints, etc.).
-        ///
-        /// </summary>
-        /// <param name="topics">list of topic_name to consume.</param>
-        /// <param name="kafkaParams">
-        /// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
-        /// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
-        /// </param>
-        /// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
-        /// <returns>A DStream object</returns>
-        public DStream<KeyValuePair<byte[], byte[]>> DirectKafkaStream(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
-        {
-            return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), this, SerializedMode.Pair);
-        }
-
         /// <summary>
         /// Wait for the execution to stop.
         /// </summary>
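Since the KafkaStream and DirectKafkaStream instance methods are removed from StreamingContext in this hunk, existing callers move to the static KafkaUtils entry points. A minimal migration sketch follows; variable values are placeholders, and the new signatures take the StreamingContext as the first argument, as the updated test below shows.

```csharp
// Migration sketch only; topic names and addresses are placeholders.
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Streaming;

class KafkaStreamMigrationSketch
{
    static void Sketch(StreamingContext ssc)
    {
        var topics = new Dictionary<string, int> { { "sampleTopic", 1 } };
        var kafkaParams = new Dictionary<string, string>();

        // Before: instance methods on StreamingContext (removed in the hunk above).
        //   var stream = ssc.KafkaStream("localhost:2181", "group", topics, kafkaParams);
        //   var direct = ssc.DirectKafkaStream(new List<string> { "sampleTopic" },
        //                                      kafkaParams, new Dictionary<string, long>());

        // After: static methods on KafkaUtils, with the StreamingContext passed explicitly.
        var stream = KafkaUtils.CreateStream(ssc, "localhost:2181", "group", topics, kafkaParams);
        var direct = KafkaUtils.CreateDirectStream(ssc, new List<string> { "sampleTopic" },
                                                   kafkaParams, new Dictionary<string, long>());
    }
}
```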
@@ -31,10 +31,10 @@ namespace AdapterTest
             var socketStream = ssc.SocketTextStream(IPAddress.Loopback.ToString(), 12345);
             Assert.IsNotNull(socketStream.DStreamProxy);
 
-            var kafkaStream = ssc.KafkaStream(IPAddress.Loopback + ":2181", "testGroupId", new Dictionary<string, int> { { "testTopic1", 1 } }, new Dictionary<string, string>());
+            var kafkaStream = KafkaUtils.CreateStream(ssc, IPAddress.Loopback + ":2181", "testGroupId", new Dictionary<string, int> { { "testTopic1", 1 } }, new Dictionary<string, string>());
             Assert.IsNotNull(kafkaStream.DStreamProxy);
 
-            var directKafkaStream = ssc.DirectKafkaStream(new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
+            var directKafkaStream = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
             Assert.IsNotNull(directKafkaStream.DStreamProxy);
 
             var union = ssc.Union(textFile, socketStream);