Move CheckpointExists() back from IStreamingContextProxy to ISparkCLRProxy

This commit is contained in:
tqin 2016-01-07 12:21:10 +08:00
Родитель f787c0974d
Коммит 9422540a5d
7 изменённых файлов: 24 добавлений и 19 удалений

Просмотреть файл

@ -11,6 +11,11 @@ namespace Microsoft.Spark.CSharp.Proxy
IStreamingContextProxy StreamingContextProxy { get; }
ISparkConfProxy CreateSparkConf(bool loadDefaults = true);
ISparkContextProxy CreateSparkContext(ISparkConfProxy conf);
// In function StreamingContext.GetOrCreate(), CheckpointExists() is called to see whether we should create a new StreamingContext
// or restore it from checkpoint. Thus this function is called before IStreamingContextProxy is initialized. So CheckpointExists()
// should not be put to IStreamingContextProxy.
bool CheckpointExists(string checkpointPath);
IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
IStreamingContextProxy CreateStreamingContext(string checkpointPath);
}

Просмотреть файл

@ -14,7 +14,6 @@ namespace Microsoft.Spark.CSharp.Proxy
internal interface IStreamingContextProxy
{
SparkContext SparkContext { get; }
bool CheckpointExists(string checkpointPath);
void Start();
void Stop();
void Remember(long durationMs);

Просмотреть файл

@ -67,6 +67,19 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
return sparkContextProxy;
}
public bool CheckpointExists(string checkpointPath)
{
if (checkpointPath == null)
return false;
var path = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", checkpointPath);
var conf = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.conf.Configuration");
var fs = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(path, "getFileSystem", conf));
return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "exists", path) &&
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
streamingContextIpcProxy = new StreamingContextIpcProxy(sparkContext, durationMs);

Просмотреть файл

@ -93,19 +93,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "checkpoint", new object[] { directory });
}
public bool CheckpointExists(string checkpointPath)
{
if (checkpointPath == null)
return false;
var path = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", checkpointPath);
var conf = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.conf.Configuration");
var fs = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(path, "getFileSystem", conf));
return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "exists", path) &&
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
}
public IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string serializationMode)
{
var jvmDStreamReference =

Просмотреть файл

@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <returns></returns>
public static StreamingContext GetOrCreate(string checkpointPath, Func<StreamingContext> creatingFunc)
{
if (!SparkCLREnvironment.SparkCLRProxy.StreamingContextProxy.CheckpointExists(checkpointPath))
if (!SparkCLREnvironment.SparkCLRProxy.CheckpointExists(checkpointPath))
{
var ssc = creatingFunc();
ssc.Checkpoint(checkpointPath);

Просмотреть файл

@ -53,6 +53,11 @@ namespace AdapterTest.Mocks
get { return streamingContextProxy; }
}
public bool CheckpointExists(string checkpointPath)
{
return false;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
streamingContextProxy = new MockStreamingContextProxy();

Просмотреть файл

@ -107,9 +107,5 @@ namespace AdapterTest.Mocks
return new MockDStreamProxy(rdd.RddProxy);
}
public bool CheckpointExists(string checkpointPath)
{
return false;
}
}
}