Fix AwaitTermination(int timeout) error, and change to AwaitTerminationOrTimeout(long timeout)

This commit is contained in:
Quanmao LIU 2016-06-11 08:07:45 +08:00 коммит произвёл Kaarthik Sivashanmugam
Родитель 18668c3c58
Коммит a7f1360b7a
5 изменённых файлов: 30 добавлений и 7 удалений

Просмотреть файл

@ -25,7 +25,7 @@ namespace Microsoft.Spark.CSharp.Proxy
IDStreamProxy DirectKafkaStreamWithRepartition(List<string> topics, Dictionary<string, string> kafkaParams, Dictionary<string, long> fromOffsets, int numPartitions);
IDStreamProxy Union(IDStreamProxy firstDStreams, IDStreamProxy[] otherDStreams);
void AwaitTermination();
void AwaitTermination(int timeout);
void AwaitTerminationOrTimeout(long timeout);
IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string serializationMode);
IDStreamProxy CreateCSharpTransformed2DStream(IDStreamProxy jdstream, IDStreamProxy jother, byte[] func, string serializationMode, string serializationModeOther);
IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode);

Просмотреть файл

@ -263,9 +263,9 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "awaitTermination");
}
public void AwaitTermination(int timeout)
public void AwaitTerminationOrTimeout(long timeout)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "awaitTermination", new object[] { timeout });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "awaitTerminationOrTimeout", new object[] { timeout });
}
private void ProcessCallbackRequest(object socket)

Просмотреть файл

@ -163,10 +163,10 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <summary>
/// Wait for the execution to stop.
/// </summary>
/// <param name="timeout">time to wait in seconds</param>
public void AwaitTerminationOrTimeout(int timeout)
/// <param name="timeout">time to wait in milliseconds</param>
public void AwaitTerminationOrTimeout(long timeout)
{
streamingContextProxy.AwaitTermination(timeout);
streamingContextProxy.AwaitTerminationOrTimeout(timeout);
}
/// <summary>

Просмотреть файл

@ -63,7 +63,7 @@ namespace AdapterTest.Mocks
{
}
public void AwaitTermination(int timeout)
public void AwaitTerminationOrTimeout(long timeout)
{
}

Просмотреть файл

@ -46,5 +46,28 @@ namespace AdapterTest
ssc.AwaitTermination();
ssc.Stop();
}
[Test]
public void TestStreamingAwaitTimeout()
{
var ssc = new StreamingContext(new SparkContext("", ""), 1000);
Assert.IsNotNull((ssc.streamingContextProxy as MockStreamingContextProxy));
ssc.Start();
ssc.Remember(1000);
ssc.Checkpoint(Path.GetTempPath());
var textFile = ssc.TextFileStream(Path.GetTempPath());
Assert.IsNotNull(textFile.DStreamProxy);
var socketStream = ssc.SocketTextStream(IPAddress.Loopback.ToString(), 12345);
Assert.IsNotNull(socketStream.DStreamProxy);
var union = ssc.Union(textFile, socketStream);
Assert.IsNotNull(union.DStreamProxy);
ssc.AwaitTerminationOrTimeout(3000);
ssc.Stop();
}
}
}