Change time unit of streaming back to millisecond (#504)

* Change time unit of streaming back to millisecond

* Update Microsoft.Spark.CSharp.Adapter.Doc.XML and Mobius_API_Documentation.md

* Change according to review comments
This commit is contained in:
Tao Qin 2016-07-19 19:16:40 -07:00 коммит произвёл GitHub
Родитель ae1494e547
Коммит 3d541e942e
20 изменённых файлов: 1057 добавлений и 985 удалений

Просмотреть файл

@ -71,10 +71,10 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
return new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", GetJavaList<T>(enumerable)));
}
public static JvmObjectReference GetJavaDuration(int durationSeconds)
public static JvmObjectReference GetJavaDuration(long durationMs)
{
// Java expects the Duration in milliseconds, and the value must be of long type
return SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { (long)durationSeconds * 1000 });
return SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.Duration", new object[] { durationMs});
}
}
}

Просмотреть файл

@ -19,7 +19,7 @@ namespace Microsoft.Spark.CSharp.Proxy
void CallForeachRDD(byte[] func, string serializedMode);
void Print(int num = 10);
void Persist(StorageLevelType storageLevelType);
void Checkpoint(int intervalSeconds);
void Checkpoint(long intervalMs);
IRDDProxy[] Slice(long fromUnixTime, long toUnixTime);
}
}

Просмотреть файл

@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
// or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists()
// should not be put to IStreamingContextProxy.
bool CheckpointExists(string checkpointPath);
IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds);
IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
IStreamingContextProxy CreateStreamingContext(string checkpointPath);
}
}

Просмотреть файл

@ -16,7 +16,7 @@ namespace Microsoft.Spark.CSharp.Proxy
SparkContext SparkContext { get; }
void Start();
void Stop();
void Remember(int durationSeconds);
void Remember(long durationMs);
void Checkpoint(string directory);
IDStreamProxy TextFileStream(string directory);
IDStreamProxy SocketTextStream(string hostname, int port, StorageLevelType storageLevelType);

Просмотреть файл

@ -40,7 +40,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public IDStreamProxy Window(int windowSeconds, int slideSeconds = 0)
{
string windowId = null;
var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds);
var windowDurationReference = JvmBridgeUtils.GetJavaDuration((long)windowSeconds * 1000);
if (slideSeconds <= 0)
{
@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
return new DStreamIpcProxy(new JvmObjectReference(windowId));
}
var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds);
var slideDurationReference = JvmBridgeUtils.GetJavaDuration((long)slideSeconds * 1000);
windowId = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaDStreamReference, "window", new object[] { windowDurationReference, slideDurationReference });
return new DStreamIpcProxy(new JvmObjectReference(windowId));
@ -77,9 +77,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "persist", new object[] { jstorageLevel });
}
public void Checkpoint(int intervalSeconds)
public void Checkpoint(long intervalMs)
{
var jinterval = JvmBridgeUtils.GetJavaDuration(intervalSeconds);
var jinterval = JvmBridgeUtils.GetJavaDuration(intervalMs);
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "checkpoint", new object[] { jinterval });
}

Просмотреть файл

@ -80,9 +80,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationSeconds);
streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs);
return streamingContextIpcProxy;
}

Просмотреть файл

@ -43,11 +43,11 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
}
}
public StreamingContextIpcProxy(SparkContext sparkContext, int durationSeconds)
public StreamingContextIpcProxy(SparkContext sparkContext, long durationMs)
{
this.sparkContext = sparkContext;
sparkContextProxy = sparkContext.SparkContextProxy;
var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds);
var jduration = JvmBridgeUtils.GetJavaDuration(durationMs);
JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference;
jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration });
@ -99,9 +99,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "closeCallback");
}
public void Remember(int durationSeconds)
public void Remember(long durationMs)
{
var jduration = JvmBridgeUtils.GetJavaDuration(durationSeconds);
var jduration = JvmBridgeUtils.GetJavaDuration(durationMs);
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "remember", new object[] { jduration });
}
@ -134,8 +134,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode)
{
var windowDurationReference = JvmBridgeUtils.GetJavaDuration(windowSeconds);
var slideDurationReference = JvmBridgeUtils.GetJavaDuration(slideSeconds);
var windowDurationReference = JvmBridgeUtils.GetJavaDuration((long)windowSeconds * 1000);
var slideDurationReference = JvmBridgeUtils.GetJavaDuration((long)slideSeconds * 1000);
var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpReducedWindowedDStream",
new object[] { (jdstream as DStreamIpcProxy).jvmDStreamReference, func, invFunc, windowDurationReference, slideDurationReference, serializationMode });

Просмотреть файл

@ -221,12 +221,12 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <summary>
/// Enable periodic checkpointing of RDDs of this DStream
/// </summary>
/// <param name="intervalSeconds">time in seconds, after each period of that, generated RDD will be checkpointed</param>
/// <param name="intervalMs">interval in milliseconds at which the generated RDDs will be checkpointed</param>
/// <returns></returns>
public DStream<T> Checkpoint(int intervalSeconds)
public DStream<T> Checkpoint(long intervalMs)
{
isCheckpointed = true;
DStreamProxy.Checkpoint(intervalSeconds);
DStreamProxy.Checkpoint(intervalMs);
return this;
}

