change duration type to seconds for streaming batch interval, Window, Checkpoint and Remember APIs

renyi 2016-05-20 12:50:07 -07:00
Parent d8f96a9fe5
Commit eab9a53366
19 changed files with 72 additions and 47 deletions

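With this change, callers pass whole seconds instead of milliseconds throughout the streaming API. A minimal usage sketch of the new signatures (the master URL, app name, and temp-path checkpoint directory are illustrative placeholders, not taken from this commit):

// All durations below are in seconds under the new signatures (System.IO needed for Path).
var sc = new SparkContext("local[2]", "StreamingSample");  // placeholder master and app name
var ssc = new StreamingContext(sc, 2);                     // batch interval: 2 seconds (previously 2000 ms)
ssc.Remember(60);                                          // keep generated RDDs for at least 60 seconds
ssc.Checkpoint(Path.GetTempPath());                        // directory-based checkpointing is unchanged
ssc.Start();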
View file

@@ -19,7 +19,7 @@ namespace Microsoft.Spark.CSharp.Proxy
void CallForeachRDD(byte[] func, string serializedMode);
void Print(int num = 10);
void Persist(StorageLevelType storageLevelType);
- void Checkpoint(long intervalMs);
+ void Checkpoint(int intervalSeconds);
IRDDProxy[] Slice(long fromUnixTime, long toUnixTime);
}
}

View file

@@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
// or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists()
// should not be put to IStreamingContextProxy.
bool CheckpointExists(string checkpointPath);
- IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
+ IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds);
IStreamingContextProxy CreateStreamingContext(string checkpointPath);
}
}

View file

@@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
SparkContext SparkContext { get; }
void Start();
void Stop();
- void Remember(long durationMs);
+ void Remember(int durationSeconds);
void Checkpoint(string directory);
IDStreamProxy TextFileStream(string directory);
IDStreamProxy SocketTextStream(string hostname, int port, StorageLevelType storageLevelType);

View file

@@ -77,9 +77,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "persist", new object[] { jstorageLevel });
}
- public void Checkpoint(long intervalMs)
+ public void Checkpoint(int intervalSeconds)
{
- var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { intervalMs });
+ var jinterval = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)intervalSeconds * 1000 });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "checkpoint", new object[] { jinterval });
}

View file

@@ -80,9 +80,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
}
- public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
+ public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
{
- streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs);
+ streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationSeconds);
return streamingContextIpcProxy;
}

View file

@@ -43,11 +43,11 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
}
}
- public StreamingContextIpcProxy(SparkContext sparkContext, long durationMs)
+ public StreamingContextIpcProxy(SparkContext sparkContext, int durationSeconds)
{
this.sparkContext = sparkContext;
sparkContextProxy = sparkContext.SparkContextProxy;
- var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { durationMs });
+ var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 });
JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference;
jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration });
@@ -99,9 +99,9 @@
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "closeCallback");
}
- public void Remember(long durationMs)
+ public void Remember(int durationSeconds)
{
- var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (int)durationMs });
+ var jduration = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "remember", new object[] { jduration });
}

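Note the pattern both IPC proxies now share: the int seconds value is widened to long before multiplying by 1000, so the millisecond count handed to the JVM org.apache.spark.streaming.Duration constructor is computed in 64-bit arithmetic. It also replaces the old Remember path, which narrowed a long millisecond value with an (int) cast. A small illustration with an arbitrary value:

int durationSeconds = 2147484;               // about 24.9 days; fits comfortably in an int
long millis = (long)durationSeconds * 1000;  // 2147484000: widened before the multiply, no overflow
// int wrong = durationSeconds * 1000;       // would wrap, since 2147484000 > int.MaxValue (2147483647)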
View file

@@ -221,12 +221,12 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <summary>
/// Enable periodic checkpointing of RDDs of this DStream
/// </summary>
- /// <param name="intervalMs">time in seconds, after each period of that, generated RDD will be checkpointed</param>
+ /// <param name="intervalSeconds">time in seconds, after each period of that, generated RDD will be checkpointed</param>
/// <returns></returns>
- public DStream<T> Checkpoint(long intervalMs)
+ public DStream<T> Checkpoint(int intervalSeconds)
{
isCheckpointed = true;
- DStreamProxy.Checkpoint(intervalMs);
+ DStreamProxy.Checkpoint(intervalSeconds);
return this;
}

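A hedged sketch of calling the updated overload (the words stream is a placeholder, built the same way as in the adapter tests further down):

var ssc = new StreamingContext(new SparkContext("", ""), 1);                   // 1-second batches
var words = ssc.TextFileStream(Path.GetTempPath()).FlatMap(l => l.Split(' '));
words.Checkpoint(30);  // generated RDDs are checkpointed every 30 seconds, not every 30 ms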
View file

@@ -56,11 +56,11 @@ namespace Microsoft.Spark.CSharp.Streaming
/// Initializes a new instance of StreamingContext with a existing SparkContext
/// </summary>
/// <param name="sparkContext">An existing SparkContext</param>
- /// <param name="durationMs">the time interval at which streaming data will be divided into batches</param>
- public StreamingContext(SparkContext sparkContext, long durationMs)
+ /// <param name="durationSeconds">the time interval at which streaming data will be divided into batches</param>
+ public StreamingContext(SparkContext sparkContext, int durationSeconds)
{
this.sparkContext = sparkContext;
- streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationMs);
+ streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationSeconds);
}
/// <summary>
@@ -106,10 +106,10 @@ namespace Microsoft.Spark.CSharp.Streaming
/// collection. This method allows the developer to specify how long to remember the RDDs (
/// if the developer wishes to query old data outside the DStream computation).
/// </summary>
- /// <param name="durationMs">Minimum duration that each DStream should remember its RDDs</param>
- public void Remember(long durationMs)
+ /// <param name="durationSeconds">Minimum duration that each DStream should remember its RDDs</param>
+ public void Remember(int durationSeconds)
{
- streamingContextProxy.Remember(durationMs);
+ streamingContextProxy.Remember(durationSeconds);
}
/// <summary>

View file

@@ -1951,6 +1951,11 @@
and broadcast variables on that cluster.
</summary>
</member>
+ <member name="M:Microsoft.Spark.CSharp.Core.SparkContext.GetActiveSparkContext">
+ <summary>
+ Get existing SparkContext
+ </summary>
+ </member>
<member name="M:Microsoft.Spark.CSharp.Core.SparkContext.#ctor(System.String,System.String,System.String)">
<summary>
Initializes a SparkContext instance with a specific master, application name, and spark home
@@ -6059,11 +6064,11 @@
<param name="storageLevelType"></param>
<returns></returns>
</member>
- <member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.Checkpoint(System.Int64)">
+ <member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.Checkpoint(System.Int32)">
<summary>
Enable periodic checkpointing of RDDs of this DStream
</summary>
- <param name="intervalMs">time in seconds, after each period of that, generated RDD will be checkpointed</param>
+ <param name="intervalSeconds">time in seconds, after each period of that, generated RDD will be checkpointed</param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.CountByValue(System.Int32)">
@@ -6320,7 +6325,7 @@
<param name="fromOffsets">Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.</param>
<returns>A DStream object</returns>
</member>
- <member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32)">
+ <member name="M:Microsoft.Spark.CSharp.Streaming.KafkaUtils.CreateDirectStreamWithRepartition(Microsoft.Spark.CSharp.Streaming.StreamingContext,System.Collections.Generic.List{System.String},System.Collections.Generic.Dictionary{System.String,System.String},System.Collections.Generic.Dictionary{System.String,System.Int64},System.Int32,System.Func{Microsoft.Spark.CSharp.Streaming.OffsetRange,System.Collections.Generic.IEnumerable{System.Collections.Generic.IList{System.Byte[]}}},System.Int32)">
<summary>
Create an input stream that directly pulls messages from a Kafka Broker and specific offset.
@@ -6352,8 +6357,28 @@
If numPartitions = 0, repartition using original kafka partition count
If numPartitions > 0, repartition using this parameter
</param>
+ <param name="readFunc">if defined, will be used to read kafka on C# side</param>
+ <param name="readTimeout">timeout for readFunc if value > 0</param>
<returns>A DStream object</returns>
</member>
+ <member name="M:Microsoft.Spark.CSharp.Streaming.DirectKafkaStreamHelper.Execute(System.Int32,System.Collections.Generic.IEnumerable{System.Collections.Generic.KeyValuePair{System.Byte[],System.Byte[]}})">
+ <summary>
+ wrapping user defined kafka read function through OffsetRange interface
+ collecting and passing back dropped offset range due to timeout if defined
+ for JVM side driver to pick up and schedule a task in next batch
+ </summary>
+ <param name="pid"></param>
+ <param name="input"></param>
+ <returns></returns>
+ </member>
+ <member name="M:Microsoft.Spark.CSharp.Streaming.OffsetRange.op_Addition(Microsoft.Spark.CSharp.Streaming.OffsetRange,Microsoft.Spark.CSharp.Streaming.OffsetRange)">
+ <summary>
+ Accumulator friendly
+ </summary>
+ <param name="self"></param>
+ <param name="other"></param>
+ <returns></returns>
+ </member>
<member name="T:Microsoft.Spark.CSharp.Streaming.MapWithStateDStream`4">
<summary>
DStream representing the stream of data generated by `mapWithState` operation on a pair DStream.
@@ -6724,12 +6749,12 @@
</summary>
<param name="streamingContextProxy"></param>
</member>
- <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.#ctor(Microsoft.Spark.CSharp.Core.SparkContext,System.Int64)">
+ <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.#ctor(Microsoft.Spark.CSharp.Core.SparkContext,System.Int32)">
<summary>
Initializes a new instance of StreamingContext with a existing SparkContext
</summary>
<param name="sparkContext">An existing SparkContext</param>
- <param name="durationMs">the time interval at which streaming data will be divided into batches</param>
+ <param name="durationSeconds">the time interval at which streaming data will be divided into batches</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.GetOrCreate(System.String,System.Func{Microsoft.Spark.CSharp.Streaming.StreamingContext})">
<summary>
@@ -6752,14 +6777,14 @@
Stop the execution of the streams.
</summary>
</member>
- <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Remember(System.Int64)">
+ <member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Remember(System.Int32)">
<summary>
Set each DStreams in this context to remember RDDs it generated in the last given duration.
DStreams remember RDDs only for a limited duration of time and releases them for garbage
collection. This method allows the developer to specify how long to remember the RDDs (
if the developer wishes to query old data outside the DStream computation).
</summary>
- <param name="durationMs">Minimum duration that each DStream should remember its RDDs</param>
+ <param name="durationSeconds">Minimum duration that each DStream should remember its RDDs</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.Checkpoint(System.String)">
<summary>

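For the extended CreateDirectStreamWithRepartition overload documented above, a hedged call sketch; the topic, broker address, and sentinel values are placeholders inferred from the doc text rather than verified defaults, and ssc is a StreamingContext as in the earlier sketch:

var stream = KafkaUtils.CreateDirectStreamWithRepartition(
    ssc,
    new List<string> { "test-topic" },                                               // topics (placeholder)
    new Dictionary<string, string> { { "metadata.broker.list", "localhost:9092" } }, // kafka params (placeholder broker)
    new Dictionary<string, long>(),                                                  // per-topic/partition starting offsets
    0,     // numPartitions: 0 keeps the original kafka partition count
    null,  // readFunc: when defined, kafka is read on the C# side through OffsetRange
    0);    // readTimeout: only honored when value > 0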
File diff suppressed because one or more lines are too long

View file

@@ -21,7 +21,7 @@ namespace AdapterTest
[Test]
public void TestDStreamMapReduce()
{
- var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+ var ssc = new StreamingContext(new SparkContext("", ""), 1);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -31,7 +31,7 @@ namespace AdapterTest
words.Slice(DateTime.MinValue, DateTime.MaxValue);
words.Cache();
- words.Checkpoint(1000);
+ words.Checkpoint(1);
words.Count().ForeachRDD((time, rdd) =>
{
@@ -82,7 +82,7 @@ namespace AdapterTest
[Test]
public void TestDStreamTransform()
{
- var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+ var ssc = new StreamingContext(new SparkContext("", ""), 1);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -138,7 +138,7 @@ namespace AdapterTest
[Test]
public void TestDStreamJoin()
{
- var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+ var ssc = new StreamingContext(new SparkContext("", ""), 1);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -245,7 +245,7 @@ namespace AdapterTest
[Test]
public void TestDStreamUpdateStateByKey()
{
- var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+ var ssc = new StreamingContext(new SparkContext("", ""), 1);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@@ -311,7 +311,7 @@
SparkCLREnvironment.SparkCLRProxy = sparkClrProxy.Object;
var sparkConf = new SparkConf(false);
- var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10000);
+ var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10);
var dstreamProxy = new Mock<IDStreamProxy>();
var pairDStream = new DStream<KeyValuePair<string, int>>(dstreamProxy.Object, ssc);
@@ -412,7 +412,7 @@ namespace AdapterTest
{
var sc = new SparkContext("", "");
var rdd = sc.Parallelize(Enumerable.Range(0, 10), 1);
- var ssc = new StreamingContext(sc, 1000);
+ var ssc = new StreamingContext(sc, 1);
// test when rdd is null
Assert.Throws<ArgumentNullException>(() => new ConstantInputDStream<int>(null, ssc));

View file

@@ -26,7 +26,7 @@ namespace AdapterTest
.Returns(mockDstreamProxy);
var mockSparkClrProxy = new Mock<ISparkCLRProxy>();
- mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>()))
+ mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>()))
.Returns(streamingContextProxy.Object);
SparkCLREnvironment.SparkCLRProxy = mockSparkClrProxy.Object;

View file

@@ -57,7 +57,7 @@ namespace AdapterTest.Mocks
{
}
- public void Checkpoint(long intervalMs)
+ public void Checkpoint(int intervalSeconds)
{
}

View file

@@ -58,7 +58,7 @@ namespace AdapterTest.Mocks
return false;
}
- public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
+ public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
{
streamingContextProxy = new MockStreamingContextProxy();
return streamingContextProxy;

View file

@@ -23,7 +23,7 @@ namespace AdapterTest.Mocks
public void Stop()
{}
- public void Remember(long durationMs)
+ public void Remember(int durationSeconds)
{}
public void Checkpoint(string directory)

View file

@@ -18,11 +18,11 @@ namespace AdapterTest
[Test]
public void TestStreamingContext()
{
- var ssc = new StreamingContext(new SparkContext("", ""), 1000);
+ var ssc = new StreamingContext(new SparkContext("", ""), 1);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
ssc.Start();
- ssc.Remember(1000);
+ ssc.Remember(1);
ssc.Checkpoint(Path.GetTempPath());
var textFile = ssc.TextFileStream(Path.GetTempPath());

View file

@@ -57,7 +57,7 @@ namespace AdapterTest
_mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny<bool>())).Returns(new MockSparkConfProxy()); // some of mocks which rarely change can be kept
_mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny<ISparkConfProxy>())).Returns(_mockSparkContextProxy.Object);
- _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);
+ _mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>())).Returns(_mockStreamingContextProxy.Object);
_mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
{
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
@@ -123,7 +123,7 @@ namespace AdapterTest
return _mockRddProxy.Object;
});
- _streamingContext = new StreamingContext(new SparkContext("", ""), 1000);
+ _streamingContext = new StreamingContext(new SparkContext("", ""), 1);
}

View file

@@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp
() =>
{
- StreamingContext context = new StreamingContext(sc, 2000);
+ StreamingContext context = new StreamingContext(sc, 2);
context.Checkpoint(checkpointPath);
var lines = context.TextFileStream(Path.Combine(directory, "test"));
@@ -140,7 +140,7 @@ namespace Microsoft.Spark.CSharp
() =>
{
SparkContext sc = SparkCLRSamples.SparkContext;
- StreamingContext context = new StreamingContext(sc, 2000);
+ StreamingContext context = new StreamingContext(sc, 2);
context.Checkpoint(checkpointPath);
var kafkaParams = new Dictionary<string, string> {
@@ -188,7 +188,7 @@ namespace Microsoft.Spark.CSharp
internal static void DStreamConstantDStreamSample()
{
var sc = SparkCLRSamples.SparkContext;
- var ssc = new StreamingContext(sc, 2000);
+ var ssc = new StreamingContext(sc, 2);
const int count = 100;
const int partitions = 2;

View file

@@ -54,7 +54,7 @@ namespace Microsoft.Spark.CSharp.Samples
() =>
{
SparkContext sc = SparkCLRSamples.SparkContext;
- StreamingContext context = new StreamingContext(sc, 10000);
+ StreamingContext context = new StreamingContext(sc, 10);
context.Checkpoint(checkpointPath);
var lines = context.TextFileStream(Path.Combine(directory, "test1"));