From bd1eb4354d2b8445d7bd1b2eb56b46b01c8beff2 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Fri, 20 May 2016 11:12:52 -0700 Subject: [PATCH 01/10] fix Duration type mismatch bug in --- .../Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs index fcfea67..2e729a4 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs @@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public IDStreamProxy Window(int windowSeconds, int slideSeconds = 0) { string windowId = null; - var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { windowSeconds * 1000 }); + var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)windowSeconds * 1000 }); if (slideSeconds <= 0) { @@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return new DStreamIpcProxy(new JvmObjectReference(windowId)); } - var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { slideSeconds * 1000 }); + var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)slideSeconds * 1000 }); windowId = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaDStreamReference, "window", new object[] { windowDurationReference, slideDurationReference }); return new DStreamIpcProxy(new JvmObjectReference(windowId)); From 0ee69f65a1a1b7e022a4c3db6e8548ccf71a97af Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Fri, 20 May 2016 11:17:46 -0700 Subject: [PATCH 02/10] enable DStreamTextFileSample and add Window API --- .../Microsoft.Spark.CSharp/DStreamSamples.cs | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs index 4dfe245..cbaefe9 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs @@ -21,7 +21,7 @@ namespace Microsoft.Spark.CSharp { private static int count; private static bool stopFileServer; - private static void StartFileServer(string directory, string pattern, int loop) + private static void StartFileServer(StreamingContext ssc, string directory, string pattern, int loops = 1) { string testDir = Path.Combine(directory, "test"); if (!Directory.Exists(testDir)) @@ -33,26 +33,29 @@ namespace Microsoft.Spark.CSharp Task.Run(() => { + int loop = 0; while (!stopFileServer) { - DateTime now = DateTime.Now; - foreach (string path in files) + if (loop++ < loops) { - string text = File.ReadAllText(path); - File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text); + DateTime now = DateTime.Now; + foreach (string path in files) + { + string text = File.ReadAllText(path); + File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text); + } } System.Threading.Thread.Sleep(200); } - System.Threading.Thread.Sleep(3000); - - foreach (var file in Directory.GetFiles(testDir, "*")) - File.Delete(file); + ssc.Stop(); }); + + System.Threading.Thread.Sleep(1); } - [Sample("experimental")] - internal static void DStreamTextFileSamples() + [Sample] + internal static void DStreamTextFileSample() { count = 0; @@ -78,7 +81,7 @@ namespace Microsoft.Spark.CSharp // separate dstream transformations defined in CSharpDStream.scala // an extra CSharpRDD is introduced in between these operations var wordCounts = pairs.ReduceByKey((x, y) => x + y); - var join = wordCounts.Join(wordCounts, 2); + var join = wordCounts.Window(2, 2).Join(wordCounts, 2); var state = join.UpdateStateByKey, int>(new UpdateStateHelper(b).Execute); state.ForeachRDD((time, rdd) => @@ -94,21 +97,23 @@ namespace Microsoft.Spark.CSharp foreach (object record in taken) { Console.WriteLine(record); + + var countByWord = (KeyValuePair)record; + Assert.AreEqual(countByWord.Value, countByWord.Key == "The" || countByWord.Key == "lazy" || countByWord.Key == "dog" ? 92 : 88); } Console.WriteLine(); - stopFileServer = count++ > 100; + stopFileServer = true; }); return context; }); + StartFileServer(ssc, directory, "words.txt"); + ssc.Start(); - StartFileServer(directory, "words.txt", 100); - ssc.AwaitTermination(); - ssc.Stop(); } private static string brokers = ConfigurationManager.AppSettings["KafkaTestBrokers"] ?? "127.0.0.1:9092"; From 69f5673741ba2326362ab2d819ef5ae0407b0151 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Fri, 20 May 2016 11:12:52 -0700 Subject: [PATCH 03/10] fix Duration type mismatch bug in --- .../Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs index fcfea67..2e729a4 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs @@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public IDStreamProxy Window(int windowSeconds, int slideSeconds = 0) { string windowId = null; - var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { windowSeconds * 1000 }); + var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)windowSeconds * 1000 }); if (slideSeconds <= 0) { @@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return new DStreamIpcProxy(new JvmObjectReference(windowId)); } - var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { slideSeconds * 1000 }); + var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)slideSeconds * 1000 }); windowId = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaDStreamReference, "window", new object[] { windowDurationReference, slideDurationReference }); return new DStreamIpcProxy(new JvmObjectReference(windowId)); From d8f96a9fe5285efe852e23a344ef400a7ee0313b Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Fri, 20 May 2016 11:17:46 -0700 Subject: [PATCH 04/10] enable DStreamTextFileSample and add Window API --- .../Microsoft.Spark.CSharp/DStreamSamples.cs | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs index 4dfe245..cbaefe9 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs @@ -21,7 +21,7 @@ namespace Microsoft.Spark.CSharp { private static int count; private static bool stopFileServer; - private static void StartFileServer(string directory, string pattern, int loop) + private static void StartFileServer(StreamingContext ssc, string directory, string pattern, int loops = 1) { string testDir = Path.Combine(directory, "test"); if (!Directory.Exists(testDir)) @@ -33,26 +33,29 @@ namespace Microsoft.Spark.CSharp Task.Run(() => { + int loop = 0; while (!stopFileServer) { - DateTime now = DateTime.Now; - foreach (string path in files) + if (loop++ < loops) { - string text = File.ReadAllText(path); - File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text); + DateTime now = DateTime.Now; + foreach (string path in files) + { + string text = File.ReadAllText(path); + File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text); + } } System.Threading.Thread.Sleep(200); } - System.Threading.Thread.Sleep(3000); - - foreach (var file in Directory.GetFiles(testDir, "*")) - File.Delete(file); + ssc.Stop(); }); + + System.Threading.Thread.Sleep(1); } - [Sample("experimental")] - internal static void DStreamTextFileSamples() + [Sample] + internal static void DStreamTextFileSample() { count = 0; @@ -78,7 +81,7 @@ namespace Microsoft.Spark.CSharp // separate dstream transformations defined in CSharpDStream.scala // an extra CSharpRDD is introduced in between these operations var wordCounts = pairs.ReduceByKey((x, y) => x + y); - var join = wordCounts.Join(wordCounts, 2); + var join = wordCounts.Window(2, 2).Join(wordCounts, 2); var state = join.UpdateStateByKey, int>(new UpdateStateHelper(b).Execute); state.ForeachRDD((time, rdd) => @@ -94,21 +97,23 @@ namespace Microsoft.Spark.CSharp foreach (object record in taken) { Console.WriteLine(record); + + var countByWord = (KeyValuePair)record; + Assert.AreEqual(countByWord.Value, countByWord.Key == "The" || countByWord.Key == "lazy" || countByWord.Key == "dog" ? 92 : 88); } Console.WriteLine(); - stopFileServer = count++ > 100; + stopFileServer = true; }); return context; }); + StartFileServer(ssc, directory, "words.txt"); + ssc.Start(); - StartFileServer(directory, "words.txt", 100); - ssc.AwaitTermination(); - ssc.Stop(); } private static string brokers = ConfigurationManager.AppSettings["KafkaTestBrokers"] ?? "127.0.0.1:9092"; From eab9a533663f30cc13745c00de5f671aefdff7bd Mon Sep 17 00:00:00 2001 From: renyi Date: Fri, 20 May 2016 12:50:07 -0700 Subject: [PATCH 05/10] change duration type to seconds for streaming bacth interval, Window, Checkpoint and Remember APIs --- .../Proxy/IDStreamProxy.cs | 2 +- .../Proxy/ISparkCLRProxy.cs | 2 +- .../Proxy/IStreamingContextProxy.cs | 2 +- .../Proxy/Ipc/DStreamIpcProxy.cs | 4 +- .../Proxy/Ipc/SparkCLRIpcProxy.cs | 4 +- .../Proxy/Ipc/StreamingContextIpcProxy.cs | 8 ++-- .../Streaming/DStream.cs | 6 +-- .../Streaming/StreamingContext.cs | 12 +++--- .../Microsoft.Spark.CSharp.Adapter.Doc.XML | 39 +++++++++++++++---- .../documentation/Mobius_API_Documentation.md | 2 +- csharp/AdapterTest/DStreamTest.cs | 14 +++---- csharp/AdapterTest/EventHubsUtilsTest.cs | 2 +- csharp/AdapterTest/Mocks/MockDStreamProxy.cs | 2 +- csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs | 2 +- .../Mocks/MockStreamingContextProxy.cs | 2 +- csharp/AdapterTest/StreamingContextTest.cs | 4 +- csharp/AdapterTest/TestWithMoqDemo.cs | 4 +- .../Microsoft.Spark.CSharp/DStreamSamples.cs | 6 +-- .../DStreamStateSample.cs | 2 +- 19 files changed, 72 insertions(+), 47 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDStreamProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDStreamProxy.cs index c348af1..0c801c3 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDStreamProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IDStreamProxy.cs @@ -19,7 +19,7 @@ namespace Microsoft.Spark.CSharp.Proxy void CallForeachRDD(byte[] func, string serializedMode); void Print(int num = 10); void Persist(StorageLevelType storageLevelType); - void Checkpoint(long intervalMs); + void Checkpoint(int intervalSeconds); IRDDProxy[] Slice(long fromUnixTime, long toUnixTime); } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs index 454e58c..d61569f 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs @@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy // or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists() // should not be put to IStreamingContextProxy. bool CheckpointExists(string checkpointPath); - IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs); + IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds); IStreamingContextProxy CreateStreamingContext(string checkpointPath); } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs index 8af21d6..bad1d8d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs @@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy SparkContext SparkContext { get; } void Start(); void Stop(); - void Remember(long durationMs); + void Remember(int durationSeconds); void Checkpoint(string directory); IDStreamProxy TextFileStream(string directory); IDStreamProxy SocketTextStream(string hostname, int port, StorageLevelType storageLevelType); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs index 2e729a4..78878a6 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs @@ -77,9 +77,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "persist", new object[] { jstorageLevel }); } - public void Checkpoint(long intervalMs) + public void Checkpoint(int intervalSeconds) { - var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { intervalMs }); + var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)intervalSeconds * 1000 }); SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "checkpoint", new object[] { jinterval }); } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs index c120d9e..2eb55a1 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs @@ -80,9 +80,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null; } - public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs) + public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds) { - streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs); + streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationSeconds); return streamingContextIpcProxy; } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs index 46816c2..2801186 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs @@ -43,11 +43,11 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc } } - public StreamingContextIpcProxy(SparkContext sparkContext, long durationMs) + public StreamingContextIpcProxy(SparkContext sparkContext, int durationSeconds) { this.sparkContext = sparkContext; sparkContextProxy = sparkContext.SparkContextProxy; - var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { durationMs }); + var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 }); JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference; jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration }); @@ -99,9 +99,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "closeCallback"); } - public void Remember(long durationMs) + public void Remember(int durationSeconds) { - var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (int)durationMs }); + var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 }); SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "remember", new object[] { jduration }); } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/DStream.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/DStream.cs index 6cb6c49..bab2717 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/DStream.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/DStream.cs @@ -221,12 +221,12 @@ namespace Microsoft.Spark.CSharp.Streaming /// /// Enable periodic checkpointing of RDDs of this DStream /// - /// time in seconds, after each period of that, generated RDD will be checkpointed + /// time in seconds, after each period of that, generated RDD will be checkpointed /// - public DStream Checkpoint(long intervalMs) + public DStream Checkpoint(int intervalSeconds) { isCheckpointed = true; - DStreamProxy.Checkpoint(intervalMs); + DStreamProxy.Checkpoint(intervalSeconds); return this; } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs index be85a92..fe06c7a 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs @@ -56,11 +56,11 @@ namespace Microsoft.Spark.CSharp.Streaming /// Initializes a new instance of StreamingContext with a existing SparkContext /// /// An existing SparkContext - /// the time interval at which streaming data will be divided into batches - public StreamingContext(SparkContext sparkContext, long durationMs) + /// the time interval at which streaming data will be divided into batches + public StreamingContext(SparkContext sparkContext, int durationSeconds) { this.sparkContext = sparkContext; - streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationMs); + streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationSeconds); } /// @@ -106,10 +106,10 @@ namespace Microsoft.Spark.CSharp.Streaming /// collection. This method allows the developer to specify how long to remember the RDDs ( /// if the developer wishes to query old data outside the DStream computation). /// - /// Minimum duration that each DStream should remember its RDDs - public void Remember(long durationMs) + /// Minimum duration that each DStream should remember its RDDs + public void Remember(int durationSeconds) { - streamingContextProxy.Remember(durationMs); + streamingContextProxy.Remember(durationSeconds); } /// diff --git a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML index d9198ed..52be22f 100644 --- a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML +++ b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML @@ -1951,6 +1951,11 @@ and broadcast variables on that cluster. + + + Get existing SparkContext + + Initializes a SparkContext instance with a specific master, application name, and spark home @@ -6059,11 +6064,11 @@ - + Enable periodic checkpointing of RDDs of this DStream - time in seconds, after each period of that, generated RDD will be checkpointed + time in seconds, after each period of that, generated RDD will be checkpointed @@ -6320,7 +6325,7 @@ Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream. A DStream object - + Create an input stream that directly pulls messages from a Kafka Broker and specific offset. @@ -6352,8 +6357,28 @@ If numPartitions = 0, repartition using original kafka partition count If numPartitions > 0, repartition using this parameter + if defined, will be used to read kafka on C# side + timeout for readFunc if value > 0 A DStream object + + + wrapping user defined kafka read function through OffsetRange interface + collecting and passing back dropped offset range due to timeout if defined + for JVM side driver to pick up and schedule a tast in next batch + + + + + + + + Accumulator friendly + + + + + DStream representing the stream of data generated by `mapWithState` operation on a pair DStream. @@ -6724,12 +6749,12 @@ - + Initializes a new instance of StreamingContext with a existing SparkContext An existing SparkContext - the time interval at which streaming data will be divided into batches + the time interval at which streaming data will be divided into batches @@ -6752,14 +6777,14 @@ Stop the execution of the streams. - + Set each DStreams in this context to remember RDDs it generated in the last given duration. DStreams remember RDDs only for a limited duration of time and releases them for garbage collection. This method allows the developer to specify how long to remember the RDDs ( if the developer wishes to query old data outside the DStream computation). - Minimum duration that each DStream should remember its RDDs + Minimum duration that each DStream should remember its RDDs diff --git a/csharp/Adapter/documentation/Mobius_API_Documentation.md b/csharp/Adapter/documentation/Mobius_API_Documentation.md index 9e219fd..2ad1246 100644 --- a/csharp/Adapter/documentation/Mobius_API_Documentation.md +++ b/csharp/Adapter/documentation/Mobius_API_Documentation.md @@ -293,7 +293,7 @@ ####Methods -
NameDescription
TextFileRead a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Parallelize``1Distribute a local collection to form an RDD. sc.Parallelize(new int[] {0, 2, 3, 4, 6}, 5).Glom().Collect() [[0], [2], [3], [4], [6]]
EmptyRDDCreate an RDD that has no partitions or elements.
WholeTextFilesRead a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Small files are preferred, large file is also allowable, but may cause bad performance. minPartitions A suggestion value of the minimal splitting number for input data.
BinaryFilesRead a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do RDD<KeyValuePair<string, byte[]>>"/> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} @note Small files are preferred; very large files but may cause bad performance. @param minPartitions A suggestion value of the minimal splitting number for input data.
SequenceFileRead a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value 4. PickleSerializer is used to deserialize pickled objects on the Python side
NewAPIHadoopFileRead a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
NewAPIHadoopRDDRead a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
HadoopFileRead an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
HadoopRDDRead an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Union``1Build the union of a list of RDDs. This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer: >>> path = os.path.join(tempdir, "union-text.txt") >>> with open(path, "w") as testFile: ... _ = testFile.write("Hello") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello'] >>> parallelized = sc.parallelize(["World!"]) >>> sorted(sc.union([textFile, parallelized]).collect()) [u'Hello', 'World!']
Broadcast``1Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Accumulator``1Create an with the given initial value, using a given helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
StopShut down the SparkContext.
AddFileAdd a file to be downloaded with this Spark job on every node. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, use `SparkFiles.get(fileName)` to find its download location.
SetCheckpointDirSet the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
SetJobGroupAssigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared. Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] to cancel all running jobs in this group. For example, {{{ // In the main thread: sc.setJobGroup("some_job_to_cancel", "some job description"); rdd.map(...).count(); // In a separate thread: sc.cancelJobGroup("some_job_to_cancel"); }}} If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
SetLocalPropertySet a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
GetLocalPropertyGet a local property set in this thread, or null if it is missing. See [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
SetLogLevelControl our logLevel. This overrides any user-defined log settings. @param logLevel The desired log level as a string. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
CancelJobGroupCancel active jobs for the specified group. See for more information.
CancelAllJobsCancel all jobs that have been scheduled or are running.
+
NameDescription
GetActiveSparkContextGet existing SparkContext
TextFileRead a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
Parallelize``1Distribute a local collection to form an RDD. sc.Parallelize(new int[] {0, 2, 3, 4, 6}, 5).Glom().Collect() [[0], [2], [3], [4], [6]]
EmptyRDDCreate an RDD that has no partitions or elements.
WholeTextFilesRead a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do {{{ RDD<KeyValuePair<string, string>> rdd = sparkContext.WholeTextFiles("hdfs://a-hdfs-path") }}} then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} Small files are preferred, large file is also allowable, but may cause bad performance. minPartitions A suggestion value of the minimal splitting number for input data.
BinaryFilesRead a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file. For example, if you have the following files: {{{ hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn }}} Do RDD<KeyValuePair<string, byte[]>>"/> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`, then `rdd` contains {{{ (a-hdfs-path/part-00000, its content) (a-hdfs-path/part-00001, its content) ... (a-hdfs-path/part-nnnnn, its content) }}} @note Small files are preferred; very large files but may cause bad performance. @param minPartitions A suggestion value of the minimal splitting number for input data.
SequenceFileRead a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value 4. PickleSerializer is used to deserialize pickled objects on the Python side
NewAPIHadoopFileRead a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
NewAPIHadoopRDDRead a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
HadoopFileRead an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
HadoopRDDRead an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.
Union``1Build the union of a list of RDDs. This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer: >>> path = os.path.join(tempdir, "union-text.txt") >>> with open(path, "w") as testFile: ... _ = testFile.write("Hello") >>> textFile = sc.textFile(path) >>> textFile.collect() [u'Hello'] >>> parallelized = sc.parallelize(["World!"]) >>> sorted(sc.union([textFile, parallelized]).collect()) [u'Hello', 'World!']
Broadcast``1Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.
Accumulator``1Create an with the given initial value, using a given helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used.
StopShut down the SparkContext.
AddFileAdd a file to be downloaded with this Spark job on every node. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, use `SparkFiles.get(fileName)` to find its download location.
SetCheckpointDirSet the directory under which RDDs are going to be checkpointed. The directory must be a HDFS path if running on a cluster.
SetJobGroupAssigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared. Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] to cancel all running jobs in this group. For example, {{{ // In the main thread: sc.setJobGroup("some_job_to_cancel", "some job description"); rdd.map(...).count(); // In a separate thread: sc.cancelJobGroup("some_job_to_cancel"); }}} If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
SetLocalPropertySet a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.
GetLocalPropertyGet a local property set in this thread, or null if it is missing. See [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]].
SetLogLevelControl our logLevel. This overrides any user-defined log settings. @param logLevel The desired log level as a string. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
CancelJobGroupCancel active jobs for the specified group. See for more information.
CancelAllJobsCancel all jobs that have been scheduled or are running.
--- diff --git a/csharp/AdapterTest/DStreamTest.cs b/csharp/AdapterTest/DStreamTest.cs index a46ff06..7b0022a 100644 --- a/csharp/AdapterTest/DStreamTest.cs +++ b/csharp/AdapterTest/DStreamTest.cs @@ -21,7 +21,7 @@ namespace AdapterTest [Test] public void TestDStreamMapReduce() { - var ssc = new StreamingContext(new SparkContext("", ""), 1000); + var ssc = new StreamingContext(new SparkContext("", ""), 1); Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy)); var lines = ssc.TextFileStream(Path.GetTempPath()); @@ -31,7 +31,7 @@ namespace AdapterTest words.Slice(DateTime.MinValue, DateTime.MaxValue); words.Cache(); - words.Checkpoint(1000); + words.Checkpoint(1); words.Count().ForeachRDD((time, rdd) => { @@ -82,7 +82,7 @@ namespace AdapterTest [Test] public void TestDStreamTransform() { - var ssc = new StreamingContext(new SparkContext("", ""), 1000); + var ssc = new StreamingContext(new SparkContext("", ""), 1); Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy)); var lines = ssc.TextFileStream(Path.GetTempPath()); @@ -138,7 +138,7 @@ namespace AdapterTest [Test] public void TestDStreamJoin() { - var ssc = new StreamingContext(new SparkContext("", ""), 1000); + var ssc = new StreamingContext(new SparkContext("", ""), 1); Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy)); var lines = ssc.TextFileStream(Path.GetTempPath()); @@ -245,7 +245,7 @@ namespace AdapterTest [Test] public void TestDStreamUpdateStateByKey() { - var ssc = new StreamingContext(new SparkContext("", ""), 1000); + var ssc = new StreamingContext(new SparkContext("", ""), 1); Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy)); var lines = ssc.TextFileStream(Path.GetTempPath()); @@ -311,7 +311,7 @@ namespace AdapterTest SparkCLREnvironment.SparkCLRProxy = sparkClrProxy.Object; var sparkConf = new SparkConf(false); - var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10000); + var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10); var dstreamProxy = new Mock(); var pairDStream = new DStream>(dstreamProxy.Object, ssc); @@ -412,7 +412,7 @@ namespace AdapterTest { var sc = new SparkContext("", ""); var rdd = sc.Parallelize(Enumerable.Range(0, 10), 1); - var ssc = new StreamingContext(sc, 1000); + var ssc = new StreamingContext(sc, 1); // test when rdd is null Assert.Throws(() => new ConstantInputDStream(null, ssc)); diff --git a/csharp/AdapterTest/EventHubsUtilsTest.cs b/csharp/AdapterTest/EventHubsUtilsTest.cs index 428d4b6..6ce9403 100644 --- a/csharp/AdapterTest/EventHubsUtilsTest.cs +++ b/csharp/AdapterTest/EventHubsUtilsTest.cs @@ -26,7 +26,7 @@ namespace AdapterTest .Returns(mockDstreamProxy); var mockSparkClrProxy = new Mock(); - mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny(), It.IsAny())) + mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny(), It.IsAny())) .Returns(streamingContextProxy.Object); SparkCLREnvironment.SparkCLRProxy = mockSparkClrProxy.Object; diff --git a/csharp/AdapterTest/Mocks/MockDStreamProxy.cs b/csharp/AdapterTest/Mocks/MockDStreamProxy.cs index 1217289..145bdd9 100644 --- a/csharp/AdapterTest/Mocks/MockDStreamProxy.cs +++ b/csharp/AdapterTest/Mocks/MockDStreamProxy.cs @@ -57,7 +57,7 @@ namespace AdapterTest.Mocks { } - public void Checkpoint(long intervalMs) + public void Checkpoint(int intervalSeconds) { } diff --git a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs index 911fac3..9030d4d 100644 --- a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs @@ -58,7 +58,7 @@ namespace AdapterTest.Mocks return false; } - public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs) + public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds) { streamingContextProxy = new MockStreamingContextProxy(); return streamingContextProxy; diff --git a/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs b/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs index 9df6c50..80eb508 100644 --- a/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs +++ b/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs @@ -23,7 +23,7 @@ namespace AdapterTest.Mocks public void Stop() {} - public void Remember(long durationMs) + public void Remember(int durationSeconds) {} public void Checkpoint(string directory) diff --git a/csharp/AdapterTest/StreamingContextTest.cs b/csharp/AdapterTest/StreamingContextTest.cs index af07c48..0c0ca93 100644 --- a/csharp/AdapterTest/StreamingContextTest.cs +++ b/csharp/AdapterTest/StreamingContextTest.cs @@ -18,11 +18,11 @@ namespace AdapterTest [Test] public void TestStreamingContext() { - var ssc = new StreamingContext(new SparkContext("", ""), 1000); + var ssc = new StreamingContext(new SparkContext("", ""), 1); Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy)); ssc.Start(); - ssc.Remember(1000); + ssc.Remember(1); ssc.Checkpoint(Path.GetTempPath()); var textFile = ssc.TextFileStream(Path.GetTempPath()); diff --git a/csharp/AdapterTest/TestWithMoqDemo.cs b/csharp/AdapterTest/TestWithMoqDemo.cs index 70c4f8d..bb0f4b2 100644 --- a/csharp/AdapterTest/TestWithMoqDemo.cs +++ b/csharp/AdapterTest/TestWithMoqDemo.cs @@ -57,7 +57,7 @@ namespace AdapterTest _mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny())).Returns(new MockSparkConfProxy()); // some of mocks which rarely change can be kept _mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny())).Returns(_mockSparkContextProxy.Object); - _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny(), It.IsAny())).Returns(_mockStreamingContextProxy.Object); + _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny(), It.IsAny())).Returns(_mockStreamingContextProxy.Object); _mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() => { TcpListener listener = new TcpListener(IPAddress.Loopback, 0); @@ -123,7 +123,7 @@ namespace AdapterTest return _mockRddProxy.Object; }); - _streamingContext = new StreamingContext(new SparkContext("", ""), 1000); + _streamingContext = new StreamingContext(new SparkContext("", ""), 1); } diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs index cbaefe9..f08ddaf 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs @@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp () => { - StreamingContext context = new StreamingContext(sc, 2000); + StreamingContext context = new StreamingContext(sc, 2); context.Checkpoint(checkpointPath); var lines = context.TextFileStream(Path.Combine(directory, "test")); @@ -140,7 +140,7 @@ namespace Microsoft.Spark.CSharp () => { SparkContext sc = SparkCLRSamples.SparkContext; - StreamingContext context = new StreamingContext(sc, 2000); + StreamingContext context = new StreamingContext(sc, 2); context.Checkpoint(checkpointPath); var kafkaParams = new Dictionary { @@ -188,7 +188,7 @@ namespace Microsoft.Spark.CSharp internal static void DStreamConstantDStreamSample() { var sc = SparkCLRSamples.SparkContext; - var ssc = new StreamingContext(sc, 2000); + var ssc = new StreamingContext(sc, 2); const int count = 100; const int partitions = 2; diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamStateSample.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamStateSample.cs index 1866774..324eba4 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamStateSample.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamStateSample.cs @@ -54,7 +54,7 @@ namespace Microsoft.Spark.CSharp.Samples () => { SparkContext sc = SparkCLRSamples.SparkContext; - StreamingContext context = new StreamingContext(sc, 10000); + StreamingContext context = new StreamingContext(sc, 10); context.Checkpoint(checkpointPath); var lines = context.TextFileStream(Path.Combine(directory, "test1")); From f7bf3cee5c73ebb6296f6341e0073b1f558769f8 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Wed, 25 May 2016 10:23:06 -0700 Subject: [PATCH 06/10] Update JvmBridgeUtils.cs --- .../Microsoft.Spark.CSharp/Interop/Ipc/JvmBridgeUtils.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JvmBridgeUtils.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JvmBridgeUtils.cs index cb250fe..1de526f 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JvmBridgeUtils.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JvmBridgeUtils.cs @@ -70,5 +70,11 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc { return new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", GetJavaList(enumerable))); } + + public static JvmObjectReference GetJavaDuration(int durationSeconds) + { + // java expects Duration in mini seconds and must be of long type + return SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 }); + } } } From 3776666a1a8c6f8d1cec50b6cdbe1389e27560b1 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Wed, 25 May 2016 10:25:51 -0700 Subject: [PATCH 07/10] use GetJavaDuration utility method --- .../Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs index 78878a6..5dec26f 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DStreamIpcProxy.cs @@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public IDStreamProxy Window(int windowSeconds, int slideSeconds = 0) { string windowId = null; - var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)windowSeconds * 1000 }); + var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds); if (slideSeconds <= 0) { @@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return new DStreamIpcProxy(new JvmObjectReference(windowId)); } - var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)slideSeconds * 1000 }); + var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds); windowId = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaDStreamReference, "window", new object[] { windowDurationReference, slideDurationReference }); return new DStreamIpcProxy(new JvmObjectReference(windowId)); @@ -79,7 +79,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public void Checkpoint(int intervalSeconds) { - var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)intervalSeconds * 1000 }); + var jinterval = JvmBridgeUtils.GetJavaDuration(intervalSeconds); SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "checkpoint", new object[] { jinterval }); } From 1711a2dfe1d0e01efd1204c11d0d3be4f5281929 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Wed, 25 May 2016 10:27:46 -0700 Subject: [PATCH 08/10] use GetJavaDuration utility method --- .../Proxy/Ipc/StreamingContextIpcProxy.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs index 2801186..a05c265 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs @@ -47,7 +47,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc { this.sparkContext = sparkContext; sparkContextProxy = sparkContext.SparkContextProxy; - var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 }); + var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds); JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference; jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration }); @@ -101,7 +101,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public void Remember(int durationSeconds) { - var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 }); + var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds); SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "remember", new object[] { jduration }); } @@ -134,8 +134,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc public IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode) { - var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long) windowSeconds * 1000 }); - var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long) slideSeconds * 1000 }); + var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds); + var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds); var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpReducedWindowedDStream", new object[] { (jdstream as DStreamIpcProxy).jvmDStreamReference, func, invFunc, windowDurationReference, slideDurationReference, serializationMode }); From 9909b7fc0d6a32adf08d96804c0de428d752f41d Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Wed, 25 May 2016 10:28:48 -0700 Subject: [PATCH 09/10] include Window API in unit test --- csharp/AdapterTest/DStreamTest.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/csharp/AdapterTest/DStreamTest.cs b/csharp/AdapterTest/DStreamTest.cs index 7b0022a..8577854 100644 --- a/csharp/AdapterTest/DStreamTest.cs +++ b/csharp/AdapterTest/DStreamTest.cs @@ -32,6 +32,7 @@ namespace AdapterTest words.Slice(DateTime.MinValue, DateTime.MaxValue); words.Cache(); words.Checkpoint(1); + words.Window(1, 1); words.Count().ForeachRDD((time, rdd) => { From 78c3033f52afa93b634c3e2124a5f6ed4dc1a869 Mon Sep 17 00:00:00 2001 From: Renyi Xiong Date: Wed, 25 May 2016 10:30:49 -0700 Subject: [PATCH 10/10] revert DStreamTextFileSample to experimental Window API included in unit test --- csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs index f08ddaf..f69c1e8 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DStreamSamples.cs @@ -54,7 +54,7 @@ namespace Microsoft.Spark.CSharp System.Threading.Thread.Sleep(1); } - [Sample] + [Sample("experimental")] internal static void DStreamTextFileSample() { count = 0;