Просмотреть файл

@ -56,11 +56,11 @@ namespace Microsoft.Spark.CSharp.Streaming
/// Initializes a new instance of StreamingContext with an existing SparkContext
/// </summary>
/// <param name="sparkContext">An existing SparkContext</param>
/// <param name="durationSeconds">the time interval at which streaming data will be divided into batches</param>
public StreamingContext(SparkContext sparkContext, int durationSeconds)
/// <param name="durationMs">the time interval in milliseconds at which streaming data will be divided into batches</param>
public StreamingContext(SparkContext sparkContext, long durationMs)
{
this.sparkContext = sparkContext;
streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationSeconds);
streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationMs);
}
/// <summary>
@ -106,10 +106,10 @@ namespace Microsoft.Spark.CSharp.Streaming
/// collection. This method allows the developer to specify how long to remember the RDDs (
/// if the developer wishes to query old data outside the DStream computation).
/// </summary>
/// <param name="durationSeconds">Minimum duration that each DStream should remember its RDDs</param>
public void Remember(int durationSeconds)
/// <param name="durationMs">Minimum duration in milliseconds that each DStream should remember its RDDs</param>
public void Remember(long durationMs)
{
streamingContextProxy.Remember(durationSeconds);
streamingContextProxy.Remember(durationMs);
}
/// <summary>

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

Просмотреть файл

@ -21,7 +21,7 @@ namespace AdapterTest
[Test]
public void TestDStreamMapReduce()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@ -31,7 +31,7 @@ namespace AdapterTest
words.Slice(DateTime.MinValue, DateTime.MaxValue);
words.Cache();
words.Checkpoint(1);
words.Checkpoint(1000);
words.Window(1, 1);
words.Count().ForeachRDD((time, rdd) =>
@ -83,7 +83,7 @@ namespace AdapterTest
[Test]
public void TestDStreamTransform()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@ -139,7 +139,7 @@ namespace AdapterTest
[Test]
public void TestDStreamJoin()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@ -246,7 +246,7 @@ namespace AdapterTest
[Test]
public void TestDStreamUpdateStateByKey()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
var lines = ssc.TextFileStream(Path.GetTempPath());
@ -327,7 +327,7 @@ namespace AdapterTest
SparkCLREnvironment.SparkCLRProxy = sparkClrProxy.Object;
var sparkConf = new SparkConf(false);
var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10);
var ssc = new StreamingContext(new SparkContext(sparkContextProxy.Object, sparkConf), 10000L);
var dstreamProxy = new Mock<IDStreamProxy>();
var pairDStream = new DStream<KeyValuePair<string, int>>(dstreamProxy.Object, ssc);
@ -428,7 +428,7 @@ namespace AdapterTest
{
var sc = new SparkContext("", "");
var rdd = sc.Parallelize(Enumerable.Range(0, 10), 1);
var ssc = new StreamingContext(sc, 1);
var ssc = new StreamingContext(sc, 1000L);
// test when rdd is null
Assert.Throws<ArgumentNullException>(() => new ConstantInputDStream<int>(null, ssc));
@ -443,7 +443,7 @@ namespace AdapterTest
{
// test create CSharpInputDStream
var sc = new SparkContext("", "");
var ssc = new StreamingContext(sc, 1);
var ssc = new StreamingContext(sc, 1000L);
Func<double, int, IEnumerable<string>> func =
(double time, int pid) =>
{

Просмотреть файл

@ -26,12 +26,12 @@ namespace AdapterTest
.Returns(mockDstreamProxy);
var mockSparkClrProxy = new Mock<ISparkCLRProxy>();
mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>()))
mockSparkClrProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>()))
.Returns(streamingContextProxy.Object);
SparkCLREnvironment.SparkCLRProxy = mockSparkClrProxy.Object;
var sparkContext = new SparkContext(SparkCLREnvironment.SparkCLRProxy.SparkContextProxy, new SparkConf(new Mock<ISparkConfProxy>().Object));
var streamingContext = new StreamingContext(sparkContext, 123);
var streamingContext = new StreamingContext(sparkContext, 123L);
var dstream = EventHubsUtils.CreateUnionStream(streamingContext, new Dictionary<string, string>());
Assert.AreEqual(mockDstreamProxy, dstream.DStreamProxy);
}

Просмотреть файл

@ -57,7 +57,7 @@ namespace AdapterTest.Mocks
{
}
public void Checkpoint(int intervalSeconds)
public void Checkpoint(long intervalMs)
{
}

Просмотреть файл

@ -58,7 +58,7 @@ namespace AdapterTest.Mocks
return false;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, int durationSeconds)
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
streamingContextProxy = new MockStreamingContextProxy();
return streamingContextProxy;

Просмотреть файл

