Fix Duration type mismatch bug, enable DStreamTextFileSample, and add Window API

- Change the duration type from milliseconds (long) to seconds (int) for the streaming batch interval and the Window, Checkpoint, and Remember APIs
- Add a GetJavaDuration utility method to JvmBridgeUtils and use it wherever a Java Duration is constructed
- Include the Window API in the unit tests
- Revert DStreamTextFileSample to the experimental category
Parent: 434bcbfd04
Commit: abe1dfbbab
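For illustration, here is a minimal sketch of how driver code changes under this commit. This is not code from the repository: the master, app name, and values are placeholders, but the StreamingContext shape matches the diffs below.

    using Microsoft.Spark.CSharp.Core;
    using Microsoft.Spark.CSharp.Streaming;

    class BatchIntervalSketch
    {
        static void Main()
        {
            var sc = new SparkContext("local", "BatchIntervalSketch"); // placeholder master/app name
            // Before this commit: new StreamingContext(sc, 2000) meant a 2000 ms batch interval (long).
            // After this commit: the second argument is seconds (int), so 2 means a 2-second batch interval.
            var ssc = new StreamingContext(sc, 2);
            ssc.Start();
            ssc.AwaitTermination();
        }
    }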
@@ -70,5 +70,11 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
         {
             return new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", GetJavaList<T>(enumerable)));
         }
+
+        public static JvmObjectReference GetJavaDuration(int durationSeconds)
+        {
+            // Java expects Duration in milliseconds and it must be of long type
+            return SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 });
+        }
     }
 }
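The utility above is what fixes the type mismatch named in the commit title: the JVM org.apache.spark.streaming.Duration constructor takes a long number of milliseconds, while some call sites passed a plain int (for example `windowSeconds * 1000`), which is the wrong type for the JVM call and can overflow before it is widened. A standalone C# sketch of why the widening must happen before the multiplication (the values are hypothetical):

    using System;

    class DurationCastSketch
    {
        static void Main()
        {
            int durationSeconds = 3000000; // hypothetical large duration in seconds

            // int * int overflows first, then widens: the wrong value survives
            long overflowed = durationSeconds * 1000;

            // widen to long first, then multiply: correct 3,000,000,000 ms
            long widenedFirst = (long)durationSeconds * 1000;

            Console.WriteLine("{0} vs {1}", overflowed, widenedFirst);
        }
    }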
@@ -19,7 +19,7 @@ namespace Microsoft.Spark.CSharp.Proxy
         void CallForeachRDD(byte[] func, string serializedMode);
         void Print(int num = 10);
         void Persist(StorageLevelType storageLevelType);
-        void Checkpoint(long intervalMs);
+        void Checkpoint(int intervalSeconds);
         IRDDProxy[] Slice(long fromUnixTime, long toUnixTime);
     }
 }
@@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
         // or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists()
         // should not be put to IStreamingContextProxy.
         bool CheckpointExists(string checkpointPath);
-        IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
+        IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds);
         IStreamingContextProxy CreateStreamingContext(string checkpointPath);
     }
 }
@@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
         SparkContext SparkContext { get; }
         void Start();
         void Stop();
-        void Remember(long durationMs);
+        void Remember(int durationSeconds);
         void Checkpoint(string directory);
         IDStreamProxy TextFileStream(string directory);
         IDStreamProxy SocketTextStream(string hostname, int port, StorageLevelType storageLevelType);
@@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
         public IDStreamProxy Window(int windowSeconds, int slideSeconds = 0)
         {
             string windowId = null;
-            var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { windowSeconds * 1000 });
+            var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds);

             if (slideSeconds <= 0)
             {
@@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
                 return new DStreamIpcProxy(new JvmObjectReference(windowId));
             }

-            var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { slideSeconds * 1000 });
+            var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds);
             windowId = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaDStreamReference, "window", new object[] { windowDurationReference, slideDurationReference });

             return new DStreamIpcProxy(new JvmObjectReference(windowId));
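For context, a short sketch of how the Window API wired up by these proxy changes is meant to be called. The context, input directory, and durations are hypothetical:

    using Microsoft.Spark.CSharp.Core;
    using Microsoft.Spark.CSharp.Streaming;

    class WindowSketch
    {
        static void Main()
        {
            var ssc = new StreamingContext(new SparkContext("local", "WindowSketch"), 1); // 1-second batches
            var lines = ssc.TextFileStream(System.IO.Path.GetTempPath()); // placeholder input directory
            // Both arguments are now seconds; the proxy converts them to JVM Durations
            // via JvmBridgeUtils.GetJavaDuration (i.e., seconds * 1000 ms).
            var windowed = lines.Window(30, 10); // 30-second window sliding every 10 seconds
            windowed.Print();
            ssc.Start();
            ssc.AwaitTermination();
        }
    }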
@@ -77,9 +77,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
             SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "persist", new object[] { jstorageLevel });
         }

-        public void Checkpoint(long intervalMs)
+        public void Checkpoint(int intervalSeconds)
         {
-            var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { intervalMs });
+            var jinterval = JvmBridgeUtils.GetJavaDuration(intervalSeconds);
             SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "checkpoint", new object[] { jinterval });
         }

@@ -80,9 +80,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
                 SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
         }

-        public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
+        public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
         {
-            streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs);
+            streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationSeconds);
             return streamingContextIpcProxy;
         }

@@ -43,11 +43,11 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
             }
         }

-        public StreamingContextIpcProxy(SparkContext sparkContext, long durationMs)
+        public StreamingContextIpcProxy(SparkContext sparkContext, int durationSeconds)
         {
             this.sparkContext = sparkContext;
             sparkContextProxy = sparkContext.SparkContextProxy;
-            var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { durationMs });
+            var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds);

             JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference;
             jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration });
@@ -99,9 +99,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
             SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "closeCallback");
         }

-        public void Remember(long durationMs)
+        public void Remember(int durationSeconds)
         {
-            var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (int)durationMs });
+            var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds);

             SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "remember", new object[] { jduration });
         }
@@ -134,8 +134,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc

         public IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode)
         {
-            var windowDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long) windowSeconds * 1000 });
-            var slideDurationReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long) slideSeconds * 1000 });
+            var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds);
+            var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds);

             var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpReducedWindowedDStream",
                 new object[] { (jdstream as DStreamIpcProxy).jvmDStreamReference, func, invFunc, windowDurationReference, slideDurationReference, serializationMode });
@@ -221,12 +221,12 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// <summary>
         /// Enable periodic checkpointing of RDDs of this DStream
         /// </summary>
-        /// <param name="intervalMs">time in seconds, after each period of that, generated RDD will be checkpointed</param>
+        /// <param name="intervalSeconds">time in seconds, after each period of that, generated RDD will be checkpointed</param>
         /// <returns></returns>
-        public DStream<T> Checkpoint(long intervalMs)
+        public DStream<T> Checkpoint(int intervalSeconds)
         {
             isCheckpointed = true;
-            DStreamProxy.Checkpoint(intervalMs);
+            DStreamProxy.Checkpoint(intervalSeconds);
             return this;
         }

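A hedged usage sketch of the updated Checkpoint signature; the directory and the 30-second interval are placeholders:

    using Microsoft.Spark.CSharp.Core;
    using Microsoft.Spark.CSharp.Streaming;

    class CheckpointSketch
    {
        static void Main()
        {
            var ssc = new StreamingContext(new SparkContext("local", "CheckpointSketch"), 2);
            ssc.Checkpoint(System.IO.Path.GetTempPath()); // placeholder checkpoint directory
            var lines = ssc.TextFileStream(System.IO.Path.GetTempPath());
            // The interval is now seconds (int), not milliseconds (long):
            // checkpoint the generated RDDs every 30 seconds.
            lines.Checkpoint(30);
            ssc.Start();
            ssc.AwaitTermination();
        }
    }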
@@ -56,11 +56,11 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// Initializes a new instance of StreamingContext with a existing SparkContext
         /// </summary>
         /// <param name="sparkContext">An existing SparkContext</param>
-        /// <param name="durationMs">the time interval at which streaming data will be divided into batches</param>
-        public StreamingContext(SparkContext sparkContext, long durationMs)
+        /// <param name="durationSeconds">the time interval at which streaming data will be divided into batches</param>
+        public StreamingContext(SparkContext sparkContext, int durationSeconds)
         {
             this.sparkContext = sparkContext;
-            streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationMs);
+            streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationSeconds);
         }

         /// <summary>
@@ -106,10 +106,10 @@ namespace Microsoft.Spark.CSharp.Streaming
         /// collection. This method allows the developer to specify how long to remember the RDDs (
         /// if the developer wishes to query old data outside the DStream computation).
         /// </summary>
-        /// <param name="durationMs">Minimum duration that each DStream should remember its RDDs</param>
-        public void Remember(long durationMs)
+        /// <param name="durationSeconds">Minimum duration that each DStream should remember its RDDs</param>
+        public void Remember(int durationSeconds)
         {
-            streamingContextProxy.Remember(durationMs);
+            streamingContextProxy.Remember(durationSeconds);
         }

         /// <summary>
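And a matching sketch for Remember, whose argument is now also seconds; the 60-second value is arbitrary:

    using Microsoft.Spark.CSharp.Core;
    using Microsoft.Spark.CSharp.Streaming;

    class RememberSketch
    {
        static void Main()
        {
            var ssc = new StreamingContext(new SparkContext("local", "RememberSketch"), 1);
            // Keep generated RDDs around for at least 60 seconds so they can be
            // queried outside the DStream computation; formerly this argument was in ms.
            ssc.Remember(60);
            ssc.Start();
            ssc.AwaitTermination();
        }
    }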
@@ -1951,6 +1951,11 @@
             and broadcast variables on that cluster.
             </summary>
         </member>
+        <member name="M:Microsoft.Spark.CSharp.Core.SparkContext.GetActiveSparkContext">
+            <summary>
+            Get existing SparkContext
+            </summary>
+        </member>
         <member name="M:Microsoft.Spark.CSharp.Core.SparkContext.#ctor(System.String,System.String,System.String)">
             <summary>
             Initializes a SparkContext instance with a specific master, application name, and spark home
@@ -6059,11 +6064,11 @@
             <param name="storageLevelType"></param>
             <returns></returns>
         </member>
-        <member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.Checkpoint(System.Int64)">
+        <member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.Checkpoint(System.Int32)">
             <summary>
             Enable periodic checkpointing of RDDs of this DStream
             </summary>
-            <param name="intervalMs">time in seconds, after each period of that, generated RDD will be checkpointed</param>
+            <param name="intervalSeconds">time in seconds, after each period of that, generated RDD will be checkpointed</param>
             <returns></returns>
         </member>
         <member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.CountByValue(System.Int32)">
@@ -6320,7 +6325,7 @@
             <param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
             <returns>A DStream object</returns>
         </member>
-        <member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32)">
+        <member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32,System.Func{Microsoft.Spark.CSharp.Streaming.OffsetRange,System.Collections.Generic.IEnumerable{System.Collections.Generic.IList{System.Byte[]}}},System.Int32)">
             <summary>
             Create an input stream that directly pulls messages from a Kafka Broker and specific offset.

@@ -6352,8 +6357,28 @@
             If numPartitions = 0, repartition using original kafka partition count
             If numPartitions > 0, repartition using this parameter
             </param>
+            <param name="readFunc">if defined, will be used to read kafka on C# side</param>
+            <param name="readTimeout">timeout for readFunc if value > 0</param>
             <returns>A DStream object</returns>
         </member>
+        <member name="M:Microsoft.Spark.CSharp.Streaming.DirectKafkaStreamHelper.Execute(System.Int32,System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}})">
+            <summary>
+            wrapping user defined kafka read function through OffsetRange interface
+            collecting and passing back dropped offset range due to timeout if defined
+            for JVM side driver to pick up and schedule a task in next batch
+            </summary>
+            <param name="pid"></param>
+            <param name="input"></param>
+            <returns></returns>
+        </member>
+        <member name="M:Microsoft.Spark.CSharp.Streaming.OffsetRange.op_Addition(Microsoft.Spark.CSharp.Streaming.OffsetRange,Microsoft.Spark.CSharp.Streaming.OffsetRange)">
+            <summary>
+            Accumulator friendly
+            </summary>
+            <param name="self"></param>
+            <param name="other"></param>
+            <returns></returns>
+        </member>
         <member name="T:Microsoft.Spark.CSharp.Streaming.MapWithStateDStream`4">
             <summary>
             DStream representing the stream of data generated by `mapWithState` operation on a pair DStream.
@@ -6724,12 +6749,12 @@
             </summary>
             <param name="streamingContextProxy"></param>
         </member>
-        <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.#ctor(Microsoft.Spark.CSharp.Core.SparkContext,System.Int64)">
+        <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.#ctor(Microsoft.Spark.CSharp.Core.SparkContext,System.Int32)">
             <summary>
             Initializes a new instance of StreamingContext with a existing SparkContext
             </summary>
             <param name="sparkContext">An existing SparkContext</param>
-            <param name="durationMs">the time interval at which streaming data will be divided into batches</param>
+            <param name="durationSeconds">the time interval at which streaming data will be divided into batches</param>
         </member>
         <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.GetOrCreate(System.String,System.Func{Microsoft.Spark.CSharp.Streaming.StreamingContext})">
             <summary>
@@ -6752,14 +6777,14 @@
             Stop the execution of the streams.
             </summary>
         </member>
-        <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Remember(System.Int64)">
+        <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Remember(System.Int32)">
             <summary>
             Set each DStreams in this context to remember RDDs it generated in the last given duration.
             DStreams remember RDDs only for a limited duration of time and releases them for garbage
             collection. This method allows the developer to specify how long to remember the RDDs (
             if the developer wishes to query old data outside the DStream computation).
             </summary>
-            <param name="durationMs">Minimum duration that each DStream should remember its RDDs</param>
+            <param name="durationSeconds">Minimum duration that each DStream should remember its RDDs</param>
         </member>
         <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Checkpoint(System.String)">
             <summary>
Some file diffs are not shown because one or more lines are too long.
@@ -21,7 +21,7 @@ namespace AdapterTest
         [Test]
         public void TestDStreamMapReduce()
         {
-            var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+            var ssc = new StreamingContext(new SparkContext("", ""), 1);
             Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

             var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -31,7 +31,8 @@ namespace AdapterTest

             words.Slice(DateTime.MinValue, DateTime.MaxValue);
             words.Cache();
-            words.Checkpoint(1000);
+            words.Checkpoint(1);
+            words.Window(1, 1);

             words.Count().ForeachRDD((time, rdd) =>
             {
@@ -82,7 +83,7 @@ namespace AdapterTest
         [Test]
         public void TestDStreamTransform()
         {
-            var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+            var ssc = new StreamingContext(new SparkContext("", ""), 1);
             Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

             var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -138,7 +139,7 @@ namespace AdapterTest
         [Test]
         public void TestDStreamJoin()
         {
-            var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+            var ssc = new StreamingContext(new SparkContext("", ""), 1);
             Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

             var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -245,7 +246,7 @@ namespace AdapterTest
         [Test]
         public void TestDStreamUpdateStateByKey()
         {
-            var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+            var ssc = new StreamingContext(new SparkContext("", ""), 1);
             Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

             var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -311,7 +312,7 @@ namespace AdapterTest
             SparkCLREnvironment.SparkCLRProxy = sparkClrProxy.Object;

             var sparkConf = new SparkConf(false);
-            var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10000);
+            var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10);

             var dstreamProxy = new Mock<IDStreamProxy>();
             var pairDStream = new DStream<KeyValuePair<string, int>>(dstreamProxy.Object, ssc);
@@ -412,7 +413,7 @@ namespace AdapterTest
         {
             var sc = new SparkContext("", "");
             var rdd = sc.Parallelize(Enumerable.Range(0, 10), 1);
-            var ssc = new StreamingContext(sc, 1000);
+            var ssc = new StreamingContext(sc, 1);

             // test when rdd is null
             Assert.Throws<ArgumentNullException>(() => new ConstantInputDStream<int>(null, ssc));
@@ -26,7 +26,7 @@ namespace AdapterTest
                 .Returns(mockDstreamProxy);

             var mockSparkClrProxy = new Mock<ISparkCLRProxy>();
-            mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>()))
+            mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>()))
                 .Returns(streamingContextProxy.Object);
             SparkCLREnvironment.SparkCLRProxy = mockSparkClrProxy.Object;

@@ -57,7 +57,7 @@ namespace AdapterTest.Mocks
         {
         }

-        public void Checkpoint(long intervalMs)
+        public void Checkpoint(int intervalSeconds)
         {
         }

@@ -58,7 +58,7 @@ namespace AdapterTest.Mocks
             return false;
         }

-        public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
+        public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
         {
             streamingContextProxy = new MockStreamingContextProxy();
             return streamingContextProxy;
@@ -23,7 +23,7 @@ namespace AdapterTest.Mocks
         public void Stop()
         {}

-        public void Remember(long durationMs)
+        public void Remember(int durationSeconds)
         {}

         public void Checkpoint(string directory)
@@ -18,11 +18,11 @@ namespace AdapterTest
         [Test]
         public void TestStreamingContext()
         {
-            var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+            var ssc = new StreamingContext(new SparkContext("", ""), 1);
             Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));

             ssc.Start();
-            ssc.Remember(1000);
+            ssc.Remember(1);
             ssc.Checkpoint(Path.GetTempPath());

             var textFile = ssc.TextFileStream(Path.GetTempPath());
@@ -57,7 +57,7 @@ namespace AdapterTest
             _mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny<bool>())).Returns(new MockSparkConfProxy()); // some of mocks which rarely change can be kept

             _mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny<ISparkConfProxy>())).Returns(_mockSparkContextProxy.Object);
-            _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);
+            _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>())).Returns(_mockStreamingContextProxy.Object);
             _mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
             {
                 TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
|
|||
return _mockRddProxy.Object;
|
||||
});
|
||||
|
||||
_streamingContext = new StreamingContext(new SparkContext("", ""), 1000);
|
||||
_streamingContext = new StreamingContext(new SparkContext("", ""), 1);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@@ -21,7 +21,7 @@ namespace Microsoft.Spark.CSharp
     {
         private static int count;
         private static bool stopFileServer;
-        private static void StartFileServer(string directory, string pattern, int loop)
+        private static void StartFileServer(StreamingContext ssc, string directory, string pattern, int loops = 1)
         {
             string testDir = Path.Combine(directory, "test");
             if (!Directory.Exists(testDir))
@@ -33,26 +33,29 @@ namespace Microsoft.Spark.CSharp

             Task.Run(() =>
             {
+                int loop = 0;
                 while (!stopFileServer)
                 {
-                    DateTime now = DateTime.Now;
-                    foreach (string path in files)
+                    if (loop++ < loops)
                     {
-                        string text = File.ReadAllText(path);
-                        File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text);
+                        DateTime now = DateTime.Now;
+                        foreach (string path in files)
+                        {
+                            string text = File.ReadAllText(path);
+                            File.WriteAllText(testDir + "\\" + now.ToBinary() + "_" + Path.GetFileName(path), text);
+                        }
                     }
                     System.Threading.Thread.Sleep(200);
                 }

-                System.Threading.Thread.Sleep(3000);
-                foreach (var file in Directory.GetFiles(testDir, "*"))
-                    File.Delete(file);
+                ssc.Stop();
             });

+            System.Threading.Thread.Sleep(1);
         }

         [Sample("experimental")]
-        internal static void DStreamTextFileSamples()
+        internal static void DStreamTextFileSample()
         {
             count = 0;

@@ -66,7 +69,7 @@ namespace Microsoft.Spark.CSharp
                 () =>
                 {

-                    StreamingContext context = new StreamingContext(sc, 2000);
+                    StreamingContext context = new StreamingContext(sc, 2);
                     context.Checkpoint(checkpointPath);

                     var lines = context.TextFileStream(Path.Combine(directory, "test"));
@@ -78,7 +81,7 @@ namespace Microsoft.Spark.CSharp
                     // separate dstream transformations defined in CSharpDStream.scala
                     // an extra CSharpRDD is introduced in between these operations
                     var wordCounts = pairs.ReduceByKey((x, y) => x + y);
-                    var join = wordCounts.Join(wordCounts, 2);
+                    var join = wordCounts.Window(2, 2).Join(wordCounts, 2);
                     var state = join.UpdateStateByKey<string, Tuple<int, int>, int>(new UpdateStateHelper(b).Execute);

                     state.ForeachRDD((time, rdd) =>
@@ -94,21 +97,23 @@ namespace Microsoft.Spark.CSharp
                         foreach (object record in taken)
                         {
                             Console.WriteLine(record);
+
+                            var countByWord = (KeyValuePair<string, int>)record;
+                            Assert.AreEqual(countByWord.Value, countByWord.Key == "The" || countByWord.Key == "lazy" || countByWord.Key == "dog" ? 92 : 88);
                         }
                         Console.WriteLine();

-                        stopFileServer = count++ > 100;
+                        stopFileServer = true;
                     });

                     return context;
                 });

+            StartFileServer(ssc, directory, "words.txt");
             ssc.Start();

-            StartFileServer(directory, "words.txt", 100);
-
             ssc.AwaitTermination();
             ssc.Stop();
         }

         private static string brokers = ConfigurationManager.AppSettings["KafkaTestBrokers"] ?? "127.0.0.1:9092";
@@ -135,7 +140,7 @@ namespace Microsoft.Spark.CSharp
                 () =>
                 {
                     SparkContext sc = SparkCLRSamples.SparkContext;
-                    StreamingContext context = new StreamingContext(sc, 2000);
+                    StreamingContext context = new StreamingContext(sc, 2);
                     context.Checkpoint(checkpointPath);

                     var kafkaParams = new Dictionary<string, string> {
@@ -183,7 +188,7 @@ namespace Microsoft.Spark.CSharp
         internal static void DStreamConstantDStreamSample()
         {
             var sc = SparkCLRSamples.SparkContext;
-            var ssc = new StreamingContext(sc, 2000);
+            var ssc = new StreamingContext(sc, 2);

             const int count = 100;
             const int partitions = 2;
|
@ -54,7 +54,7 @@ namespace Microsoft.Spark.CSharp.Samples
|
|||
() =>
|
||||
{
|
||||
SparkContext sc = SparkCLRSamples.SparkContext;
|
||||
StreamingContext context = new StreamingContext(sc, 10000);
|
||||
StreamingContext context = new StreamingContext(sc, 10);
|
||||
context.Checkpoint(checkpointPath);
|
||||
|
||||
var lines = context.TextFileStream(Path.Combine(directory, "test1"));
|
||||
|
|