enable streaming driver checkpoint

This commit is contained in:
renyi 2015-12-09 16:21:46 -08:00
Parent 5fd923f369
Commit 71b904b0ea
17 changed files with 175 additions and 62 deletions

View file

@@ -23,6 +23,15 @@ namespace Microsoft.Spark.CSharp.Core
private readonly ISparkConfProxy sparkConfProxy;
internal ISparkConfProxy SparkConfProxy { get { return sparkConfProxy; } }
/// <summary>
/// Constructor used when SparkConf is recreated from a checkpoint
/// </summary>
/// <param name="sparkConfProxy"></param>
internal SparkConf(ISparkConfProxy sparkConfProxy)
{
this.sparkConfProxy = sparkConfProxy;
}
/// <summary>
/// Create SparkConf
/// </summary>

View file

@@ -82,6 +82,17 @@ namespace Microsoft.Spark.CSharp.Core
{
}
/// <summary>
/// Constructor used when SparkContext is recreated from a checkpoint
/// </summary>
/// <param name="sparkContextProxy"></param>
/// <param name="conf"></param>
internal SparkContext(ISparkContextProxy sparkContextProxy, SparkConf conf)
{
SparkContextProxy = sparkContextProxy;
SparkConf = conf;
}
private SparkContext(string master, string appName, string sparkHome, SparkConf conf)
{
SparkConf = conf ?? new SparkConf();

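Both internal constructors above serve checkpoint recovery: the JVM side rebuilds SparkConf and SparkContext from the checkpoint data, and the C# side merely attaches to those existing JVM objects through their proxies instead of creating new ones.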
View file

@@ -7,6 +7,7 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Proxy
@@ -22,5 +23,8 @@ namespace Microsoft.Spark.CSharp.Proxy
IDStreamProxy CreateCSharpTransformed2DStream(IDStreamProxy jdstream, IDStreamProxy jother, byte[] func, string deserializer, string deserializerOther);
IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string deserializer);
IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string deserializer);
bool CheckpointExists(string checkpointPath);
IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
IStreamingContextProxy CreateStreamingContext(string checkpointPath);
}
}

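Note that CheckpointExists and the two CreateStreamingContext factories land on ISparkCLRProxy rather than ISparkContextProxy (the next hunk removes the old member): when recovering from a checkpoint, the streaming context must be constructible before any C#-side SparkContext exists. A minimal sketch of how the three members compose; the helper itself is illustrative, not part of this commit:

internal static class CheckpointDispatchSketch
{
    internal static IStreamingContextProxy GetOrCreateProxy(
        ISparkCLRProxy proxy, SparkContext sparkContext, string checkpointPath, long durationMs)
    {
        // probe first, then either recover the context or build a fresh one
        return proxy.CheckpointExists(checkpointPath)
            ? proxy.CreateStreamingContext(checkpointPath)
            : proxy.CreateStreamingContext(sparkContext, durationMs);
    }
}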
View file

@@ -14,7 +14,6 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface ISparkContextProxy
{
IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs);
ISqlContextProxy CreateSqlContext();
IColumnProxy CreateColumnFromName(string name);
IColumnProxy CreateFunction(string name, object self);

View file

@@ -13,6 +13,7 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface IStreamingContextProxy
{
SparkContext SparkContext { get; }
void Start();
void Stop();
void Remember(long durationMs);

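The new SparkContext getter is how a checkpoint-recovered proxy hands back the SparkContext it reconstructed on the JVM side; StreamingContext (further down) falls back to it when it was built without one.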
View file

@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Services;
using Microsoft.Spark.CSharp.Sql;
@@ -126,5 +127,28 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
var javaDStreamReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDStreamReference, "asJavaDStream"));
return new DStreamIpcProxy(javaDStreamReference, jvmDStreamReference);
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
return new StreamingContextIpcProxy(sparkContext, durationMs);
}
public IStreamingContextProxy CreateStreamingContext(string checkpointPath)
{
return new StreamingContextIpcProxy(checkpointPath);
}
public bool CheckpointExists(string checkpointPath)
{
if (checkpointPath == null)
return false;
// probe the path through the JVM bridge using Hadoop's FileSystem API,
// so the check works for HDFS as well as local checkpoint directories
var path = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.fs.Path", checkpointPath);
var conf = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.hadoop.conf.Configuration");
var fs = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(path, "getFileSystem", conf));
// the directory must both exist and be listable to count as a usable checkpoint
return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "exists", path) &&
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(fs, "listStatus", path) != null;
}
}
}

View file

@@ -30,11 +30,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
this.jvmSparkContextReference = jvmSparkContextReference;
this.jvmJavaContextReference = jvmJavaContextReference;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
return new StreamingContextIpcProxy(sparkContext, durationMs);
}
public ISqlContextProxy CreateSqlContext()
{

View file

@@ -33,6 +33,14 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
// flag to denote whether the callback socket is shutdown explicitly
private volatile bool callbackSocketShutdown = false;
public SparkContext SparkContext
{
get
{
return sparkContext;
}
}
public StreamingContextIpcProxy(SparkContext sparkContext, long durationMs)
{
this.sparkContext = sparkContext;
@@ -42,13 +50,24 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
JvmObjectReference jvmSparkContextReference = (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference;
jvmStreamingContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.StreamingContext", new object[] { jvmSparkContextReference, jduration });
jvmJavaStreamingReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.java.JavaStreamingContext", new object[] { jvmStreamingContextReference });
int port = StartCallback();
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "connectCallback", port); //className and methodName hardcoded in CSharpBackendHandler
}
public StreamingContextIpcProxy(string checkpointPath)
{
// recover the JVM-side contexts from the checkpoint, then attach C#-side
// proxies (streaming context, spark context, conf) to the recovered references
jvmJavaStreamingReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.java.JavaStreamingContext", new object[] { checkpointPath });
jvmStreamingContextReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaStreamingReference, "ssc"));
JvmObjectReference jvmSparkContextReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "sc"));
JvmObjectReference jvmSparkConfReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "conf"));
JvmObjectReference jvmJavaContextReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaStreamingReference, "sparkContext"));
sparkContextProxy = new SparkContextIpcProxy(jvmSparkContextReference, jvmJavaContextReference);
var sparkConfProxy = new SparkConfIpcProxy(jvmSparkConfReference);
sparkContext = new SparkContext(sparkContextProxy, new SparkConf(sparkConfProxy));
}
public void Start()
{
// the callback server is now connected at Start rather than at construction,
// so a context recovered from a checkpoint gets wired up as well
int port = StartCallback();
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "connectCallback", port); //className and methodName hardcoded in CSharpBackendHandler
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStreamingContextReference, "start");
}

View file

@@ -365,7 +365,7 @@ namespace Microsoft.Spark.CSharp.Streaming
long fromUnixTime = (long)(fromTimeUtc - startUtc).TotalMilliseconds;
long toUnixTime = (long)(toTimeUtc - startUtc).TotalMilliseconds;
return DStreamProxy.Slice(fromUnixTime, toUnixTime).Select(r => new RDD<T>(r, streamingContext.sparkContext, serializedMode)).ToArray();
return DStreamProxy.Slice(fromUnixTime, toUnixTime).Select(r => new RDD<T>(r, streamingContext.SparkContext, serializedMode)).ToArray();
}
internal void ValidatWindowParam(int windowSeconds, int slideSeconds)

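This substitution, repeated throughout PairDStreamFunctions below, moves call sites off the removed sparkContext field and onto the new lazy SparkContext property, which can materialize the context from the proxy when the StreamingContext was recovered from a checkpoint.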
View file

@@ -53,7 +53,7 @@ namespace Microsoft.Spark.CSharp.Streaming
int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.Transform<KeyValuePair<K, C>>(new CombineByKeyHelper<K, V, C>(createCombiner, mergeValue, mergeCombiners, numPartitions).Execute);
}
@@ -69,7 +69,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, V>> PartitionBy<K, V>(this DStream<KeyValuePair<K, V>> self, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.Transform<KeyValuePair<K, V>>(new PartitionByHelper<K, V>(numPartitions).Execute);
}
@@ -131,7 +131,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, Tuple<List<V>, List<W>>>> GroupWith<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<List<V>, List<W>>>>(new GroupWithHelper<K, V, W>(numPartitions).Execute, other);
}
@@ -150,7 +150,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, Tuple<V, W>>> Join<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new JoinHelper<K, V, W>(numPartitions).Execute, other);
}
@@ -169,7 +169,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, Tuple<V, W>>> LeftOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new LeftOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
@@ -188,7 +188,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, Tuple<V, W>>> RightOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new RightOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
@@ -207,7 +207,7 @@ namespace Microsoft.Spark.CSharp.Streaming
public static DStream<KeyValuePair<K, Tuple<V, W>>> FullOuterJoin<K, V, W>(this DStream<KeyValuePair<K, V>> self, DStream<KeyValuePair<K, W>> other, int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
return self.TransformWith<KeyValuePair<K, W>, KeyValuePair<K, Tuple<V, W>>>(new FullOuterJoinHelper<K, V, W>(numPartitions).Execute, other);
}
@@ -320,7 +320,7 @@ namespace Microsoft.Spark.CSharp.Streaming
int numPartitions = 0)
{
if (numPartitions <= 0)
numPartitions = self.streamingContext.sparkContext.DefaultParallelism;
numPartitions = self.streamingContext.SparkContext.DefaultParallelism;
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> func = new UpdateStateByKeyHelper<K, V, S>(updateFunc, numPartitions).Execute;

View file

@@ -30,12 +30,32 @@ namespace Microsoft.Spark.CSharp.Streaming
public class StreamingContext
{
internal readonly IStreamingContextProxy streamingContextProxy;
internal readonly SparkContext sparkContext;
private SparkContext sparkContext;
internal SparkContext SparkContext
{
get
{
if (sparkContext == null)
{
// created from a checkpoint: pull the SparkContext rebuilt by the proxy
sparkContext = streamingContextProxy.SparkContext;
}
return sparkContext;
}
}
/// <summary>
/// Constructor used when StreamingContext is recreated from a checkpoint
/// </summary>
/// <param name="streamingContextProxy"></param>
private StreamingContext(IStreamingContextProxy streamingContextProxy)
{
this.streamingContextProxy = streamingContextProxy;
}
public StreamingContext(SparkContext sparkContext, long durationMs)
{
this.sparkContext = sparkContext;
this.streamingContextProxy = sparkContext.SparkContextProxy.CreateStreamingContext(sparkContext, durationMs);
this.streamingContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(sparkContext, durationMs);
}
/// <summary>
@@ -47,9 +67,16 @@ namespace Microsoft.Spark.CSharp.Streaming
/// <param name="checkpointPath">Checkpoint directory used in an earlier JavaStreamingContext program</param>
/// <param name="creatingFunc">Function to create a new JavaStreamingContext and setup DStreams</param>
/// <returns></returns>
public StreamingContext GetOrCreate(string checkpointPath, Func<StreamingContext> creatingFunc)
public static StreamingContext GetOrCreate(string checkpointPath, Func<StreamingContext> creatingFunc)
{
throw new NotImplementedException();
if (!SparkCLREnvironment.SparkCLRProxy.CheckpointExists(checkpointPath))
{
var ssc = creatingFunc();
ssc.Checkpoint(checkpointPath);
return ssc;
}
return new StreamingContext(SparkCLREnvironment.SparkCLRProxy.CreateStreamingContext(checkpointPath));
}
public void Start()

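GetOrCreate now mirrors Spark's JavaStreamingContext.getOrCreate: the factory runs only when no checkpoint exists, and it must both define the DStream graph and enable checkpointing before returning, because on restart the graph is restored from the checkpoint and the factory is skipped. A minimal driver sketch (sparkContext and checkpointPath are assumed to be defined by the caller):

StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath, () =>
{
    var context = new StreamingContext(sparkContext, 2000);
    context.Checkpoint(checkpointPath);
    // ... define DStream transformations and output operations here ...
    return context;
});
ssc.Start();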
View file

@@ -95,5 +95,20 @@ namespace AdapterTest.Mocks
new RDD<dynamic>((jdstream as MockDStreamProxy).rddProxy ?? new MockRddProxy(null), new SparkContext("", "")));
return new MockDStreamProxy(rdd.RddProxy);
}
public bool CheckpointExists(string checkpointPath)
{
return false;
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
return new MockStreamingContextProxy();
}
public IStreamingContextProxy CreateStreamingContext(string checkpointPath)
{
return new MockStreamingContextProxy();
}
}
}

View file

@@ -272,10 +272,5 @@ namespace AdapterTest.Mocks
{
return new MockRddProxy(null);
}
public IStreamingContextProxy CreateStreamingContext(SparkContext sparkContext, long durationMs)
{
return new MockStreamingContextProxy();
}
}
}

View file

@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
@@ -60,5 +61,10 @@ namespace AdapterTest.Mocks
public void AwaitTermination(int timeout)
{
}
public SparkContext SparkContext
{
get { throw new NotImplementedException(); }
}
}
}

View file

@@ -57,7 +57,7 @@ namespace AdapterTest
_mockSparkCLRProxy.Setup(m => m.CreateSparkConf(It.IsAny<bool>())).Returns(new MockSparkConfProxy()); // some of mocks which rarely change can be kept
_mockSparkCLRProxy.Setup(m => m.CreateSparkContext(It.IsAny<ISparkConfProxy>())).Returns(_mockSparkContextProxy.Object);
_mockSparkContextProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);
_mockSparkCLRProxy.Setup(m => m.CreateStreamingContext(It.IsAny<SparkContext>(), It.IsAny<long>())).Returns(_mockStreamingContextProxy.Object);
_mockRddProxy.Setup(m => m.CollectAndServe()).Returns(() =>
{
TcpListener listener = new TcpListener(IPAddress.Parse("127.0.0.1"), 0);

View file

@@ -51,40 +51,48 @@ namespace Microsoft.Spark.CSharp
[Sample("experimental")]
internal static void DStreamTextFileSamples()
{
SparkContext sc = SparkCLRSamples.SparkContext;
string directory = SparkCLRSamples.Configuration.SampleDataLocation;
sc.SetCheckpointDir(directory);
StreamingContext ssc = new StreamingContext(sc, 2000);
string checkpointPath = Path.Combine(directory, "checkpoint");
var lines = ssc.TextFileStream(Path.Combine(directory, "test"));
var words = lines.FlatMap(l => l.Split(' '));
var pairs = words.Map(w => new KeyValuePair<string, int>(w, 1));
// since operations like ReduceByKey, Join and UpdateStateByKey are
// separate dstream transformations defined in CSharpDStream.scala
// an extra CSharpRDD is introduced in between these operations
var wordCounts = pairs.ReduceByKey((x, y) => x + y);
var join = wordCounts.Join(wordCounts, 2);
var state = join.UpdateStateByKey<string, Tuple<int, int>, int>((vs, s) => vs.Sum(x => x.Item1 + x.Item2) + s);
state.ForeachRDD((time, rdd) =>
{
// there's a chance rdd.Take conflicts with ssc.Stop
if (stopFileServer)
return;
object[] taken = rdd.Take(10);
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Time: {0}", time);
Console.WriteLine("-------------------------------------------");
foreach (object record in taken)
StreamingContext ssc = StreamingContext.GetOrCreate(checkpointPath,
() =>
{
Console.WriteLine(record);
}
Console.WriteLine();
SparkContext sc = SparkCLRSamples.SparkContext;
StreamingContext context = new StreamingContext(sc, 2000);
context.Checkpoint(checkpointPath);
stopFileServer = count++ > 100;
});
var lines = context.TextFileStream(Path.Combine(directory, "test"));
var words = lines.FlatMap(l => l.Split(' '));
var pairs = words.Map(w => new KeyValuePair<string, int>(w, 1));
// since operations like ReduceByKey, Join and UpdateStateByKey are
// separate dstream transformations defined in CSharpDStream.scala
// an extra CSharpRDD is introduced in between these operations
var wordCounts = pairs.ReduceByKey((x, y) => x + y);
var join = wordCounts.Join(wordCounts, 2);
var state = join.UpdateStateByKey<string, Tuple<int, int>, int>((vs, s) => vs.Sum(x => x.Item1 + x.Item2) + s);
state.ForeachRDD((time, rdd) =>
{
// there's a chance rdd.Take conflicts with ssc.Stop
if (stopFileServer)
return;
object[] taken = rdd.Take(10);
Console.WriteLine("-------------------------------------------");
Console.WriteLine("Time: {0}", time);
Console.WriteLine("-------------------------------------------");
foreach (object record in taken)
{
Console.WriteLine(record);
}
Console.WriteLine();
stopFileServer = count++ > 100;
});
return context;
});
ssc.Start();

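The sample now builds the entire pipeline inside the GetOrCreate factory: a fresh run constructs and checkpoints the DStream graph, while a restart recovers it from the checkpoint directory without re-executing the setup code.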
View file

@@ -131,7 +131,7 @@ class CSharpDStream(
class CSharpTransformed2DStream(
parent: DStream[_],
parent2: DStream[_],
@transient rfunc: Array[Byte],
rfunc: Array[Byte],
deserializer: String,
deserializer2: String)
extends DStream[Array[Byte]] (parent.ssc) {
@@ -156,8 +156,8 @@ class CSharpTransformed2DStream(
*/
class CSharpReducedWindowedDStream(
parent: DStream[Array[Byte]],
@transient rreduceFunc: Array[Byte],
@transient rinvReduceFunc: Array[Byte],
rreduceFunc: Array[Byte],
rinvReduceFunc: Array[Byte],
_windowDuration: Duration,
_slideDuration: Duration,
deserializer: String)
@@ -233,7 +233,7 @@ class CSharpReducedWindowedDStream(
*/
class CSharpStateDStream(
parent: DStream[Array[Byte]],
@transient reduceFunc: Array[Byte],
reduceFunc: Array[Byte],
deserializer: String)
extends DStream[Array[Byte]](parent.ssc) {
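Dropping @transient from the serialized C# functions is what makes driver checkpointing work at all: checkpointing serializes the DStream graph, including these byte arrays, and a @transient field would come back null on recovery, leaving the restored stream without its user function. A .NET analogue of the pitfall (illustrative only; [NonSerialized] plays the role of @transient):

using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

[Serializable]
class FakeDStream
{
    [NonSerialized] public byte[] TransientFunc; // analogue of @transient rfunc: dropped by serialization
    public byte[] PersistedFunc;                 // analogue after this change: survives the round-trip
}

class Program
{
    static void Main()
    {
        var d = new FakeDStream { TransientFunc = new byte[] { 1 }, PersistedFunc = new byte[] { 1 } };
        var formatter = new BinaryFormatter();
        using (var stream = new MemoryStream())
        {
            formatter.Serialize(stream, d);
            stream.Position = 0;
            var restored = (FakeDStream)formatter.Deserialize(stream);
            Console.WriteLine(restored.TransientFunc == null); // True: the function bytes are gone
            Console.WriteLine(restored.PersistedFunc != null); // True: the graph can be rebuilt
        }
    }
}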