@ -23,7 +23,7 @@ namespace AdapterTest.Mocks
public void Stop()
{}
public void Remember(int durationSeconds)
public void Remember(long durationMs)
{}
public void Checkpoint(string directory)

Просмотреть файл

@ -18,11 +18,11 @@ namespace AdapterTest
[Test]
public void TestStreamingContext()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
ssc.Start();
ssc.Remember(1);
ssc.Remember(1000L);
ssc.Checkpoint(Path.GetTempPath());
var textFile = ssc.TextFileStream(Path.GetTempPath());
@ -58,11 +58,11 @@ namespace AdapterTest
[Test]
public void TestStreamingAwaitTimeout()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1000);
var ssc = new StreamingContext(new SparkContext("", ""), 1000L);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
ssc.Start();
ssc.Remember(1000);
ssc.Remember(1000L);
ssc.Checkpoint(Path.GetTempPath());
var textFile = ssc.TextFileStream(Path.GetTempPath());

Просмотреть файл

@ -58,7 +58,7 @@ namespace AdapterTest
_mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny<bool>())).Returns(new MockSparkConfProxy()); // some of mocks which rarely change can be kept
_mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny<ISparkConfProxy>())).Returns(_mockSparkContextProxy.Object);
_mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<int>())).Returns(_mockStreamingContextProxy.Object);
_mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);
_mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
{
var listener = SocketFactory.CreateSocket();
@ -124,7 +124,7 @@ namespace AdapterTest
return _mockRddProxy.Object;
});
_streamingContext = new StreamingContext(new SparkContext("", ""), 1);
_streamingContext = new StreamingContext(new SparkContext("", ""), 1000L);
}

Просмотреть файл

@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp
() =>
{
StreamingContext context = new StreamingContext(sc, 2);
StreamingContext context = new StreamingContext(sc, 2000L); // batch interval is in milliseconds
context.Checkpoint(checkpointPath);
var lines = context.TextFileStream(Path.Combine(directory, "test"));
@ -141,7 +141,7 @@ namespace Microsoft.Spark.CSharp
() =>
{
SparkContext sc = SparkCLRSamples.SparkContext;
StreamingContext context = new StreamingContext(sc, 2);
StreamingContext context = new StreamingContext(sc, 2000L);
context.Checkpoint(checkpointPath);
var kafkaParams = new Dictionary<string, string> {
@ -189,7 +189,7 @@ namespace Microsoft.Spark.CSharp
internal static void DStreamConstantDStreamSample()
{
var sc = SparkCLRSamples.SparkContext;
var ssc = new StreamingContext(sc, 2);
var ssc = new StreamingContext(sc, 2000L);
const int count = 100;
const int partitions = 2;
@ -244,12 +244,12 @@ namespace Microsoft.Spark.CSharp
{
count = 0;
const int bacthInterval = 2;
const int windowDuration = 26;
const long bacthIntervalMs = 2000; // batch interval is in milliseconds
const int windowDuration = 26; // window duration in seconds
const int numPartitions = 2;
var sc = SparkCLRSamples.SparkContext;
var ssc = new StreamingContext(sc, bacthInterval);
var ssc = new StreamingContext(sc, bacthIntervalMs);
// create the RDD
var seedRDD = sc.Parallelize(Enumerable.Range(0, 100), numPartitions);
@ -284,7 +284,7 @@ namespace Microsoft.Spark.CSharp
KeyValuePair<int, int> sum = (KeyValuePair<int, int>)record;
Console.WriteLine("Key: {0}, Value: {1}", sum.Key, sum.Value);
// once the batch count reaches the window size, the sums of even/odd numbers stay at windowDuration / slideDuration * (2450, 2500) respectively
Assert.AreEqual(sum.Value, (count > windowDuration / slideDuration ? windowDuration : count * slideDuration) / bacthInterval * (sum.Key == 0 ? 2450 : 2500));
Assert.AreEqual(sum.Value, (count > windowDuration / slideDuration ? windowDuration : count * slideDuration) / (bacthIntervalMs / 1000) * (sum.Key == 0 ? 2450 : 2500));
}
});
@ -295,11 +295,10 @@ namespace Microsoft.Spark.CSharp
[Sample("experimental")]
internal static void DStreamCSharpInputSample()
{
const int bacthInterval = 2;
const int numPartitions = 5;
var sc = SparkCLRSamples.SparkContext;
var ssc = new StreamingContext(sc, bacthInterval);
var ssc = new StreamingContext(sc, 2000L); // batch interval is in milliseconds
var inputDStream = CSharpInputDStreamUtils.CreateStream<string>(
ssc,

Просмотреть файл

@ -54,7 +54,7 @@ namespace Microsoft.Spark.CSharp.Samples
() =>
{
SparkContext sc = SparkCLRSamples.SparkContext;
StreamingContext context = new StreamingContext(sc, 10);
StreamingContext context = new StreamingContext(sc, 10000L); // batch interval is in milliseconds
context.Checkpoint(checkpointPath);
var lines = context.TextFileStream(Path.Combine(directory, "test1"));