From 9422540a5d327c11ccb6664ec090924d850414f4 Mon Sep 17 00:00:00 2001 From: tqin Date: Thu, 7 Jan 2016 12:21:10 +0800 Subject: [PATCH] Move CheckpointExists() back from IStreamingContextProxy to ISparkCLRProxy --- .../Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs | 5 +++++ .../Proxy/IStreamingContextProxy.cs | 1 - .../Proxy/Ipc/SparkCLRIpcProxy.cs | 13 +++++++++++++ .../Proxy/Ipc/StreamingContextIpcProxy.cs | 13 ------------- .../Streaming/StreamingContext.cs | 2 +- csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs | 5 +++++ .../AdapterTest/Mocks/MockStreamingContextProxy.cs | 4 ---- 7 files changed, 24 insertions(+), 19 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs index fe1cf92..454e58c 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs @@ -11,6 +11,11 @@ namespace Microsoft.Spark.CSharp.Proxy IStreamingContextProxy StreamingContextProxy { get; } ISparkConfProxy CreateSparkConf(bool loadDefaults = true); ISparkContextProxy CreateSparkContext(ISparkConfProxy conf); + + // In function StreamingContext.GetOrCreate(), CheckpointExists() is called to see whether we should create a new StreamingContext + // or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists() + // should not be put to IStreamingContextProxy. + bool CheckpointExists(string checkpointPath); IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs); IStreamingContextProxy CreateStreamingContext(string checkpointPath); } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs index 7ab025a..6631aa5 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IStreamingContextProxy.cs @@ -14,7 +14,6 @@ namespace Microsoft.Spark.CSharp.Proxy internal interface IStreamingContextProxy { SparkContext SparkContext { get; } - bool CheckpointExists(string checkpointPath); void Start(); void Stop(); void Remember(long durationMs); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs index 358558c..c120d9e 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs @@ -67,6 +67,19 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return sparkContextProxy; } + public bool CheckpointExists(string checkpointPath) + { + if (checkpointPath == null) + return false; + + var path = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", checkpointPath); + var conf = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.conf.Configuration"); + var fs = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(path, "getFileSystem", conf)); + + return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "exists", path) && + SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null; + } + public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs) { streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs index 5dbb9d9..2ee2d8f 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/StreamingContextIpcProxy.cs @@ -93,19 +93,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "checkpoint", new object[] { directory }); } - public bool CheckpointExists(string checkpointPath) - { - if (checkpointPath == null) - return false; - - var path = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", checkpointPath); - var conf = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.conf.Configuration"); - var fs = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(path, "getFileSystem", conf)); - - return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "exists", path) && - SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null; - } - public IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string serializationMode) { var jvmDStreamReference = diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs index 466d88c..57b9fbc 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Streaming/StreamingContext.cs @@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp.Streaming /// public static StreamingContext GetOrCreate(string checkpointPath, Func creatingFunc) { - if (!SparkCLREnvironment.SparkCLRProxy.StreamingContextProxy.CheckpointExists(checkpointPath)) + if (!SparkCLREnvironment.SparkCLRProxy.CheckpointExists(checkpointPath)) { var ssc = creatingFunc(); ssc.Checkpoint(checkpointPath); diff --git a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs index 5027f95..911fac3 100644 --- a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs @@ -53,6 +53,11 @@ namespace AdapterTest.Mocks get { return streamingContextProxy; } } + public bool CheckpointExists(string checkpointPath) + { + return false; + } + public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs) { streamingContextProxy = new MockStreamingContextProxy(); diff --git a/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs b/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs index 0ef9b41..c1afb28 100644 --- a/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs +++ b/csharp/AdapterTest/Mocks/MockStreamingContextProxy.cs @@ -107,9 +107,5 @@ namespace AdapterTest.Mocks return new MockDStreamProxy(rdd.RddProxy); } - public bool CheckpointExists(string checkpointPath) - { - return false; - } } }