add an overload of UpdateStateByKey API and move kafka DStream to KafkaUtil class

renyi 2016-01-04 10:31:40 -08:00
Parent 1b02544765
Commit 669f6773f3
5 changed files with 118 additions and 77 deletions

View File

@@ -128,6 +128,7 @@
<Compile Include="Sql\Types.cs" />
<Compile Include="Sql\UserDefinedFunction.cs" />
<Compile Include="Streaming\DStream.cs" />
<Compile Include="Streaming\Kafka.cs" />
<Compile Include="Streaming\PairDStreamFunctions.cs" />
<Compile Include="Streaming\StreamingContext.cs" />
<Compile Include="Streaming\TransformedDStream.cs" />

View File

@@ -7,10 +7,75 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
namespace Microsoft.Spark.CSharp.Streaming
{
//TODO - complete the impl
public class Kafka
public class KafkaUtils
{
/// <summary>
/// Create an input stream that pulls messages from a Kafka Broker.
/// </summary>
/// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
/// <param name="groupId">The group id for this consumer.</param>
/// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
/// <param name="kafkaParams">Additional params for Kafka</param>
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContext ssc, string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams)
{
return CreateStream(ssc, zkQuorum, groupId, topics, kafkaParams, StorageLevelType.MEMORY_AND_DISK_SER_2);
}
/// <summary>
/// Create an input stream that pulls messages from a Kafka Broker.
/// </summary>
/// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
/// <param name="groupId">The group id for this consumer.</param>
/// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
/// <param name="kafkaParams">Additional params for Kafka</param>
/// <param name="storageLevelType">RDD storage level.</param>
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateStream(StreamingContext ssc, string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams, StorageLevelType storageLevelType)
{
if (kafkaParams == null)
kafkaParams = new Dictionary<string, string>();
if (!string.IsNullOrEmpty(zkQuorum))
kafkaParams["zookeeper.connect"] = zkQuorum;
if (groupId != null)
kafkaParams["group.id"] = groupId;
// default the Zookeeper connection timeout when the caller has not set one
if (!kafkaParams.ContainsKey("zookeeper.connection.timeout.ms"))
kafkaParams["zookeeper.connection.timeout.ms"] = "10000";
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.KafkaStream(topics, kafkaParams, storageLevelType), ssc);
}
/// <summary>
/// Create an input stream that directly pulls messages from a Kafka Broker, starting from specific offsets.
///
/// This is not a receiver-based Kafka input stream; it directly pulls messages from Kafka
/// in each batch duration and processes them without storing them.
///
/// This does not use Zookeeper to store offsets. The consumed offsets are tracked
/// by the stream itself. For interoperability with Kafka monitoring tools that depend on
/// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
/// You can access the offsets used in each batch from the generated RDDs (see
/// [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
/// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
/// The information on consumed offset can be recovered from the checkpoint.
/// See the programming guide for details (constraints, etc.).
///
/// </summary>
/// <param name="topics">list of topic_name to consume.</param>
/// <param name="kafkaParams">
/// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <returns>A DStream object</returns>
public static DStream<KeyValuePair<byte[], byte[]>> CreateDirectStream(StreamingContext ssc, List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
{
return new DStream<KeyValuePair<byte[], byte[]>>(ssc.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), ssc, SerializedMode.Pair);
}
}
}
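
For orientation, a minimal usage sketch of the relocated Kafka entry points. The wrapper class, the already-constructed StreamingContext, and the broker/Zookeeper addresses and topic names below are assumptions for illustration, not part of this change.

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Streaming;

class KafkaUtilsUsageSketch
{
    // 'ssc' is an already-constructed StreamingContext; addresses and topics are placeholders.
    static void Consume(StreamingContext ssc)
    {
        // Receiver-based stream: connects through Zookeeper; offsets are tracked
        // in Zookeeper under the given consumer group id.
        DStream<KeyValuePair<byte[], byte[]>> receiverStream = KafkaUtils.CreateStream(
            ssc,
            "zkhost:2181",
            "sampleGroup",
            new Dictionary<string, int> { { "sampleTopic", 1 } },
            new Dictionary<string, string>());

        // Direct stream: talks to the brokers directly; offsets are tracked by the
        // stream itself, so broker addresses (not Zookeeper) go in kafkaParams.
        DStream<KeyValuePair<byte[], byte[]>> directStream = KafkaUtils.CreateDirectStream(
            ssc,
            new List<string> { "sampleTopic" },
            new Dictionary<string, string> { { "metadata.broker.list", "broker1:9092,broker2:9092" } },
            new Dictionary<string, long>());
    }
}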

View File

@@ -315,19 +315,37 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <typeparam name="V"></typeparam>
/// <typeparam name="S"></typeparam>
/// <param name="self"></param>
/// <param name="updateFunc">State update function. If this function returns None, then corresponding state key-value pair will be eliminated.</param>
/// <param name="updateFunc"></param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static DStream<KeyValuePair<K, S>> UpdateStateByKey<K, V, S>(this DStream<KeyValuePair<K, V>> self,
Func<IEnumerable<V>, S, S> updateFunc,
int numPartitions = 0)
{
return UpdateStateByKey<K, V, S>(self, new UpdateStateByKeyHelper<K, V, S>(updateFunc).Execute, numPartitions);
}
/// <summary>
/// Return a new "state" DStream where the state for each key is updated by applying
/// the given function on the previous state of the key and the new values of the key.
/// </summary>
/// <typeparam name="K"></typeparam>
/// <typeparam name="V"></typeparam>
/// <typeparam name="S"></typeparam>
/// <param name="self"></param>
/// <param name="updateFunc">State update function. If this function returns None, then corresponding state key-value pair will be eliminated.</param>
/// <param name="numPartitions"></param>
/// <returns></returns>
public static DStream<KeyValuePair<K, S>> UpdateStateByKey<K, V, S>(this DStream<KeyValuePair<K, V>> self,
Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> updateFunc,
int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc = self.Piplinable ? (self as TransformedDStream<KeyValuePair<K, V>>).func : null;
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeyHelper<K, V, S>(updateFunc, prevFunc, numPartitions).Execute;
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeysHelper<K, V, S>(updateFunc, prevFunc, numPartitions).Execute;
var formatter = new BinaryFormatter();
var stream = new MemoryStream();
@@ -565,9 +583,27 @@ namespace Microsoft.Spark.CSharp.Streaming
internal class UpdateStateByKeyHelper<K, V, S>
{
private readonly Func<IEnumerable<V>, S, S> func;
internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f)
{
func = f;
}
internal IEnumerable<KeyValuePair<K, S>> Execute(IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>> input)
{
return input.Select(x => new KeyValuePair<K, S>(x.Key, func(x.Value.Item1, x.Value.Item2)));
}
}
[Serializable]
internal class UpdateStateByKeysHelper<K, V, S>
{
private readonly Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> func;
private readonly Func<double, RDD<dynamic>, RDD<dynamic>> prevFunc;
private readonly int numPartitions;
internal UpdateStateByKeyHelper(Func<IEnumerable<V>, S, S> f, Func<double, RDD<dynamic>, RDD<dynamic>> prevF, int numPartitions)
internal UpdateStateByKeysHelper(
Func<IEnumerable<KeyValuePair<K, Tuple<IEnumerable<V>, S>>>, IEnumerable<KeyValuePair<K, S>>> f,
Func<double, RDD<dynamic>, RDD<dynamic>> prevF, int numPartitions)
{
func = f;
prevFunc = prevF;
@@ -577,7 +613,7 @@ namespace Microsoft.Spark.CSharp.Streaming
internal RDD<dynamic> Execute(double t, RDD<dynamic> stateRDD, RDD<dynamic> valuesRDD)
{
RDD<KeyValuePair<K, S>> state = null;
RDD<KeyValuePair<K, Tuple<List<V>, S>>> g = null;
RDD<KeyValuePair<K, Tuple<IEnumerable<V>, S>>> g = null;
if (prevFunc != null)
valuesRDD = prevFunc(t, valuesRDD);
@@ -586,17 +622,17 @@ namespace Microsoft.Spark.CSharp.Streaming
if (stateRDD == null)
{
g = values.GroupByKey(numPartitions).MapValues(x => new Tuple<List<V>, S>(new List<V>(x), default(S)));
g = values.GroupByKey(numPartitions).MapValues(x => new Tuple<IEnumerable<V>, S>(new List<V>(x), default(S)));
}
else
{
state = stateRDD.ConvertTo<KeyValuePair<K, S>>();
values = values.PartitionBy(numPartitions);
state.partitioner = values.partitioner;
g = state.GroupWith(values, numPartitions).MapValues(x => new Tuple<List<V>, S>(new List<V>(x.Item2), x.Item1.Count > 0 ? x.Item1[0] : default(S)));
g = state.GroupWith(values, numPartitions).MapValues(x => new Tuple<IEnumerable<V>, S>(new List<V>(x.Item2), x.Item1.Count > 0 ? x.Item1[0] : default(S)));
}
state = g.MapValues(x => func(x.Item1, x.Item2)).Filter(x => x.Value != null);
state = g.MapPartitionsWithIndex((pid, iter) => func(iter), true).Filter(x => x.Value != null);
return state.ConvertTo<dynamic>();
}
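
For illustration, a minimal sketch of calling the two UpdateStateByKey overloads to keep a running count per key. The input DStream, its key/value types, and the wrapper class are assumptions for the example, not part of this change.

using System.Collections.Generic;
using System.Linq;
using Microsoft.Spark.CSharp.Streaming;

class UpdateStateByKeySketch
{
    // 'pairs' is an assumed DStream<KeyValuePair<string, int>>, e.g. produced by a Map over an input stream.
    static void RunningCounts(DStream<KeyValuePair<string, int>> pairs)
    {
        // Existing overload: per-key function of (new values, previous state) -> new state.
        DStream<KeyValuePair<string, int>> counts = pairs.UpdateStateByKey<string, int, int>(
            (values, state) => state + values.Sum());

        // New overload: the function receives the whole iterator of
        // (key, (new values, previous state)) entries and returns the updated (key, state)
        // pairs, which allows batched or per-partition work inside the update step.
        DStream<KeyValuePair<string, int>> counts2 = pairs.UpdateStateByKey<string, int, int>(
            entries => entries.Select(e =>
                new KeyValuePair<string, int>(e.Key, e.Value.Item2 + e.Value.Item1.Sum())));
    }
}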

View File

@@ -141,71 +141,6 @@ namespace Microsoft.Spark.CSharp.Streaming
return new DStream<string>(streamingContextProxy.TextFileStream(directory), this, SerializedMode.String);
}
/// <summary>
/// Create an input stream that pulls messages from a Kafka Broker.
/// </summary>
/// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
/// <param name="groupId">The group id for this consumer.</param>
/// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
/// <param name="kafkaParams">Additional params for Kafka</param>
/// <returns>A DStream object</returns>
public DStream<KeyValuePair<byte[], byte[]>> KafkaStream(string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams)
{
return this.KafkaStream(zkQuorum, groupId, topics, kafkaParams, StorageLevelType.MEMORY_AND_DISK_SER_2);
}
/// <summary>
/// Create an input stream that pulls messages from a Kafka Broker.
/// </summary>
/// <param name="zkQuorum">Zookeeper quorum (hostname:port,hostname:port,..).</param>
/// <param name="groupId">The group id for this consumer.</param>
/// <param name="topics">Dict of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread.</param>
/// <param name="kafkaParams">Additional params for Kafka</param>
/// <param name="storageLevelType">RDD storage level.</param>
/// <returns>A DStream object</returns>
public DStream<KeyValuePair<byte[], byte[]>> KafkaStream(string zkQuorum, string groupId, Dictionary<string, int> topics, Dictionary<string, string> kafkaParams, StorageLevelType storageLevelType)
{
if (kafkaParams == null)
kafkaParams = new Dictionary<string, string>();
if (!string.IsNullOrEmpty(zkQuorum))
kafkaParams["zookeeper.connect"] = zkQuorum;
if (groupId != null)
kafkaParams["group.id"] = groupId;
if (kafkaParams.ContainsKey("zookeeper.connection.timeout.ms"))
kafkaParams["zookeeper.connection.timeout.ms"] = "10000";
return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.KafkaStream(topics, kafkaParams, storageLevelType), this);
}
/// <summary>
/// Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
///
/// This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
/// in each batch duration and processed without storing.
///
/// This does not use Zookeeper to store offsets. The consumed offsets are tracked
/// by the stream itself. For interoperability with Kafka monitoring tools that depend on
/// Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
/// You can access the offsets used in each batch from the generated RDDs (see
///
/// To recover from driver failures, you have to enable checkpointing in the StreamingContext.
/// The information on consumed offset can be recovered from the checkpoint.
/// See the programming guide for details (constraints, etc.).
///
/// </summary>
/// <param name="topics">list of topic_name to consume.</param>
/// <param name="kafkaParams">
/// Additional params for Kafka. Requires "metadata.broker.list" or "bootstrap.servers" to be set
/// with Kafka broker(s) (NOT zookeeper servers), specified in host1:port1,host2:port2 form.
/// </param>
/// <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
/// <returns>A DStream object</returns>
public DStream<KeyValuePair<byte[], byte[]>> DirectKafkaStream(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets)
{
return new DStream<KeyValuePair<byte[], byte[]>>(this.streamingContextProxy.DirectKafkaStream(topics, kafkaParams, fromOffsets), this, SerializedMode.Pair);
}
/// <summary>
/// Wait for the execution to stop.
/// </summary>

View File

@@ -31,10 +31,14 @@ namespace AdapterTest
var socketStream = ssc.SocketTextStream(IPAddress.Loopback.ToString(), 12345);
Assert.IsNotNull(socketStream.DStreamProxy);
var kafkaStream = ssc.KafkaStream(IPAddress.Loopback + ":2181", "testGroupId", new Dictionary<string, int> { { "testTopic1", 1 } }, new Dictionary<string, string>());
var kafkaStream = KafkaUtils.CreateStream(ssc, "127.0.0.1:2181", "testGroupId", new Dictionary<string, int> { { "testTopic1", 1 } }, new Dictionary<string, string>());
Assert.IsNotNull(kafkaStream.DStreamProxy);
var directKafkaStream = ssc.DirectKafkaStream(new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
var directKafkaStream = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic2" }, new Dictionary<string, string>(), new Dictionary<string, long>());
Assert.IsNotNull(directKafkaStream.DStreamProxy);
var union = ssc.Union(textFile, socketStream);