add back test project and top level proxy

renyi 2015-10-30 12:46:17 -07:00
Parent ec117b1cb9
Commit 7db271cba7
45 changed files with 1931 additions and 487 deletions

View file

@ -73,10 +73,12 @@
<Compile Include="Proxy\IDataFrameProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SqlContextIpcProxy.cs" />
<Compile Include="Proxy\Ipc\StatusTrackerIpcProxy.cs" />
<Compile Include="Proxy\Ipc\StructIpcProxy.cs" />
<Compile Include="Proxy\IRDDProxy.cs" />
<Compile Include="Proxy\ISparkCLRProxy.cs" />
<Compile Include="Proxy\ISparkConfProxy.cs" />
<Compile Include="Proxy\ISparkContextProxy.cs" />
<Compile Include="Proxy\Ipc\SparkConfIpcProxy.cs" />

View file

@ -28,6 +28,9 @@ namespace Microsoft.Spark.CSharp.Core
/// While C{SparkContext} supports accumulators for primitive data types like C{int} and
/// C{float}, users can also define accumulators for custom types by providing a custom
/// L{AccumulatorParam} object. Refer to the doctest of this module for an example.
///
/// See python implementation in accumulators.py, worker.py, PythonRDD.scala
///
/// </summary>
[Serializable]
public class Accumulator
@ -70,7 +73,6 @@ namespace Microsoft.Spark.CSharp.Core
{
throw new ArgumentException("Accumulator.value cannot be accessed inside tasks");
}
this.value = value;
}
}
@ -169,25 +171,28 @@ namespace Microsoft.Spark.CSharp.Core
IFormatter formatter = new BinaryFormatter();
using (Socket s = AcceptSocket())
using (var ns = new NetworkStream(s))
using (var br = new BinaryReader(ns))
using (var bw = new BinaryWriter(ns))
{
while (!serverShutdown)
{
int numUpdates = SerDe.Convert(br.ReadInt32());
int numUpdates = SerDe.ReadInt(ns);
for (int i = 0; i < numUpdates; i++)
{
var ms = new MemoryStream(br.ReadBytes(SerDe.Convert(br.ReadInt32())));
var ms = new MemoryStream(SerDe.ReadBytes(ns));
KeyValuePair<int, dynamic> update = (KeyValuePair<int, dynamic>)formatter.Deserialize(ms);
Accumulator accumulator = Accumulator.accumulatorRegistry[update.Key];
accumulator.GetType().GetMethod("Add").Invoke(accumulator, new object[] { update.Value });
}
bw.Write((byte)1); // acknowledge byte other than -1
bw.Flush();
ns.WriteByte((byte)1); // acknowledge byte other than -1
ns.Flush();
Thread.Sleep(1000);
}
}
}
catch (SocketException e)
{
if (e.ErrorCode != 10004) // A blocking operation was interrupted by a call to WSACancelBlockingCall - TcpListener.Stop cancelled AcceptSocket as expected
throw e;
}
catch (Exception e)
{
logger.LogError(e.ToString());
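For context, a minimal sketch of what a worker-side client of this update server might send, assuming the same length-prefixed BinaryFormatter framing shown above (the TcpClient setup, port, and accumulatorId here are illustrative, not part of this commit):

    // requires System.Collections.Generic, System.IO, System.Net.Sockets,
    // System.Runtime.Serialization.Formatters.Binary, Microsoft.Spark.CSharp.Interop.Ipc
    using (var client = new TcpClient("localhost", port))
    using (var ns = client.GetStream())
    {
        var ms = new MemoryStream();
        new BinaryFormatter().Serialize(ms, new KeyValuePair<int, dynamic>(accumulatorId, 1));
        byte[] payload = ms.ToArray();
        SerDe.Write(ns, 1);              // number of updates in this batch
        SerDe.Write(ns, payload.Length); // length prefix consumed by SerDe.ReadBytes
        SerDe.Write(ns, payload);        // serialized KeyValuePair<int, dynamic>
        int ack = ns.ReadByte();         // server acknowledges with a byte other than -1
    }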

View file

@ -24,6 +24,8 @@ namespace Microsoft.Spark.CSharp.Core
/// [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
/// b.Unpersist()
///
/// See python implementation in broadcast.py, worker.py, PythonRDD.scala
///
/// </summary>
[Serializable]
public class Broadcast
@ -31,8 +33,6 @@ namespace Microsoft.Spark.CSharp.Core
[NonSerialized]
public static Dictionary<long, Broadcast> broadcastRegistry = new Dictionary<long, Broadcast>();
[NonSerialized]
internal string broadcastObjId;
[NonSerialized]
internal string path;
internal long broadcastId;
@ -63,17 +63,16 @@ namespace Microsoft.Spark.CSharp.Core
public class Broadcast<T> : Broadcast
{
[NonSerialized]
internal SparkContext sparkContext;
private IBroadcastProxy broadcastProxy;
[NonSerialized]
private T value;
internal Broadcast(SparkContext sparkContext, T value)
{
this.sparkContext = sparkContext;
this.value = value;
path = Path.GetTempFileName();
DumpBroadcast<T>(value, path);
broadcastObjId = sparkContext.SparkContextProxy.ReadBroadcastFromFile(path, out broadcastId);
broadcastProxy = sparkContext.SparkContextProxy.ReadBroadcastFromFile(path, out broadcastId);
}
/// <summary>
@ -100,10 +99,9 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="blocking"></param>
public void Unpersist(bool blocking = false)
{
if (broadcastObjId == null)
if (broadcastProxy == null)
throw new ArgumentException("Broadcast can only be unpersisted in driver");
sparkContext.SparkContextProxy.UnpersistBroadcast(broadcastObjId, blocking);
sparkContext.broadcastVars.Remove(this);
broadcastProxy.Unpersist(blocking);
File.Delete(path);
}
}
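A hedged usage sketch of the reworked Broadcast, assuming an existing SparkContext sc and the Map/Collect operators defined elsewhere in this codebase:

    var factor = sc.Broadcast(10);                 // value dumped to a temp file, registered via IBroadcastProxy
    var scaled = sc.Parallelize(new[] { 1, 2, 3 }, 1)
                   .Map(x => x * factor.Value)     // workers deserialize the broadcast value
                   .Collect();
    factor.Unpersist();                            // driver only: broadcastProxy.Unpersist + temp file cleanup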

View file

@ -305,7 +305,7 @@ namespace Microsoft.Spark.CSharp.Core
{
if (numPartitions == 0)
{
numPartitions = SparkCLREnvironment.SparkConfProxy.GetInt("spark.default.parallelism", 0);
numPartitions = self.sparkContext.SparkConf.SparkConfProxy.GetInt("spark.default.parallelism", 0);
if (numPartitions == 0)
numPartitions = self.previousRddProxy.PartitionLength();
}
@ -315,10 +315,10 @@ namespace Microsoft.Spark.CSharp.Core
// convert shuffling version of RDD[(Long, Array[Byte])] back to normal RDD[Array[Byte]]
// invoking property keyed.RddProxy marks the end of current pipeline RDD after shuffling
// and potentially starts next pipeline RDD with default SerializedMode.Byte
var rdd = self.SparkContext.SparkContextProxy.CreatePairwiseRDD<K, V>(keyed.RddProxy, numPartitions);
var rdd = self.sparkContext.SparkContextProxy.CreatePairwiseRDD<K, V>(keyed.RddProxy, numPartitions);
//rdd.partitioner = partitioner
return new RDD<KeyValuePair<K, V>>(rdd, self.SparkContext);
return new RDD<KeyValuePair<K, V>>(rdd, self.sparkContext);
}
/// <summary>
@ -364,7 +364,7 @@ namespace Microsoft.Spark.CSharp.Core
{
if (numPartitions == 0)
{
numPartitions = SparkCLREnvironment.SparkConfProxy.GetInt("spark.default.parallelism", 0);
numPartitions = self.sparkContext.SparkConf.SparkConfProxy.GetInt("spark.default.parallelism", 0);
if (numPartitions == 0 && self.previousRddProxy != null)
numPartitions = self.previousRddProxy.PartitionLength();
}
@ -633,6 +633,7 @@ namespace Microsoft.Spark.CSharp.Core
}
/// <summary>
/// TO DO: C# version of RDDSampler.py
/// Return a subset of this RDD sampled by key (via stratified sampling).
/// Create a sample of this RDD using variable sampling rates for
/// different keys as specified by fractions, a key to sampling rate map.
@ -656,14 +657,17 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="fractions"></param>
/// <param name="seed"></param>
/// <returns></returns>
public static RDD<KeyValuePair<string, V>> SampleByKey<V>(
this RDD<KeyValuePair<string, V>> self,
bool withReplacement,
Dictionary<string, double> fractions,
long seed)
{
return new RDD<KeyValuePair<string, V>>(self.RddProxy.SampleByKey(withReplacement, fractions, seed), self.SparkContext);
}
//public static RDD<KeyValuePair<string, V>> SampleByKey<V>(
// this RDD<KeyValuePair<string, V>> self,
// bool withReplacement,
// Dictionary<string, double> fractions,
// long seed)
//{
// if (fractions.Any(f => f.Value < 0.0))
// throw new ArgumentException(string.Format("Negative fraction value found in: {0}", string.Join(",", fractions.Values.ToArray())));
// return new RDD<KeyValuePair<string, V>>(self.RddProxy.SampleByKey(withReplacement, fractions, seed), self.sparkContext);
//}
/// <summary>
/// Return each (key, value) pair in C{self} that has no pair with matching key in C{other}.

View file

@ -35,10 +35,10 @@ namespace Microsoft.Spark.CSharp.Core
{
func = new MapPartitionsWithIndexHelper(new NewFuncWrapper<U, U1>(newFunc).Execute, func).Execute,
preservesPartitioning = preservesPartitioning && preservesPartitioningParam,
previousRddProxy = previousRddProxy,
prevSerializedMode = prevSerializedMode,
previousRddProxy = this.previousRddProxy,
prevSerializedMode = this.prevSerializedMode,
sparkContext = sparkContext,
sparkContext = this.sparkContext,
rddProxy = null,
serializedMode = SerializedMode.Byte
};
@ -99,7 +99,7 @@ namespace Microsoft.Spark.CSharp.Core
{
rddProxy = sparkContext.SparkContextProxy.CreateCSharpRdd(previousRddProxy,
SparkContext.BuildCommand(func, prevSerializedMode, bypassSerializer ? SerializedMode.None : serializedMode),
null, null, preservesPartitioning, sparkContext.broadcastVars, null);
null, null, preservesPartitioning, null, null);
}
return rddProxy;
}

View file

@ -27,7 +27,7 @@ namespace Microsoft.Spark.CSharp.Core
{
internal IRDDProxy rddProxy;
internal IRDDProxy previousRddProxy;
protected SparkContext sparkContext;
internal SparkContext sparkContext;
internal SerializedMode serializedMode; //used for deserializing data before processing in C# worker
internal SerializedMode prevSerializedMode;
@ -35,14 +35,6 @@ namespace Microsoft.Spark.CSharp.Core
protected bool isCheckpointed;
internal bool bypassSerializer;
internal SparkContext SparkContext
{
get
{
return sparkContext;
}
}
internal virtual IRDDProxy RddProxy
{
get
@ -233,7 +225,7 @@ namespace Microsoft.Spark.CSharp.Core
previousRddProxy = rddProxy,
prevSerializedMode = serializedMode,
sparkContext = sparkContext,
sparkContext = this.sparkContext,
rddProxy = null,
serializedMode = SerializedMode.Byte
};
@ -327,7 +319,7 @@ namespace Microsoft.Spark.CSharp.Core
else if (num == 0)
return new T[0];
int initialCount = (int) Count();
int initialCount = (int)Count();
if (initialCount == 0)
return new T[0];
@ -487,7 +479,7 @@ namespace Microsoft.Spark.CSharp.Core
/// <returns></returns>
public RDD<Tuple<T, U>> Cartesian<U>(RDD<U> other)
{
return new RDD<Tuple<T, U>>(RddProxy.Cartesian(other.RddProxy), sparkContext);
return new RDD<Tuple<T, U>>(RddProxy.Cartesian(other.RddProxy), sparkContext, SerializedMode.Pair);
}
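Since Cartesian now tags its result with SerializedMode.Pair, each element round-trips as a key/value pair; a small usage sketch (sc assumed):

    var nums = sc.Parallelize(new[] { 1, 2 }, 1);
    var letters = sc.Parallelize(new[] { "x", "y" }, 1);
    var product = nums.Cartesian(letters).Collect(); // (1,"x"), (1,"y"), (2,"x"), (2,"y")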
/// <summary>
@ -1470,7 +1462,6 @@ namespace Microsoft.Spark.CSharp.Core
}
}
}
internal enum SerializedMode
{
None,
@ -1479,4 +1470,5 @@ namespace Microsoft.Spark.CSharp.Core
Pair,
Row
}
}

View file

@ -20,7 +20,8 @@ namespace Microsoft.Spark.CSharp.Core
public class SparkConf
{
private ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(SparkConf));
internal ISparkConfProxy sparkConfProxy;
private ISparkConfProxy sparkConfProxy;
internal ISparkConfProxy SparkConfProxy { get { return sparkConfProxy; } }
/// <summary>
/// Create SparkConf
@ -28,8 +29,7 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="loadDefaults">indicates whether to also load values from Java system properties</param>
public SparkConf(bool loadDefaults = true)
{
SetSparkConfProxy();
sparkConfProxy.CreateSparkConf(loadDefaults);
sparkConfProxy = SparkCLREnvironment.SparkCLRProxy.CreateSparkConf(loadDefaults);
//special handling for debug mode because
//spark.master and spark.app.name will not be set in debug mode
@ -46,11 +46,6 @@ namespace Microsoft.Spark.CSharp.Core
}
}
private void SetSparkConfProxy()
{
sparkConfProxy = SparkCLREnvironment.SparkConfProxy;
}
/// <summary>
/// The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
/// run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.

View file

@ -19,8 +19,9 @@ namespace Microsoft.Spark.CSharp.Core
{
public class SparkContext
{
internal static ISparkContextProxy ActiveSparkContextProxy { get; private set; }
internal ISparkContextProxy SparkContextProxy { get; private set; }
internal List<Broadcast> broadcastVars = new List<Broadcast>();
internal SparkConf SparkConf { get; private set; }
private AccumulatorServer accumulatorServer;
private int nextAccumulatorId;
@ -84,8 +85,16 @@ namespace Microsoft.Spark.CSharp.Core
private SparkContext(string master, string appName, string sparkHome, SparkConf conf)
{
SparkContextProxy = SparkCLREnvironment.SparkContextProxy;
SparkContextProxy.CreateSparkContext(master, appName, sparkHome, conf.sparkConfProxy);
SparkConf = conf ?? new SparkConf();
if (master != null)
SparkConf.SetMaster(master);
if (appName != null)
SparkConf.SetAppName(appName);
if (sparkHome != null)
SparkConf.SetSparkHome(sparkHome);
SparkContextProxy = SparkCLREnvironment.SparkCLRProxy.CreateSparkContext(SparkConf.SparkConfProxy);
ActiveSparkContextProxy = SparkContextProxy;
// AddDriverFilesToSparkContext and AddWorkerToSparkContext
foreach (var file in SparkCLREnvironment.ConfigurationService.GetDriverFiles())
@ -93,12 +102,6 @@ namespace Microsoft.Spark.CSharp.Core
AddFile(file);
}
AddFile(SparkCLREnvironment.ConfigurationService.GetCSharpWorkerPath());
string host = "localhost";
accumulatorServer = new AccumulatorServer(host);
int port = accumulatorServer.StartUpdateServer();
SparkContextProxy.Accumulator(host, port);
}
public RDD<string> TextFile(string filePath, int minPartitions = 0)
@ -117,7 +120,7 @@ namespace Microsoft.Spark.CSharp.Core
/// <param name="serializableObjects"></param>
/// <param name="numSlices"></param>
/// <returns></returns>
public RDD<T> Parallelize<T>(IEnumerable<T> serializableObjects, int? numSlices)
public RDD<T> Parallelize<T>(IEnumerable<T> serializableObjects, int numSlices = 1)
{
List<byte[]> collectionOfByteRepresentationOfObjects = new List<byte[]>();
foreach (T obj in serializableObjects)
@ -128,6 +131,9 @@ namespace Microsoft.Spark.CSharp.Core
collectionOfByteRepresentationOfObjects.Add(memoryStream.ToArray());
}
if (numSlices < 1)
numSlices = 1;
return new RDD<T>(SparkContextProxy.Parallelize(collectionOfByteRepresentationOfObjects, numSlices), this);
}
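With the new signature, numSlices defaults to 1 and out-of-range values are clamped, so for example:

    var one = sc.Parallelize(new[] { 1, 2, 3, 4 });     // one partition by default
    var four = sc.Parallelize(new[] { 1, 2, 3, 4 }, 4); // four partitions
    var alsoOne = sc.Parallelize(new[] { 1, 2 }, 0);    // clamped back up to 1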
@ -361,7 +367,6 @@ namespace Microsoft.Spark.CSharp.Core
public Broadcast<T> Broadcast<T>(T value)
{
var broadcast = new Broadcast<T>(this, value);
broadcastVars.Add(broadcast);
return broadcast;
}
@ -377,6 +382,14 @@ namespace Microsoft.Spark.CSharp.Core
/// <returns></returns>
public Accumulator<T> Accumulator<T>(T value)
{
if (accumulatorServer == null)
{
string host = "localhost";
accumulatorServer = new AccumulatorServer(host);
int port = accumulatorServer.StartUpdateServer();
SparkContextProxy.Accumulator(host, port);
}
return new Accumulator<T>(nextAccumulatorId++, value);
}
@ -385,8 +398,9 @@ namespace Microsoft.Spark.CSharp.Core
/// </summary>
public void Stop()
{
if (accumulatorServer != null)
accumulatorServer.Shutdown();
SparkContextProxy.Stop();
accumulatorServer.Shutdown();
}
/// <summary>

View file

@ -23,7 +23,7 @@ namespace Microsoft.Spark.CSharp.Core
public StatCounter(IEnumerable<double> values)
{
this.Merge(values);
Merge(values);
}
/// <summary>

View file

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using Microsoft.Spark.CSharp.Proxy.Ipc;
namespace Microsoft.Spark.CSharp.Interop.Ipc
{
@ -27,8 +28,8 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
public string GetDebugInfo()
{
var javaObjectReferenceForClassObject = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(this, "getClass").ToString());
var className = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(javaObjectReferenceForClassObject, "getName").ToString();
var javaObjectReferenceForClassObject = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(this, "getClass").ToString());
var className = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(javaObjectReferenceForClassObject, "getName").ToString();
return string.Format("Java object reference id={0}, type name={1}, creation time (UTC)={2}", Id, className, creationTime.ToString("o"));
}
}

View file

@ -3,6 +3,7 @@
using System;
using System.Text;
using System.IO;
namespace Microsoft.Spark.CSharp.Interop.Ipc
{
@ -76,5 +77,120 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc
Array.Reverse(buffer); //Netty byte order is BigEndian
return BitConverter.ToDouble(buffer, 0);
}
public static int ReadInt(Stream s)
{
byte[] buffer = ReadBytes(s, 4);
return //Netty byte order is BigEndian
(int)buffer[3] |
(int)buffer[2] << 8 |
(int)buffer[1] << 16 |
(int)buffer[0] << 24;
}
public static long ReadLong(Stream s)
{
byte[] buffer = ReadBytes(s, 8);
return //Netty byte order is BigEndian
(long)buffer[7] |
(long)buffer[6] << 8 |
(long)buffer[5] << 16 |
(long)buffer[4] << 24 |
(long)buffer[3] << 32 |
(long)buffer[2] << 40 |
(long)buffer[1] << 48 |
(long)buffer[0] << 56;
}
public static double ReadDouble(Stream s)
{
byte[] buffer = ReadBytes(s, 8);
Array.Reverse(buffer); //Netty byte order is BigEndian
return BitConverter.ToDouble(buffer, 0);
}
public static string ReadString(Stream s)
{
return ToString(ReadBytes(s));
}
public static byte[] ReadBytes(Stream s, int length)
{
if (length <= 0)
return null;
byte[] buffer = new byte[length];
int bytesRead = 0;
while (bytesRead < length)
{
bytesRead += s.Read(buffer, bytesRead, length - bytesRead);
}
return buffer;
}
public static byte[] ReadBytes(Stream s)
{
var length = ReadInt(s);
return ReadBytes(s, length);
}
public static string ReadObjectId(Stream s)
{
var type = s.ReadByte();
if (type != 'j')
{
Console.WriteLine("Expecting java object identifier type");
return null;
}
return ReadString(s);
}
public static void Write(Stream s, byte value)
{
s.WriteByte(value);
}
public static void Write(Stream s, byte[] value)
{
s.Write(value, 0, value.Length);
}
public static void Write(Stream s, int value)
{
Write(s, new byte[] {
(byte)(value >> 24),
(byte)(value >> 16),
(byte)(value >> 8),
(byte)value
});
}
public static void Write(Stream s, long value)
{
Write(s, new byte[] {
(byte)(value >> 56),
(byte)(value >> 48),
(byte)(value >> 40),
(byte)(value >> 32),
(byte)(value >> 24),
(byte)(value >> 16),
(byte)(value >> 8),
(byte)value,
});
}
public static void Write(Stream s, double value)
{
byte[] buffer = BitConverter.GetBytes(value);
Array.Reverse(buffer);
Write(s, buffer);
}
public static void Write(Stream s, string value)
{
byte[] buffer = Encoding.UTF8.GetBytes(value);
Write(s, buffer.Length);
Write(s, buffer);
}
}
}
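A quick round-trip check of the new big-endian helpers, using only the methods added above:

    var ms = new MemoryStream();
    SerDe.Write(ms, 1234);           // 4 bytes, big-endian
    SerDe.Write(ms, 5678L);          // 8 bytes, big-endian
    SerDe.Write(ms, "hello");        // length prefix + UTF-8 bytes
    ms.Position = 0;
    int i = SerDe.ReadInt(ms);       // 1234
    long l = SerDe.ReadLong(ms);     // 5678
    string s = SerDe.ReadString(ms); // "hello"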

View file

@ -14,163 +14,40 @@ namespace Microsoft.Spark.CSharp.Interop
/// <summary>
/// Contains everything needed to setup an environment for using C# with Spark
/// </summary>
public class SparkCLREnvironment : IDisposable
public class SparkCLREnvironment
{
internal IJvmBridge jvmBridge;
internal static IJvmBridge JvmBridge
private static ISparkCLRProxy sparkCLRProxy;
internal static ISparkCLRProxy SparkCLRProxy
{
get
{
return Environment.jvmBridge;
if (sparkCLRProxy == null)
{
// TO DO: should get from app.config first, if not configured, then default to IPC
sparkCLRProxy = new SparkCLRIpcProxy();
}
return sparkCLRProxy;
}
}
internal ISparkConfProxy sparkConfProxy;
internal static ISparkConfProxy SparkConfProxy
{
get
set
{
return Environment.sparkConfProxy;
sparkCLRProxy = value; // for plugging test environment
}
}
internal ISparkContextProxy sparkContextProxy;
internal static ISparkContextProxy SparkContextProxy
{
get
{
return Environment.sparkContextProxy;
}
}
//internal IStreamingContextProxy streamingContextProxy;
//internal static IStreamingContextProxy StreamingContextProxy
//{
// get
// {
// return Environment.streamingContextProxy;
// }
//}
internal ISqlContextProxy sqlContextProxy;
internal static ISqlContextProxy SqlContextProxy
{
get
{
return Environment.sqlContextProxy;
}
}
internal IConfigurationService configurationService;
internal static IConfigurationService configurationService;
internal static IConfigurationService ConfigurationService
{
get
{
return Environment.configurationService;
if (configurationService == null)
configurationService = new ConfigurationService();
return configurationService;
}
set
{
Environment.configurationService = value;
configurationService = value;
}
}
protected static SparkCLREnvironment Environment = new SparkCLREnvironment();
protected SparkCLREnvironment() { }
/// <summary>
/// Initializes and returns the environment for SparkCLR execution
/// </summary>
/// <returns></returns>
public static SparkCLREnvironment Initialize()
{
Environment.InitializeEnvironment();
return Environment;
}
/// <summary>
/// Disposes the socket used in the JVM-CLR bridge
/// </summary>
public void Dispose()
{
jvmBridge.Dispose();
}
protected virtual void InitializeEnvironment()
{
var proxyFactory = new ProxyFactory();
configurationService = new ConfigurationService();
sparkConfProxy = proxyFactory.GetSparkConfProxy();
sparkContextProxy = proxyFactory.GetSparkContextProxy();
//streamingContextProxy = new StreamingContextIpcProxy();
sqlContextProxy = proxyFactory.GetSqlContextProxy();
jvmBridge = new JvmBridge();
InitializeJvmBridge();
}
private void InitializeJvmBridge()
{
int portNo = ConfigurationService.BackendPortNumber;
if (portNo == 0) //fail early
{
throw new Exception("Port number is not set");
}
Console.WriteLine("CSharpBackend port number to be used in JvMBridge is " + portNo);//TODO - send to logger
jvmBridge.Initialize(portNo);
}
private class ProxyFactory
{
private readonly InteropType interopType;
internal ProxyFactory(InteropType interopType = InteropType.IPC)
{
this.interopType = interopType;
}
internal ISparkConfProxy GetSparkConfProxy()
{
switch (interopType)
{
case InteropType.IPC:
return new SparkConfIpcProxy();
default:
throw new NotImplementedException();
}
}
internal ISparkContextProxy GetSparkContextProxy()
{
switch (interopType)
{
case InteropType.IPC:
return new SparkContextIpcProxy();
default:
throw new NotImplementedException();
}
}
internal ISqlContextProxy GetSqlContextProxy()
{
switch (interopType)
{
case InteropType.IPC:
return new SqlContextIpcProxy();
default:
throw new NotImplementedException();
}
}
}
public enum InteropType
{
IPC
}
}
}
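The new setter is the hook the restored test project can use to swap in a fake backend; a hedged sketch (TestSparkCLRProxy is hypothetical, see the stub after the ISparkCLRProxy interface below, and a public SparkContext(SparkConf) constructor is assumed):

    SparkCLREnvironment.SparkCLRProxy = new TestSparkCLRProxy(); // no JVM backend required
    var conf = new SparkConf();       // CreateSparkConf now goes through the test proxy
    var sc = new SparkContext(conf);  // CreateSparkContext likewise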

View file

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Proxy
{
interface ISparkCLRProxy
{
ISparkContextProxy SparkContextProxy { get; }
ISparkConfProxy CreateSparkConf(bool loadDefaults = true);
ISparkContextProxy CreateSparkContext(ISparkConfProxy conf);
IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable);
IStructTypeProxy CreateStructType(List<StructField> fields);
}
}
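A minimal test double for this top-level proxy might look like the following (hypothetical; a real test would hand back mock proxies rather than null):

    class TestSparkCLRProxy : ISparkCLRProxy
    {
        public ISparkContextProxy SparkContextProxy { get { return null; } }
        public ISparkConfProxy CreateSparkConf(bool loadDefaults = true) { return null; }
        public ISparkContextProxy CreateSparkContext(ISparkConfProxy conf) { return null; }
        public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable) { return null; }
        public IStructTypeProxy CreateStructType(List<StructField> fields) { return null; }
    }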

View file

@ -12,7 +12,6 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface ISparkConfProxy
{
void CreateSparkConf(bool loadDefaults = true);
void SetMaster(string master);
void SetAppName(string appName);
void SetSparkHome(string sparkHome);

View file

@ -14,7 +14,7 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface ISparkContextProxy
{
void CreateSparkContext(string master, string appName, string sparkHome, ISparkConfProxy conf);
ISqlContextProxy CreateSqlContext();
IColumnProxy CreateColumnFromName(string name);
IColumnProxy CreateFunction(string name, object self);
IColumnProxy CreateBinaryMathFunction(string name, object self, object other);
@ -27,7 +27,7 @@ namespace Microsoft.Spark.CSharp.Proxy
int DefaultMinPartitions { get; }
void Stop();
IRDDProxy EmptyRDD<T>();
IRDDProxy Parallelize(IEnumerable<byte[]> values, int? numSlices);
IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices);
IRDDProxy TextFile(string filePath, int minPartitions);
IRDDProxy WholeTextFiles(string filePath, int minPartitions);
IRDDProxy BinaryFiles(string filePath, int minPartitions);
@ -48,10 +48,13 @@ namespace Microsoft.Spark.CSharp.Proxy
void CancelAllJobs();
IStatusTrackerProxy StatusTracker { get; }
int RunJob(IRDDProxy rdd, IEnumerable<int> partitions, bool allowLocal);
string ReadBroadcastFromFile(string path, out long broadcastId);
void UnpersistBroadcast(string broadcastObjId, bool blocking);
IBroadcastProxy ReadBroadcastFromFile(string path, out long broadcastId);
IRDDProxy CreateCSharpRdd(IRDDProxy prefvJavaRddReference, byte[] command, Dictionary<string, string> environmentVariables, List<string> pythonIncludes, bool preservePartitioning, List<Broadcast> broadcastVariables, List<byte[]> accumulator);
IRDDProxy CreatePairwiseRDD<K, V>(IRDDProxy javaReferenceInByteArrayRdd, int numPartitions);
IRDDProxy CreateUserDefinedCSharpFunction(string name, byte[] command, string returnType);
}
internal interface IBroadcastProxy
{
void Unpersist(bool blocking);
}
}

View file

@ -13,9 +13,6 @@ namespace Microsoft.Spark.CSharp.Proxy
{
internal interface ISqlContextProxy
{
void CreateSqlContext(ISparkContextProxy sparkContextProxy);
StructField CreateStructField(string name, string dataType, bool isNullable);
StructType CreateStructType(List<StructField> fields);
IDataFrameProxy ReaDataFrame(string path, StructType schema, Dictionary<string, string> options);
IDataFrameProxy JsonFile(string path);
IDataFrameProxy TextFile(string path, StructType schema, string delimiter);

View file

@ -24,7 +24,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public void RegisterTempTable(string tableName)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference,
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference,
"registerTempTable", new object[] {tableName});
}
@ -32,21 +32,21 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
long.Parse(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "count").ToString());
}
public string GetQueryExecution()
{
var queryExecutionReference = GetQueryExecutionReference();
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(queryExecutionReference, "toString").ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(queryExecutionReference, "toString").ToString();
}
private JvmObjectReference GetQueryExecutionReference()
{
return
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "queryExecution").ToString());
}
@ -55,15 +55,15 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
var queryExecutionReference = GetQueryExecutionReference();
var executedPlanReference =
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(queryExecutionReference, "executedPlan")
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(queryExecutionReference, "executedPlan")
.ToString());
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(executedPlanReference, "toString", new object[] { }).ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(executedPlanReference, "toString", new object[] { }).ToString();
}
public string GetShowString(int numberOfRows, bool truncate)
{
return
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "showString",
new object[] {numberOfRows /*, truncate*/ }).ToString(); //1.4.1 does not support second param
}
@ -72,23 +72,23 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new StructTypeIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "schema").ToString()));
}
public IRDDProxy ToJSON()
{
return new RDDIpcProxy(
new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "toJSON")),
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "toJSON")),
"toJavaRDD")));
}
public IRDDProxy ToRDD()
{
return new RDDIpcProxy(
new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "dfToRowRDD", new object[] {jvmDataFrameReference})),
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "dfToRowRDD", new object[] {jvmDataFrameReference})),
"toJavaRDD")));
}
@ -96,7 +96,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new ColumnIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "col", new object[] {columnName}).ToString()));
}
@ -105,7 +105,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
var javaObjectReferenceList = objectList.Cast<JvmObjectReference>().ToList();
return
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
"toSeq", new object[] {javaObjectReferenceList}).ToString());
}
@ -114,7 +114,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
var javaObjectReferenceList = columnRefList.Select(s => (s as ColumnIpcProxy).ScalaColumnReference).ToList().Cast<JvmObjectReference>();
return
new ColumnIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
"toSeq", new object[] { javaObjectReferenceList }).ToString()));
}
@ -122,7 +122,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new DataFrameIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "select",
new object[] { (columnSequenceReference as ColumnIpcProxy).ScalaColumnReference }).ToString()), sqlContextProxy);
}
@ -131,7 +131,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new DataFrameIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "filter", new object[] { condition }).ToString()), sqlContextProxy);
}
@ -140,7 +140,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new GroupedDataIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "groupBy",
new object[] { firstColumnName, (otherColumnSequenceReference as ColumnIpcProxy).ScalaColumnReference }).ToString()));
}
@ -149,7 +149,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new GroupedDataIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "groupBy",
new object[] { (columnSequenceReference as ColumnIpcProxy).ScalaColumnReference}).ToString()));
}
@ -158,21 +158,21 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new GroupedDataIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "groupBy",
new object[] { columnSequenceReference as JvmObjectReference }).ToString()));
}
public IDataFrameProxy Agg(IGroupedDataProxy scalaGroupedDataReference, Dictionary<string, string> columnNameAggFunctionDictionary)
{
var mapReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallConstructor("java.util.HashMap").ToString());
var mapReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.HashMap").ToString());
foreach (var key in columnNameAggFunctionDictionary.Keys)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(mapReference, "put", new object[] { key, columnNameAggFunctionDictionary[key]});
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(mapReference, "put", new object[] { key, columnNameAggFunctionDictionary[key]});
}
return
new DataFrameIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
(scalaGroupedDataReference as GroupedDataIpcProxy).ScalaGroupedDataReference, "agg", new object[] { mapReference }).ToString()), sqlContextProxy);
}
@ -180,7 +180,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new DataFrameIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "join", new object[]
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "join", new object[]
{
(otherScalaDataFrameReference as DataFrameIpcProxy).jvmDataFrameReference,
joinColumnName
@ -194,11 +194,11 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
//TODO - uncomment this in 1.5
//var stringSequenceReference = new JvmObjectReference(
// SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", new object[] { joinColumnNames }).ToString());
// SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", new object[] { joinColumnNames }).ToString());
//return
// new JvmObjectReference(
// SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(scalaDataFrameReference, "join", new object[]
// SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(scalaDataFrameReference, "join", new object[]
// {
// otherScalaDataFrameReference,
// stringSequenceReference
@ -210,7 +210,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new DataFrameIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameReference, "join",
new object[]
{
@ -236,26 +236,26 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return
new ColumnIpcProxy(new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
scalaColumnReference, "equalTo",
new object[] { (secondColumn as ColumnIpcProxy).scalaColumnReference }).ToString()));
}
public IColumnProxy UnaryOp(string name)
{
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(scalaColumnReference, name)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(scalaColumnReference, name)));
}
public IColumnProxy FuncOp(string name)
{
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, scalaColumnReference)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, scalaColumnReference)));
}
public IColumnProxy BinOp(string name, object other)
{
if (other is ColumnIpcProxy)
other = (other as ColumnIpcProxy).scalaColumnReference;
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(scalaColumnReference, name, other)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(scalaColumnReference, name, other)));
}
}

View file

@ -27,8 +27,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
get
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "name");
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "name");
}
}
@ -36,8 +36,8 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
get
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (bool)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "isCheckpointed");
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "isCheckpointed");
}
}
@ -48,213 +48,199 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public long Count()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return long.Parse(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "count").ToString());
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return long.Parse(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "count").ToString());
}
public int CollectAndServe()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return int.Parse(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "collectAndServe", new object[] { rdd }).ToString());
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "collectAndServe", new object[] { rdd }).ToString());
}
public IRDDProxy Union(IRDDProxy javaRddReferenceOther)
{
var jref = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "union", new object[] { (javaRddReferenceOther as RDDIpcProxy).jvmRddReference }).ToString());
var jref = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "union", new object[] { (javaRddReferenceOther as RDDIpcProxy).jvmRddReference }).ToString());
return new RDDIpcProxy(jref);
}
public int PartitionLength()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var partitions = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "partitions", new object[] { });
return int.Parse(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("java.lang.reflect.Array", "getLength", new object[] { partitions }).ToString());
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var partitions = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "partitions", new object[] { });
return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("java.lang.reflect.Array", "getLength", new object[] { partitions }).ToString());
}
public IRDDProxy Coalesce(int numPartitions, bool shuffle)
{
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "coalesce", new object[] { numPartitions, shuffle })));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "coalesce", new object[] { numPartitions, shuffle })));
}
public IRDDProxy Sample(bool withReplacement, double fraction, long seed)
{
var jref = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "sample", new object[] { withReplacement, fraction, seed }));
var jref = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "sample", new object[] { withReplacement, fraction, seed }));
return new RDDIpcProxy(jref);
}
public IRDDProxy[] RandomSplit(double[] weights, long seed)
{
return ((List<JvmObjectReference>)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "randomSplit", new object[] { weights, seed }))
return ((List<JvmObjectReference>)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "randomSplit", new object[] { weights, seed }))
.Select(obj => new RDDIpcProxy(obj)).ToArray();
}
public IRDDProxy RandomSampleWithRange(double lb, double ub, long seed)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "randomSampleWithRange", new object[] { lb, ub, seed })));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "randomSampleWithRange", new object[] { lb, ub, seed })));
}
public void Cache()
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "cache");
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "cache");
}
public void Persist(StorageLevelType storageLevelType)
{
var jstorageLevel = GetJavaStorageLevel(storageLevelType);
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "persist", new object[] { jstorageLevel });
var jstorageLevel = SparkContextIpcProxy.GetJavaStorageLevel(storageLevelType);
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "persist", new object[] { jstorageLevel });
}
public void Unpersist()
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "unpersist");
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "unpersist");
}
public void Checkpoint()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "checkpoint");
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "checkpoint");
}
public string GetCheckpointFile()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "getCheckpointFile");
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "getCheckpointFile");
}
public int GetNumPartitions()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return ((List<JvmObjectReference>)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "partitions")).Count;
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return ((List<JvmObjectReference>)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "partitions")).Count;
}
public IRDDProxy Intersection(IRDDProxy other)
{
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "intersection", new object[] { (other as RDDIpcProxy).jvmRddReference })));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "intersection", new object[] { (other as RDDIpcProxy).jvmRddReference })));
}
public IRDDProxy Repartition(int numPartitions)
{
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "repartition", new object[] { numPartitions })));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "repartition", new object[] { numPartitions })));
}
public IRDDProxy Cartesian(IRDDProxy other)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var otherRdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod((other as RDDIpcProxy).jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "cartesian", new object[] { otherRdd })));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "cartesian", (other as RDDIpcProxy).jvmRddReference)));
}
public IRDDProxy Pipe(string command)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "pipe", new object[] { command })));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "pipe", new object[] { command })));
}
public void SetName(string name)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "setName", new object[] { name });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "setName", new object[] { name });
}
public IRDDProxy SampleByKey(bool withReplacement, Dictionary<string, double> fractions, long seed)
{
var jfractions = SparkContextIpcProxy.GetJavaMap(fractions) as JvmObjectReference;
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "sampleByKey", new object[] { withReplacement, jfractions, seed })));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "sampleByKey", new object[] { withReplacement, jfractions, seed })));
}
public string ToDebugString()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "toDebugString");
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "toDebugString");
}
public IRDDProxy Zip(IRDDProxy other)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zip", new object[] { (other as RDDIpcProxy).jvmRddReference })));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zip", new object[] { (other as RDDIpcProxy).jvmRddReference })));
}
public IRDDProxy ZipWithIndex()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zipWithIndex")));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zipWithIndex")));
}
public IRDDProxy ZipWithUniqueId()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zipWithUniqueId")));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "zipWithUniqueId")));
}
public void SaveAsNewAPIHadoopDataset(IEnumerable<KeyValuePair<string, string>> conf)
{
var jconf = SparkContextIpcProxy.GetJavaMap<string, string>(conf);
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopDataset", new object[] { jvmRddReference, false, jconf, null, null, true });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopDataset", new object[] { jvmRddReference, false, jconf, null, null, true });
}
public void SaveAsNewAPIHadoopFile(string path, string outputFormatClass, string keyClass, string valueClass, IEnumerable<KeyValuePair<string, string>> conf)
{
var jconf = SparkContextIpcProxy.GetJavaMap<string, string>(conf);
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsNewAPIHadoopFile", new object[] { jvmRddReference, false, path, outputFormatClass, keyClass, valueClass, null, null, jconf });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsNewAPIHadoopFile", new object[] { jvmRddReference, false, path, outputFormatClass, keyClass, valueClass, null, null, jconf });
}
public void SaveAsHadoopDataset(IEnumerable<KeyValuePair<string, string>> conf)
{
var jconf = SparkContextIpcProxy.GetJavaMap<string, string>(conf);
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopDataset", new object[] { jvmRddReference, false, jconf, null, null, false });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopDataset", new object[] { jvmRddReference, false, jconf, null, null, false });
}
public void saveAsHadoopFile(string path, string outputFormatClass, string keyClass, string valueClass, IEnumerable<KeyValuePair<string, string>> conf, string compressionCodecClass)
{
var jconf = SparkContextIpcProxy.GetJavaMap<string, string>(conf);
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopFile", new object[] { jvmRddReference, false, path, outputFormatClass, keyClass, valueClass, null, null, jconf, compressionCodecClass });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "saveAsHadoopFile", new object[] { jvmRddReference, false, path, outputFormatClass, keyClass, valueClass, null, null, jconf, compressionCodecClass });
}
public void SaveAsSequenceFile(string path, string compressionCodecClass)
{
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "SaveAsSequenceFile", new object[] { jvmRddReference, false, path, compressionCodecClass });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "SaveAsSequenceFile", new object[] { jvmRddReference, false, path, compressionCodecClass });
}
public void SaveAsTextFile(string path, string compressionCodecClass)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
if (!string.IsNullOrEmpty(compressionCodecClass))
{
var codec = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("java.lang.Class", "forName", new object[] { compressionCodecClass }));
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "saveAsTextFile", new object[] { path, codec });
var codec = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("java.lang.Class", "forName", new object[] { compressionCodecClass }));
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "saveAsTextFile", new object[] { path, codec });
}
else
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "saveAsTextFile", new object[] { path });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "saveAsTextFile", new object[] { path });
}
}
public StorageLevel GetStorageLevel()
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var storageLevel = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(rdd, "getStorageLevel"));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmRddReference, "rdd"));
var storageLevel = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(rdd, "getStorageLevel"));
return new StorageLevel
(
(bool)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useDisk"),
(bool)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useMemory"),
(bool)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useOffHeap"),
(bool)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(storageLevel, "deserialized"),
(int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(storageLevel, "replication")
(bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useDisk"),
(bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useMemory"),
(bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(storageLevel, "useOffHeap"),
(bool)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(storageLevel, "deserialized"),
(int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(storageLevel, "replication")
);
}
private JvmObjectReference GetJavaStorageLevel(StorageLevelType storageLevelType)
{
return new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.java.StorageLevels", "create",
new object[]
{
StorageLevel.storageLevel[storageLevelType].useDisk,
StorageLevel.storageLevel[storageLevelType].useMemory,
StorageLevel.storageLevel[storageLevelType].useOffHeap,
StorageLevel.storageLevel[storageLevelType].deserialized,
StorageLevel.storageLevel[storageLevelType].replication
}).ToString());
}
}
}

View file

@ -0,0 +1,83 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.Spark.CSharp.Interop;
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
internal class SparkCLRIpcProxy : ISparkCLRProxy
{
private SparkContextIpcProxy sparkContextProxy;
private static IJvmBridge jvmBridge = new JvmBridge();
internal static IJvmBridge JvmBridge
{
get
{
return jvmBridge;
}
}
public SparkCLRIpcProxy()
{
int portNo = SparkCLREnvironment.ConfigurationService.BackendPortNumber;
if (portNo == 0) //fail early
{
throw new Exception("Port number is not set");
}
Console.WriteLine("CSharpBackend port number to be used in JvMBridge is " + portNo);//TODO - send to logger
JvmBridge.Initialize(portNo);
}
~SparkCLRIpcProxy()
{
JvmBridge.Dispose();
}
public ISparkContextProxy SparkContextProxy { get { return sparkContextProxy; } }
public ISparkConfProxy CreateSparkConf(bool loadDefaults = true)
{
return new SparkConfIpcProxy(JvmBridge.CallConstructor("org.apache.spark.SparkConf", new object[] { loadDefaults }));
}
public ISparkContextProxy CreateSparkContext(ISparkConfProxy conf)
{
JvmObjectReference jvmSparkContextReference = JvmBridge.CallConstructor("org.apache.spark.SparkContext", (conf as SparkConfIpcProxy).JvmSparkConfReference);
JvmObjectReference jvmJavaContextReference = JvmBridge.CallConstructor("org.apache.spark.api.java.JavaSparkContext", new object[] { jvmSparkContextReference });
sparkContextProxy = new SparkContextIpcProxy(jvmSparkContextReference, jvmJavaContextReference);
return sparkContextProxy;
}
public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable)
{
return new StructFieldIpcProxy(
new JvmObjectReference(
JvmBridge.CallStaticJavaMethod(
"org.apache.spark.sql.api.csharp.SQLUtils", "createStructField",
new object[] { name, dataType, isNullable }).ToString()
)
);
}
public IStructTypeProxy CreateStructType(List<StructField> fields)
{
var fieldsReference = fields.Select(s => (s.StructFieldProxy as StructFieldIpcProxy).JvmStructFieldReference).ToList().Cast<JvmObjectReference>();
//var javaObjectReferenceList = objectList.Cast<JvmObjectReference>().ToList();
var seq =
new JvmObjectReference(
JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
"toSeq", new object[] { fieldsReference }).ToString());
return new StructTypeIpcProxy(
new JvmObjectReference(
JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createStructType", new object[] { seq }).ToString()
)
);
}
}
}
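Putting the pieces together, the IPC path this commit establishes resolves roughly as follows (a sketch of the call chain, not new API):

    ISparkCLRProxy proxy = SparkCLREnvironment.SparkCLRProxy;         // lazily constructed SparkCLRIpcProxy
    ISparkConfProxy confProxy = proxy.CreateSparkConf(true);          // wraps org.apache.spark.SparkConf
    ISparkContextProxy scProxy = proxy.CreateSparkContext(confProxy); // wraps SparkContext + JavaSparkContext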

View file

@ -10,7 +10,7 @@ using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Interop.Ipc;
namespace Microsoft.Spark.CSharp.Proxy
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
internal class SparkConfIpcProxy : ISparkConfProxy
{
@ -21,39 +21,39 @@ namespace Microsoft.Spark.CSharp.Proxy
get { return jvmSparkConfReference; }
}
public void CreateSparkConf(bool loadDefaults = true)
public SparkConfIpcProxy(JvmObjectReference jvmSparkConfReference)
{
jvmSparkConfReference = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.SparkConf", new object[] { loadDefaults });
this.jvmSparkConfReference = jvmSparkConfReference;
}
public void SetMaster(string master)
{
jvmSparkConfReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setMaster", new object[] { master }).ToString());
jvmSparkConfReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setMaster", new object[] { master }).ToString());
}
public void SetAppName(string appName)
{
jvmSparkConfReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setAppName", new object[] { appName }).ToString());
jvmSparkConfReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setAppName", new object[] { appName }).ToString());
}
public void SetSparkHome(string sparkHome)
{
jvmSparkConfReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setSparkHome", new object[] { sparkHome }).ToString());
jvmSparkConfReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "setSparkHome", new object[] { sparkHome }).ToString());
}
public void Set(string key, string value)
{
jvmSparkConfReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "set", new object[] { key, value }).ToString());
jvmSparkConfReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "set", new object[] { key, value }).ToString());
}
public int GetInt(string key, int defaultValue)
{
return int.Parse(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "getInt", new object[] { key, defaultValue }).ToString());
return int.Parse(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "getInt", new object[] { key, defaultValue }).ToString());
}
public string Get(string key, string defaultValue)
{
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "get", new object[] { key, defaultValue }).ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "get", new object[] { key, defaultValue }).ToString();
}
}
}
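
Note the convention above: every JvmBridge call returns the JVM object-reference id as a string, which is re-wrapped in a JvmObjectReference, and primitive results come back as strings to parse. A hedged usage sketch (the config keys are illustrative):

ISparkConfProxy conf = sparkClrProxy.CreateSparkConf();   // sparkClrProxy: the ISparkCLRProxy above
conf.Set("spark.executor.memory", "2g");                  // routed through CallNonStaticJavaMethod
int cores = conf.GetInt("spark.executor.cores", 1);       // int.Parse over the JVM-side result
string memory = conf.Get("spark.executor.memory", "1g");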

View file

@ -11,103 +11,113 @@ using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy.Ipc;
namespace Microsoft.Spark.CSharp.Proxy
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
internal class SparkContextIpcProxy : ISparkContextProxy
{
private JvmObjectReference jvmSparkContextReference;
private JvmObjectReference jvmJavaContextReference;
private JvmObjectReference jvmAccumulatorReference;
internal List<JvmObjectReference> jvmBroadcastReferences = new List<JvmObjectReference>();
internal JvmObjectReference JvmSparkContextReference
{
get { return jvmSparkContextReference; }
}
public SparkContextIpcProxy(JvmObjectReference jvmSparkContextReference, JvmObjectReference jvmJavaContextReference)
{
this.jvmSparkContextReference = jvmSparkContextReference;
this.jvmJavaContextReference = jvmJavaContextReference;
}
public ISqlContextProxy CreateSqlContext()
{
return new SqlContextIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createSQLContext", new object[] { jvmSparkContextReference })));
}
public void CreateSparkContext(string master, string appName, string sparkHome, ISparkConfProxy conf)
{
object[] args = (new object[] { master, appName, sparkHome, (conf == null ? null : (conf as SparkConfIpcProxy).JvmSparkConfReference) }).Where(x => x != null).ToArray();
jvmSparkContextReference = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.SparkContext", args);
jvmJavaContextReference = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.api.java.JavaSparkContext", new object[] { jvmSparkContextReference });
jvmSparkContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.SparkContext", args);
jvmJavaContextReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.api.java.JavaSparkContext", new object[] { jvmSparkContextReference });
}
public void SetLogLevel(string logLevel)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setLogLevel", new object[] { logLevel });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setLogLevel", new object[] { logLevel });
}
private string version;
public string Version
{
get { if (version == null) { version = (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "version"); } return version; }
get { if (version == null) { version = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "version"); } return version; }
}
private long? startTime;
public long StartTime
{
get { if (startTime == null) { startTime = (long)(double)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "startTime"); } return (long)startTime; }
get { if (startTime == null) { startTime = (long)(double)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "startTime"); } return (long)startTime; }
}
private int? defaultParallelism;
public int DefaultParallelism
{
get { if (defaultParallelism == null) { defaultParallelism = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultParallelism"); } return (int)defaultParallelism; }
get { if (defaultParallelism == null) { defaultParallelism = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultParallelism"); } return (int)defaultParallelism; }
}
private int? defaultMinPartitions;
public int DefaultMinPartitions
{
get { if (defaultMinPartitions == null) { defaultMinPartitions = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultMinPartitions"); } return (int)defaultMinPartitions; }
get { if (defaultMinPartitions == null) { defaultMinPartitions = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "defaultMinPartitions"); } return (int)defaultMinPartitions; }
}
public void Accumulator(string host, int port)
{
jvmAccumulatorReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "accumulator",
SparkCLREnvironment.JvmBridge.CallConstructor("java.util.ArrayList"),
SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.api.python.PythonAccumulatorParam", host, port)
jvmAccumulatorReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "accumulator",
SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.ArrayList"),
SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.api.python.PythonAccumulatorParam", host, port)
));
}
public void Stop()
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "stop", new object[] { });
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "stopBackend", new object[] { }); //className and methodName hardcoded in CSharpBackendHandler
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "stop", new object[] { });
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("SparkCLRHandler", "stopBackend", new object[] { }); //className and methodName hardcoded in CSharpBackendHandler
}
public IRDDProxy EmptyRDD<T>()
{
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "emptyRDD"));
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "emptyRDD"));
return new RDDIpcProxy(jvmRddReference);
}
//TODO - this implementation is slow. Replace with call to createRDDFromArray() in CSharpRDD
public IRDDProxy Parallelize(IEnumerable<byte[]> values, int? numSlices)
public IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices)
{
JvmObjectReference jvalues = GetJavaList(values);
var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "parallelize", new object[] { jvalues, numSlices }).ToString());
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.csharp.CSharpRDD", "createRDDFromArray", new object[] { jvmSparkContextReference, values, numSlices }));
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy TextFile(string filePath, int minPartitions)
{
var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "textFile", new object[] { filePath, minPartitions }).ToString());
var jvmRddReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "textFile", new object[] { filePath, minPartitions }).ToString());
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy WholeTextFiles(string filePath, int minPartitions)
{
var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "wholeTextFiles", new object[] { filePath, minPartitions }).ToString());
var jvmRddReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "wholeTextFiles", new object[] { filePath, minPartitions }).ToString());
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy BinaryFiles(string filePath, int minPartitions)
{
var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "binaryFiles", new object[] { filePath, minPartitions }).ToString());
var jvmRddReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "binaryFiles", new object[] { filePath, minPartitions }).ToString());
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy SequenceFile(string filePath, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, int minSplits, int batchSize)
{
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "sequenceFile",
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "sequenceFile",
new object[] { jvmJavaContextReference, filePath, keyClass, valueClass, keyConverterClass, valueConverterClass, minSplits, batchSize }));
return new RDDIpcProxy(jvmRddReference);
}
@ -115,7 +125,7 @@ namespace Microsoft.Spark.CSharp.Proxy
public IRDDProxy NewAPIHadoopFile(string filePath, string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
var jconf = GetJavaMap<string, string>(conf) as JvmObjectReference;
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "newAPIHadoopFile",
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "newAPIHadoopFile",
new object[] { jvmJavaContextReference, filePath, inputFormatClass, keyClass, valueClass, keyConverterClass, valueConverterClass, jconf, batchSize }));
return new RDDIpcProxy(jvmRddReference);
}
@ -123,7 +133,7 @@ namespace Microsoft.Spark.CSharp.Proxy
public IRDDProxy NewAPIHadoopRDD(string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
var jconf = GetJavaMap<string, string>(conf) as JvmObjectReference;
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "newAPIHadoopRDD",
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "newAPIHadoopRDD",
new object[] { jvmJavaContextReference, inputFormatClass, keyClass, valueClass, keyConverterClass, valueConverterClass, jconf, batchSize }));
return new RDDIpcProxy(jvmRddReference);
}
@ -131,7 +141,7 @@ namespace Microsoft.Spark.CSharp.Proxy
public IRDDProxy HadoopFile(string filePath, string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
var jconf = GetJavaMap<string, string>(conf) as JvmObjectReference;
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "hadoopFile",
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "hadoopFile",
new object[] { jvmJavaContextReference, filePath, inputFormatClass, keyClass, valueClass, keyConverterClass, valueConverterClass, jconf, batchSize }));
return new RDDIpcProxy(jvmRddReference);
}
@ -139,14 +149,14 @@ namespace Microsoft.Spark.CSharp.Proxy
public IRDDProxy HadoopRDD(string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
var jconf = GetJavaMap<string, string>(conf) as JvmObjectReference;
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "hadoopRDD",
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "hadoopRDD",
new object[] { jvmJavaContextReference, inputFormatClass, keyClass, valueClass, keyConverterClass, valueConverterClass, jconf, batchSize }));
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy CheckpointFile(string filePath)
{
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "checkpointFile", new object[] { filePath }));
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "checkpointFile", new object[] { filePath }));
return new RDDIpcProxy(jvmRddReference);
}
@ -160,48 +170,48 @@ namespace Microsoft.Spark.CSharp.Proxy
var jfirst = (rdds.First().RddProxy as RDDIpcProxy).JvmRddReference;
var jrest = GetJavaList(rdds.Skip(1).Select(r => (r.RddProxy as RDDIpcProxy).JvmRddReference)) as JvmObjectReference;
var jvmRddReference = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "union", new object[] { jfirst, jrest }));
var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "union", new object[] { jfirst, jrest }));
return new RDDIpcProxy(jvmRddReference);
}
public void AddFile(string path)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "addFile", new object[] { path });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "addFile", new object[] { path });
}
public void SetCheckpointDir(string directory)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "setCheckpointDir", new object[] { directory });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "setCheckpointDir", new object[] { directory });
}
public void SetJobGroup(string groupId, string description, bool interruptOnCancel)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setCheckpointDir", new object[] { groupId, description, interruptOnCancel });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setCheckpointDir", new object[] { groupId, description, interruptOnCancel });
}
public void SetLocalProperty(string key, string value)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setLocalProperty", new object[] { key, value });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "setLocalProperty", new object[] { key, value });
}
public string GetLocalProperty(string key)
{
return (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "getLocalProperty", new object[] { key });
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "getLocalProperty", new object[] { key });
}
public string SparkUser
{
get
{
return (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "sparkUser");
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkContextReference, "sparkUser");
}
}
public void CancelJobGroup(string groupId)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "cancelJobGroup", new object[] { groupId });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "cancelJobGroup", new object[] { groupId });
}
public void CancelAllJobs()
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "cancelAllJobs");
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "cancelAllJobs");
}
private IStatusTrackerProxy statusTracker;
@ -211,7 +221,7 @@ namespace Microsoft.Spark.CSharp.Proxy
{
if (statusTracker == null)
{
statusTracker = new StatusTrackerIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "statusTracker")));
statusTracker = new StatusTrackerIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmJavaContextReference, "statusTracker")));
}
return statusTracker;
}
@ -219,26 +229,26 @@ namespace Microsoft.Spark.CSharp.Proxy
public IRDDProxy CreatePairwiseRDD<K, V>(IRDDProxy jvmReferenceOfByteArrayRdd, int numPartitions)
{
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod((jvmReferenceOfByteArrayRdd as RDDIpcProxy).JvmRddReference, "rdd"));
var pairwiseRdd = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.api.python.PairwiseRDD", rdd);
var pairRddJvmReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(pairwiseRdd, "asJavaPairRDD", new object[] { }).ToString());
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod((jvmReferenceOfByteArrayRdd as RDDIpcProxy).JvmRddReference, "rdd"));
var pairwiseRdd = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.api.python.PairwiseRDD", rdd);
var pairRddJvmReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(pairwiseRdd, "asJavaPairRDD", new object[] { }).ToString());
var jpartitionerJavaReference = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.api.python.PythonPartitioner", new object[] { numPartitions, 0 });
var partitionedPairRddJvmReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(pairRddJvmReference, "partitionBy", new object[] { jpartitionerJavaReference }).ToString());
var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "valueOfPair", new object[] { partitionedPairRddJvmReference }).ToString());
//var jvmRddReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(partitionedRddJvmReference, "rdd", new object[] { }).ToString());
var jpartitionerJavaReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.api.python.PythonPartitioner", new object[] { numPartitions, 0 });
var partitionedPairRddJvmReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(pairRddJvmReference, "partitionBy", new object[] { jpartitionerJavaReference }).ToString());
var jvmRddReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "valueOfPair", new object[] { partitionedPairRddJvmReference }).ToString());
//var jvmRddReference = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(partitionedRddJvmReference, "rdd", new object[] { }).ToString());
return new RDDIpcProxy(jvmRddReference);
}
public IRDDProxy CreateCSharpRdd(IRDDProxy prevJvmRddReference, byte[] command, Dictionary<string, string> environmentVariables, List<string> pythonIncludes, bool preservesPartitioning, List<Broadcast> broadcastVariables, List<byte[]> accumulator)
{
var hashTableReference = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
var arrayListReference = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
var jbroadcastVariables = GetJavaList<JvmObjectReference>(broadcastVariables.Select(x => new JvmObjectReference(x.broadcastObjId)));
var hashTableReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
var arrayListReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
var jbroadcastVariables = GetJavaList<JvmObjectReference>(jvmBroadcastReferences);
var rdd = new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod((prevJvmRddReference as RDDIpcProxy).JvmRddReference, "rdd"));
var rdd = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod((prevJvmRddReference as RDDIpcProxy).JvmRddReference, "rdd"));
var csRdd = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.api.csharp.CSharpRDD",
var csRdd = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.api.csharp.CSharpRDD",
new object[]
{
rdd, command, hashTableReference, arrayListReference, preservesPartitioning,
@ -247,24 +257,25 @@ namespace Microsoft.Spark.CSharp.Proxy
jbroadcastVariables, jvmAccumulatorReference
});
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(csRdd, "asJavaRDD")));
return new RDDIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(csRdd, "asJavaRDD")));
}
public IRDDProxy CreateUserDefinedCSharpFunction(string name, byte[] command, string returnType = "string")
{
var jSqlContext = SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.sql.SQLContext", new object[] { (SparkCLREnvironment.SparkContextProxy as SparkContextIpcProxy).jvmSparkContextReference });
var jDataType = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jSqlContext, "parseDataType", new object[] { "\"" + returnType + "\"" });
var jSqlContext = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.sql.SQLContext", new object[] { jvmSparkContextReference });
var jDataType = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jSqlContext, "parseDataType", new object[] { "\"" + returnType + "\"" });
var jbroadcastVariables = GetJavaList<JvmObjectReference>(jvmBroadcastReferences);
var hashTableReference = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
var arrayListReference = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
var hashTableReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
var arrayListReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
return new RDDIpcProxy(SparkCLREnvironment.JvmBridge.CallConstructor("org.apache.spark.sql.UserDefinedPythonFunction",
return new RDDIpcProxy(SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.sql.UserDefinedPythonFunction",
new object[]
{
name, command, hashTableReference, arrayListReference,
SparkCLREnvironment.ConfigurationService.GetCSharpRDDExternalProcessName(),
"1.0",
arrayListReference, null, jDataType
jbroadcastVariables, jvmAccumulatorReference, jDataType
}));
}
@ -272,24 +283,25 @@ namespace Microsoft.Spark.CSharp.Proxy
public int RunJob(IRDDProxy rdd, IEnumerable<int> partitions, bool allowLocal)
{
var jpartitions = GetJavaList<int>(partitions);
return int.Parse(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "runJob", new object[] { jvmSparkContextReference, (rdd as RDDIpcProxy).JvmRddReference, jpartitions, allowLocal }).ToString());
return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "runJob", new object[] { jvmSparkContextReference, (rdd as RDDIpcProxy).JvmRddReference, jpartitions, allowLocal }).ToString());
}
public string ReadBroadcastFromFile(string path, out long broadcastId)
public IBroadcastProxy ReadBroadcastFromFile(string path, out long broadcastId)
{
string broadcastObjId = (string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "readBroadcastFromFile", new object[] { jvmJavaContextReference, path });
broadcastId = (long)(double)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(new JvmObjectReference(broadcastObjId), "id");
return broadcastObjId;
JvmObjectReference jbroadcast = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "readBroadcastFromFile", new object[] { jvmJavaContextReference, path }));
broadcastId = (long)(double)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jbroadcast, "id");
jvmBroadcastReferences.Add(jbroadcast);
return new BroadcastIpcProxy(jbroadcast, this);
}
public void UnpersistBroadcast(string broadcastObjId, bool blocking)
{
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(new JvmObjectReference(broadcastObjId), "unpersist", new object[] { blocking });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(new JvmObjectReference(broadcastObjId), "unpersist", new object[] { blocking });
}
public IColumnProxy CreateColumnFromName(string name)
{
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", "col", name)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", "col", name)));
}
public IColumnProxy CreateFunction(string name, object self)
@ -298,7 +310,7 @@ namespace Microsoft.Spark.CSharp.Proxy
self = (self as ColumnIpcProxy).ScalaColumnReference;
else if (self is IColumnProxy[])
self = GetJavaSeq<JvmObjectReference>((self as IColumnProxy[]).Select(x => (x as ColumnIpcProxy).ScalaColumnReference));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, self)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, self)));
}
public IColumnProxy CreateBinaryMathFunction(string name, object self, object other)
@ -307,47 +319,76 @@ namespace Microsoft.Spark.CSharp.Proxy
self = (self as ColumnIpcProxy).ScalaColumnReference;
if (other is ColumnIpcProxy)
other = (other as ColumnIpcProxy).ScalaColumnReference;
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, self, other)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name, self, other)));
}
public IColumnProxy CreateWindowFunction(string name)
{
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name)));
return new ColumnIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.functions", name)));
}
public static JvmObjectReference GetJavaMap<K, V>(IEnumerable<KeyValuePair<K, V>> enumerable)
{
var jmap = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
var jmap = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.Hashtable", new object[] { });
if (enumerable != null)
{
foreach (var item in enumerable)
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jmap, "put", new object[] { item.Key, item.Value });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jmap, "put", new object[] { item.Key, item.Value });
}
return jmap;
}
public static JvmObjectReference GetJavaSet<T>(IEnumerable<T> enumerable)
{
var jset = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.HashSet", new object[] { });
var jset = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.HashSet", new object[] { });
if (enumerable != null)
{
foreach (var item in enumerable)
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jset, "add", new object[] { item });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jset, "add", new object[] { item });
}
return jset;
}
public static JvmObjectReference GetJavaList<T>(IEnumerable<T> enumerable)
{
var jlist = SparkCLREnvironment.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
var jlist = SparkCLRIpcProxy.JvmBridge.CallConstructor("java.util.ArrayList", new object[] { });
if (enumerable != null)
{
foreach (var item in enumerable)
SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jlist, "add", new object[] { item });
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jlist, "add", new object[] { item });
}
return jlist;
}
public JvmObjectReference GetJavaSeq<T>(IEnumerable<T> enumerable)
public static JvmObjectReference GetJavaSeq<T>(IEnumerable<T> enumerable)
{
return new JvmObjectReference((string)SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", GetJavaList<T>(enumerable)));
return new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "toSeq", GetJavaList<T>(enumerable)));
}
public static JvmObjectReference GetJavaStorageLevel(StorageLevelType storageLevelType)
{
return new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.java.StorageLevels", "create",
new object[]
{
StorageLevel.storageLevel[storageLevelType].useDisk,
StorageLevel.storageLevel[storageLevelType].useMemory,
StorageLevel.storageLevel[storageLevelType].useOffHeap,
StorageLevel.storageLevel[storageLevelType].deserialized,
StorageLevel.storageLevel[storageLevelType].replication
}).ToString());
}
}
internal class BroadcastIpcProxy : IBroadcastProxy
{
private JvmObjectReference jvmBroadcastReference;
private SparkContextIpcProxy sparkContextIpcProxy;
public BroadcastIpcProxy(JvmObjectReference jvmBroadcastReference, SparkContextIpcProxy sparkContextIpcProxy)
{
this.jvmBroadcastReference = jvmBroadcastReference;
this.sparkContextIpcProxy = sparkContextIpcProxy;
}
public void Unpersist(bool blocking)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmBroadcastReference, "unpersist", new object[] { blocking });
sparkContextIpcProxy.jvmBroadcastReferences.Remove(jvmBroadcastReference);
}
}
}
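
A hedged sketch of the broadcast lifecycle these changes enable: the context proxy now tracks JVM broadcast references in jvmBroadcastReferences so CreateCSharpRdd and CreateUserDefinedCSharpFunction can pass them to the JVM, and BroadcastIpcProxy.Unpersist drops the tracked reference. The file path is illustrative:

long broadcastId;
IBroadcastProxy broadcast = sparkContextProxy.ReadBroadcastFromFile(
    @"C:\temp\broadcast.bin", out broadcastId);   // registers the reference in jvmBroadcastReferences
// ... run jobs; CreateCSharpRdd ships jvmBroadcastReferences to CSharpRDD ...
broadcast.Unpersist(false);                       // unpersists on the JVM and removes the tracked reference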

View file

@ -15,43 +15,10 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
internal class SqlContextIpcProxy : ISqlContextProxy
{
private JvmObjectReference jvmSqlContextReference;
private ISparkContextProxy sparkContextProxy;
public void CreateSqlContext(ISparkContextProxy scProxy)
public SqlContextIpcProxy(JvmObjectReference jvmSqlContextReference)
{
sparkContextProxy = scProxy;
jvmSqlContextReference = new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createSQLContext", new object[] { (sparkContextProxy as SparkContextIpcProxy).JvmSparkContextReference }).ToString());
}
public StructField CreateStructField(string name, string dataType, bool isNullable)
{
return new StructField(
new StructFieldIpcProxy(
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod(
"org.apache.spark.sql.api.csharp.SQLUtils", "createStructField",
new object[] {name, dataType, isNullable}).ToString()
)
)
);
}
public StructType CreateStructType(List<StructField> fields)
{
var fieldsReference = fields.Select(s => (s.StructFieldProxy as StructFieldIpcProxy).JvmStructFieldReference).ToList().Cast<JvmObjectReference>();
//var javaObjectReferenceList = objectList.Cast<JvmObjectReference>().ToList();
var seq =
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
"toSeq", new object[] { fieldsReference }).ToString());
return new StructType(
new StructTypeIpcProxy(
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createStructType", new object[] { seq }).ToString()
)
)
);
this.jvmSqlContextReference = jvmSqlContextReference;
}
public IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary<string, string> options)
@ -59,14 +26,14 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
//parameter Dictionary<string, string> options is not used right now - it is meant to be passed on to data sources
return new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "loadDF", new object[] { jvmSqlContextReference, path, (schema.StructTypeProxy as StructTypeIpcProxy).JvmStructTypeReference }).ToString()
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "loadDF", new object[] { jvmSqlContextReference, path, (schema.StructTypeProxy as StructTypeIpcProxy).JvmStructTypeReference }).ToString()
), this
);
}
public IDataFrameProxy JsonFile(string path)
{
var javaDataFrameReference = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "jsonFile", new object[] {path});
var javaDataFrameReference = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "jsonFile", new object[] {path});
var javaObjectReferenceForDataFrame = new JvmObjectReference(javaDataFrameReference.ToString());
return new DataFrameIpcProxy(javaObjectReferenceForDataFrame, this);
}
@ -75,7 +42,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod(
"org.apache.spark.sql.api.csharp.SQLUtils", "loadTextFile",
new object[] {jvmSqlContextReference, path, delimiter, (schema.StructTypeProxy as StructTypeIpcProxy).JvmStructTypeReference}).ToString()
), this
@ -86,7 +53,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLREnvironment.JvmBridge.CallStaticJavaMethod(
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod(
"org.apache.spark.sql.api.csharp.SQLUtils", "loadTextFile",
new object[] {jvmSqlContextReference, path, hasHeader, inferSchema}).ToString()
), this
@ -95,7 +62,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public IDataFrameProxy Sql(string sqlQuery)
{
var javaDataFrameReference = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "sql", new object[] { sqlQuery });
var javaDataFrameReference = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "sql", new object[] { sqlQuery });
var javaObjectReferenceForDataFrame = new JvmObjectReference(javaDataFrameReference.ToString());
return new DataFrameIpcProxy(javaObjectReferenceForDataFrame, this);
}
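
With this change the SQLContext proxy is constructed from a JVM reference produced by SparkContextIpcProxy.CreateSqlContext rather than creating its own. A hedged usage sketch (paths and table name are illustrative, and the Sql call assumes a table was registered beforehand):

ISqlContextProxy sqlProxy = sparkContextProxy.CreateSqlContext();    // SQLUtils.createSQLContext on the JVM
IDataFrameProxy people = sqlProxy.JsonFile(@"C:\data\people.json");  // wraps the returned DataFrame reference
IDataFrameProxy result = sqlProxy.Sql("SELECT * FROM people");       // assumes RegisterTempTable ran earlier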

View file

@ -18,48 +18,48 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
private JvmObjectReference jvmStatusTrackerReference;
public StatusTrackerIpcProxy(JvmObjectReference jStatusTracker)
{
this.jvmStatusTrackerReference = jStatusTracker;
jvmStatusTrackerReference = jStatusTracker;
}
public int[] GetJobIdsForGroup(string jobGroup)
{
return (int[])SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getJobIdsForGroup", new object[] { jobGroup });
return (int[])SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getJobIdsForGroup", new object[] { jobGroup });
}
public int[] GetActiveStageIds()
{
return (int[])SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getActiveStageIds");
return (int[])SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getActiveStageIds");
}
public int[] GetActiveJobsIds()
{
return (int[])SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getActiveJobsIds");
return (int[])SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getActiveJobsIds");
}
public SparkJobInfo GetJobInfo(int jobId)
{
var jobInfoId = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getJobInfo", new object[] { jobId });
var jobInfoId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getJobInfo", new object[] { jobId });
if (jobInfoId == null)
return null;
JvmObjectReference jJobInfo = new JvmObjectReference((string)jobInfoId);
int[] stageIds = (int[])SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jJobInfo, "stageIds");
string status = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jJobInfo, "status").ToString();
int[] stageIds = (int[])SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jJobInfo, "stageIds");
string status = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jJobInfo, "status").ToString();
return new SparkJobInfo(jobId, stageIds, status);
}
public SparkStageInfo GetStageInfo(int stageId)
{
var stageInfoId = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getStageInfo", new object[] { stageId });
var stageInfoId = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStatusTrackerReference, "getStageInfo", new object[] { stageId });
if (stageInfoId == null)
return null;
JvmObjectReference jStageInfo = new JvmObjectReference((string)stageInfoId);
int currentAttemptId = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "currentAttemptId");
int submissionTime = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "submissionTime");
string name = (string)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "name");
int numTasks = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numTasks");
int numActiveTasks = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numActiveTasks");
int numCompletedTasks = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numCompletedTasks");
int numFailedTasks = (int)SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numFailedTasks");
int currentAttemptId = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "currentAttemptId");
int submissionTime = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "submissionTime");
string name = (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "name");
int numTasks = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numTasks");
int numActiveTasks = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numActiveTasks");
int numCompletedTasks = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numCompletedTasks");
int numFailedTasks = (int)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jStageInfo, "numFailedTasks");
return new SparkStageInfo(stageId, currentAttemptId, (long)submissionTime, name, numTasks, numActiveTasks, numCompletedTasks, numFailedTasks);
}
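
A hedged polling sketch against this tracker; only the members shown above are used, and the job info object is treated as opaque (its constructor arguments are visible here, its property names are not):

static void CheckJobs(IStatusTrackerProxy tracker, string group)
{
    foreach (int jobId in tracker.GetJobIdsForGroup(group))
    {
        SparkJobInfo job = tracker.GetJobInfo(jobId);   // null when the job is unknown to the JVM
        if (job == null) continue;
        // job was constructed from (jobId, stageIds, status), per GetJobInfo above
    }
}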

View file

@ -27,7 +27,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public List<IStructFieldProxy> GetStructTypeFields()
{
var fieldsReferenceList = SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructTypeReference, "fields");
var fieldsReferenceList = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructTypeReference, "fields");
return (fieldsReferenceList as List<JvmObjectReference>).Select(s => new StructFieldIpcProxy(s)).Cast<IStructFieldProxy>().ToList();
}
}
@ -43,12 +43,12 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public string GetDataTypeString()
{
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructDataTypeReference, "toString").ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructDataTypeReference, "toString").ToString();
}
public string GetDataTypeSimpleString()
{
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructDataTypeReference, "simpleString").ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructDataTypeReference, "simpleString").ToString();
}
}
@ -64,17 +64,17 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
public string GetStructFieldName()
{
return SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "name").ToString();
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "name").ToString();
}
public IStructDataTypeProxy GetStructFieldDataType()
{
return new StructDataTypeIpcProxy(new JvmObjectReference(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "dataType").ToString()));
return new StructDataTypeIpcProxy(new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "dataType").ToString()));
}
public bool GetStructFieldIsNullable()
{
return bool.Parse(SparkCLREnvironment.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "nullable").ToString());
return bool.Parse(SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmStructFieldReference, "nullable").ToString());
}
}
}

View file

@ -20,16 +20,11 @@ namespace Microsoft.Spark.CSharp.Sql
{
private ISqlContextProxy sqlContextProxy;
private SparkContext sparkContext;
internal ISqlContextProxy SqlContextProxy { get { return sqlContextProxy; } }
public SqlContext(SparkContext sparkContext)
{
this.sparkContext = sparkContext;
SetSqlContextProxy();
sqlContextProxy.CreateSqlContext(sparkContext.SparkContextProxy);
}
private void SetSqlContextProxy()
{
sqlContextProxy = SparkCLREnvironment.SqlContextProxy;
sqlContextProxy = sparkContext.SparkContextProxy.CreateSqlContext();
}
/// <summary>

View file

@ -45,7 +45,7 @@ namespace Microsoft.Spark.CSharp.Sql
public static StructType CreateStructType(List<StructField> structFields)
{
return SparkCLREnvironment.SqlContextProxy.CreateStructType(structFields);
return new StructType(SparkCLREnvironment.SparkCLRProxy.CreateStructType(structFields));
}
}
@ -95,7 +95,7 @@ namespace Microsoft.Spark.CSharp.Sql
public static StructField CreateStructField(string name, string dataType, bool isNullable)
{
return SparkCLREnvironment.SqlContextProxy.CreateStructField(name, dataType, isNullable);
return new StructField(SparkCLREnvironment.SparkCLRProxy.CreateStructField(name, dataType, isNullable));
}
}
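
These factories now return proxy-wrapped schema objects from the top-level SparkCLRProxy. A hedged sketch of building a schema with them (field names and type strings are illustrative; the enclosing classes are assumed to be StructField and StructType, as elsewhere in this file):

var fields = new List<StructField>
{
    StructField.CreateStructField("name", "string", true),
    StructField.CreateStructField("age", "integer", true)
};
StructType schema = StructType.CreateStructType(fields);   // backed by SQLUtils.createStructType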

View file

@ -0,0 +1,106 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>AdapterTest</RootNamespace>
<AssemblyName>AdapterTest</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<ProjectTypeGuids>{3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">10.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
<ReferencePath>$(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages</ReferencePath>
<IsCodedUITest>False</IsCodedUITest>
<TestProjectType>UnitTest</TestProjectType>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.CSharp" />
<Reference Include="System" />
</ItemGroup>
<Choose>
<When Condition="('$(VisualStudioVersion)' == '10.0' or '$(VisualStudioVersion)' == '') and '$(TargetFrameworkVersion)' == 'v3.5'">
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" />
</ItemGroup>
</When>
<Otherwise>
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework" />
</ItemGroup>
</Otherwise>
</Choose>
<ItemGroup>
<Compile Include="SparkCLRTestEnvironment.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="DataFrameTest.cs" />
<Compile Include="Mocks\MockSparkCLRProxy.cs" />
<Compile Include="Mocks\MockConfigurationService.cs" />
<Compile Include="Mocks\MockDataFrameProxy.cs" />
<Compile Include="Mocks\MockRddProxy.cs" />
<Compile Include="Mocks\MockSparkConfProxy.cs" />
<Compile Include="Mocks\MockSparkContextProxy.cs" />
<Compile Include="Mocks\MockSqlContextProxy.cs" />
<Compile Include="RDDTest.cs" />
<Compile Include="SparkConfTest.cs" />
<Compile Include="SparkContextTest.cs" />
<Compile Include="SqlContextTest.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Adapter\Microsoft.Spark.CSharp\Adapter.csproj">
<Project>{ce999a96-f42b-4e80-b208-709d7f49a77c}</Project>
<Name>Adapter</Name>
</ProjectReference>
</ItemGroup>
<ItemGroup>
<Folder Include="Properties\" />
</ItemGroup>
<Choose>
<When Condition="'$(VisualStudioVersion)' == '10.0' And '$(IsCodedUITest)' == 'True'">
<ItemGroup>
<Reference Include="Microsoft.VisualStudio.QualityTools.CodedUITestFramework, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITest.Common, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITest.Extension, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
<Reference Include="Microsoft.VisualStudio.TestTools.UITesting, Version=10.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<Private>False</Private>
</Reference>
</ItemGroup>
</When>
</Choose>
<Import Project="$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets" Condition="Exists('$(VSToolsPath)\TeamTest\Microsoft.TestTools.targets')" />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

View file

@ -0,0 +1,35 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between DataFrame and its proxies
/// </summary>
[TestClass]
public class DataFrameTest
{
//TODO - complete impl
[TestMethod]
public void TestJoin()
{
var sqlContext = new SqlContext(new SparkContext("", ""));
var dataFrame = sqlContext.JsonFile(@"c:\path\to\input.json");
var dataFrame2 = sqlContext.JsonFile(@"c:\path\to\input2.json");
var joinedDataFrame = dataFrame.Join(dataFrame2, "JoinCol");
var paramValuesToJoinMethod = (joinedDataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference as object[];
var paramValuesToSecondDataFrameJsonFileMethod = ((paramValuesToJoinMethod[0] as MockDataFrameProxy).mockDataFrameReference as object[]);
Assert.AreEqual(@"c:\path\to\input2.json", paramValuesToSecondDataFrameJsonFileMethod[0]);
Assert.AreEqual("JoinCol", paramValuesToJoinMethod[1]);
}
}
}
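
The same mock pattern extends to the other proxy surfaces in this commit; for example, a hedged sketch of a test asserting parameter flow through MockRddProxy.Union (defined later in this diff):

[TestMethod]
public void TestUnionParameterFlow()
{
    var rdd1 = new MockRddProxy(new object[] { "first" });
    var rdd2 = new MockRddProxy(new object[] { "second" });
    var union = rdd1.Union(rdd2) as MockRddProxy;   // Union records { this, other } in mockRddReference
    Assert.AreSame(rdd2, union.mockRddReference[1]);
}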

View file

@ -0,0 +1,49 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Configuration;
namespace AdapterTest.Mocks
{
//Fakes is not supported in AppVeyor (the CI that will probably be used with CSharpSpark in GitHub)
//Using a custom mock implementation for now. Replace with Moq or a similar framework //TODO
internal class MockConfigurationService : IConfigurationService
{
internal void InjectCSharpSparkWorkerPath(string path)
{
workerPath = path;
}
private string workerPath;
public string GetCSharpRDDExternalProcessName()
{
return workerPath;
}
public int BackendPortNumber
{
get { throw new NotImplementedException(); }
}
public bool IsShipCSharpSparkBinariesToExecutors
{
get { throw new NotImplementedException(); }
}
public string GetCSharpWorkerPath()
{
return "";
}
public IEnumerable<string> GetDriverFiles()
{
return new string[] {};
}
}
}

View file

@ -0,0 +1,235 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
{
internal class MockDataFrameProxy : IDataFrameProxy
{
internal object[] mockDataFrameReference;
private ISqlContextProxy mockSqlContextProxy;
public ISqlContextProxy SqlContextProxy
{
get { return mockSqlContextProxy; }
}
//just saves the parameter collection as the mock proxy reference, so it can be inspected in Assert statements
internal MockDataFrameProxy(object[] parameterCollection, ISqlContextProxy scProxy)
{
mockDataFrameReference = parameterCollection;
mockSqlContextProxy = scProxy;
}
public void RegisterTempTable(string tableName)
{
throw new NotImplementedException();
}
public long Count()
{
throw new NotImplementedException();
}
public string GetQueryExecution()
{
throw new NotImplementedException();
}
public string GetExecutedPlan()
{
throw new NotImplementedException();
}
public string GetShowString(int numberOfRows, bool truncate)
{
throw new NotImplementedException();
}
public IStructTypeProxy GetSchema()
{
throw new NotImplementedException();
}
public IRDDProxy ToJSON()
{
throw new NotImplementedException();
}
public IRDDProxy ToRDD()
{
throw new NotImplementedException();
}
public IColumnProxy GetColumn(string columnName)
{
throw new NotImplementedException();
}
public object ToObjectSeq(List<object> objectList)
{
throw new NotImplementedException();
}
public IColumnProxy ToColumnSeq(List<IColumnProxy> columnRefList)
{
throw new NotImplementedException();
}
public IDataFrameProxy Select(IColumnProxy columnSequenceReference)
{
throw new NotImplementedException();
}
public IDataFrameProxy Filter(string condition)
{
throw new NotImplementedException();
}
public IGroupedDataProxy GroupBy(string firstColumnName, IColumnProxy otherColumnSequenceReference)
{
throw new NotImplementedException();
}
public IGroupedDataProxy GroupBy(IColumnProxy columnSequenceReference)
{
throw new NotImplementedException();
}
public IGroupedDataProxy GroupBy(object columnSequenceReference)
{
throw new NotImplementedException();
}
public IDataFrameProxy Agg(IGroupedDataProxy scalaGroupedDataReference, System.Collections.Generic.Dictionary<string, string> columnNameAggFunctionDictionary)
{
throw new NotImplementedException();
}
public IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, string joinColumnName)
{
return new MockDataFrameProxy(new object[] { otherScalaDataFrameReference, joinColumnName }, SqlContextProxy);
}
public IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, string[] joinColumnNames)
{
throw new NotImplementedException();
}
public IDataFrameProxy Join(IDataFrameProxy otherScalaDataFrameReference, IColumnProxy scalaColumnReference, string joinType)
{
throw new NotImplementedException();
}
public bool IsLocal
{
get { throw new NotImplementedException(); }
}
public void Cache()
{
throw new NotImplementedException();
}
public void Persist(Microsoft.Spark.CSharp.Core.StorageLevelType storageLevelType)
{
throw new NotImplementedException();
}
public void Unpersist(bool blocking)
{
throw new NotImplementedException();
}
public IRDDProxy JavaToCSharp()
{
throw new NotImplementedException();
}
public IDataFrameProxy Limit(int num)
{
throw new NotImplementedException();
}
public IDataFrameProxy Coalesce(int numPartitions)
{
throw new NotImplementedException();
}
public IDataFrameProxy Repartition(int numPartitions)
{
throw new NotImplementedException();
}
public IDataFrameProxy Distinct()
{
throw new NotImplementedException();
}
public IDataFrameProxy Sample(bool withReplacement, double fraction, long? seed)
{
throw new NotImplementedException();
}
public IDataFrameProxy[] RandomSplit(double[] weights, long? seed)
{
throw new NotImplementedException();
}
public IDataFrameProxy Alias(string alias)
{
throw new NotImplementedException();
}
public IDataFrameProxy Describe(string[] columns)
{
throw new NotImplementedException();
}
public IDataFrameProxy SelectExpr(string[] expressions)
{
throw new NotImplementedException();
}
public IGroupedDataProxy Rollup(IColumnProxy[] columns)
{
throw new NotImplementedException();
}
public IGroupedDataProxy Cube(IColumnProxy[] columns)
{
throw new NotImplementedException();
}
public IDataFrameProxy UnionAll(IDataFrameProxy other)
{
throw new NotImplementedException();
}
public IDataFrameProxy Intersect(IDataFrameProxy other)
{
throw new NotImplementedException();
}
public IDataFrameProxy Subtract(IDataFrameProxy other)
{
throw new NotImplementedException();
}
public IDataFrameProxy DropDuplicates(string[] subset)
{
throw new NotImplementedException();
}
public void Drop(IColumnProxy column)
{
throw new NotImplementedException();
}
}
}

View file

@ -0,0 +1,256 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
{
internal class MockRddProxy : IRDDProxy
{
internal object[] mockRddReference;
public MockRddProxy(object[] parameterCollection)
{
mockRddReference = parameterCollection;
}
public IRDDProxy Distinct<T>()
{
throw new NotImplementedException();
}
public IRDDProxy Distinct<T>(int numPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy Repartition<T>(int numPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy Coalesce<T>(int numPartitions, bool shuffle)
{
throw new NotImplementedException();
}
public IRDDProxy Sample<T>(bool withReplacement, double fraction, long seed)
{
throw new NotImplementedException();
}
public IRDDProxy[] RandomSplit<T>(double[] weights, long seed)
{
throw new NotImplementedException();
}
public IRDDProxy RandomSampleWithRange<T>(double lb, double ub, long seed)
{
throw new NotImplementedException();
}
public long Count()
{
throw new NotImplementedException();
}
public IRDDProxy Union(IRDDProxy javaRddReferenceOther)
{
return new MockRddProxy(new object[] { this, javaRddReferenceOther });
}
public int CollectAndServe()
{
throw new NotImplementedException();
}
public int PartitionLength()
{
throw new NotImplementedException();
}
public IRDDProxy Cache()
{
throw new NotImplementedException();
}
public IRDDProxy Unpersist()
{
throw new NotImplementedException();
}
public void Checkpoint()
{
throw new NotImplementedException();
}
public bool IsCheckpointed
{
get { throw new NotImplementedException(); }
}
public string GetCheckpointFile()
{
throw new NotImplementedException();
}
public int GetNumPartitions()
{
throw new NotImplementedException();
}
public IRDDProxy Distinct()
{
throw new NotImplementedException();
}
public IRDDProxy Distinct(int numPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy Sample(bool withReplacement, double fraction, long seed)
{
throw new NotImplementedException();
}
public IRDDProxy[] RandomSplit(double[] weights, long seed)
{
throw new NotImplementedException();
}
public IRDDProxy Intersection(IRDDProxy[] other)
{
throw new NotImplementedException();
}
public IRDDProxy Cartesian(IRDDProxy other)
{
throw new NotImplementedException();
}
public IRDDProxy Pipe(string command)
{
throw new NotImplementedException();
}
public IRDDProxy Repartition(int numPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy Coalesce(int numPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy RandomSampleWithRange(double lb, double ub, long seed)
{
throw new NotImplementedException();
}
public string Name
{
get { throw new NotImplementedException(); }
}
public void SetName(string name)
{
throw new NotImplementedException();
}
void IRDDProxy.Cache()
{
throw new NotImplementedException();
}
void IRDDProxy.Unpersist()
{
throw new NotImplementedException();
}
public IRDDProxy Intersection(IRDDProxy other)
{
throw new NotImplementedException();
}
public IRDDProxy Coalesce(int numPartitions, bool shuffle)
{
throw new NotImplementedException();
}
public IRDDProxy SampleByKey(bool withReplacement, Dictionary<string, double> fractions, long seed)
{
throw new NotImplementedException();
}
public IRDDProxy Zip(IRDDProxy other)
{
throw new NotImplementedException();
}
public IRDDProxy ZipWithIndex()
{
throw new NotImplementedException();
}
public IRDDProxy ZipWithUniqueId()
{
throw new NotImplementedException();
}
public string ToDebugString()
{
throw new NotImplementedException();
}
public void SaveAsNewAPIHadoopDataset(IEnumerable<KeyValuePair<string, string>> conf)
{
throw new NotImplementedException();
}
public void SaveAsNewAPIHadoopFile(string path, string outputFormatClass, string keyClass, string valueClass, IEnumerable<KeyValuePair<string, string>> conf)
{
throw new NotImplementedException();
}
public void SaveAsHadoopDataset(IEnumerable<KeyValuePair<string, string>> conf)
{
throw new NotImplementedException();
}
public void SaveAsSequenceFile(string path, string compressionCodecClass)
{
throw new NotImplementedException();
}
public void SaveAsTextFile(string path, string compressionCodecClass)
{
throw new NotImplementedException();
}
public void SaveAsHadoopFile(string path, string outputFormatClass, string keyClass, string valueClass, IEnumerable<KeyValuePair<string, string>> conf, string compressionCodecClass)
{
throw new NotImplementedException();
}
public void Persist(Microsoft.Spark.CSharp.Core.StorageLevelType storageLevelType)
{
throw new NotImplementedException();
}
public Microsoft.Spark.CSharp.Core.StorageLevel GetStorageLevel()
{
throw new NotImplementedException();
}
}
}
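
Union is the one member above with a real body: it wraps its arguments in a new MockRddProxy so a test can later unpack mockRddReference and assert on exactly what the adapter passed through. A minimal sketch of this argument-capturing idea, with hypothetical names:

internal class CapturingProxy
{
    // arguments recorded for later test assertions
    internal object[] Captured { get; private set; }

    public CapturingProxy Union(CapturingProxy other)
    {
        return new CapturingProxy { Captured = new object[] { this, other } };
    }
}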

View file

@ -0,0 +1,54 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
namespace AdapterTest.Mocks
{
internal class MockSparkCLRProxy : ISparkCLRProxy
{
public ISparkConfProxy CreateSparkConf(bool loadDefaults = true)
{
return new MockSparkConfProxy();
}
public ISparkContextProxy CreateSparkContext(ISparkConfProxy conf)
{
// note: master, appName and sparkHome are extracted here but currently unused;
// the mock context is constructed from the conf proxy directly
string master = null;
string appName = null;
string sparkHome = null;
if (conf != null)
{
MockSparkConfProxy proxy = conf as MockSparkConfProxy;
if (proxy.stringConfDictionary.ContainsKey(MockSparkConfProxy.MockMasterKey))
master = proxy.stringConfDictionary[MockSparkConfProxy.MockMasterKey];
if (proxy.stringConfDictionary.ContainsKey(MockSparkConfProxy.MockAppNameKey))
appName = proxy.stringConfDictionary[MockSparkConfProxy.MockAppNameKey];
if (proxy.stringConfDictionary.ContainsKey(MockSparkConfProxy.MockHomeKey))
sparkHome = proxy.stringConfDictionary[MockSparkConfProxy.MockHomeKey];
}
return new MockSparkContextProxy(conf);
}
public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable)
{
throw new NotImplementedException();
}
public IStructTypeProxy CreateStructType(List<StructField> fields)
{
throw new NotImplementedException();
}
public ISparkContextProxy SparkContextProxy
{
get { throw new NotImplementedException(); }
}
}
}
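
MockSparkCLRProxy is what lets the entire JVM bridge be swapped out behind ISparkCLRProxy. A minimal sketch of the service-locator seam this relies on (hypothetical names; in this codebase the seam is SparkCLREnvironment.SparkCLRProxy, assigned once by the test assembly initializer shown later):

internal interface IBridgeProxy { }               // stand-in for ISparkCLRProxy
internal class JvmBridgeProxy : IBridgeProxy { }  // production implementation talking to the JVM
internal class FakeBridgeProxy : IBridgeProxy { } // in-memory fake for tests

internal static class BridgeEnvironment
{
    // production default; a test assembly replaces it before any test runs
    public static IBridgeProxy Proxy = new JvmBridgeProxy();
}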

View file

@ -0,0 +1,59 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
{
internal class MockSparkConfProxy : ISparkConfProxy
{
internal Dictionary<string, string> stringConfDictionary = new Dictionary<string, string>();
private Dictionary<string, int> intConfDictionary = new Dictionary<string, int>();
internal const string MockMasterKey = "mockmaster";
public void SetMaster(string master)
{
stringConfDictionary[MockMasterKey] = master;
}
internal const string MockAppNameKey = "mockappName";
public void SetAppName(string appName)
{
stringConfDictionary[MockAppNameKey] = appName;
}
internal const string MockHomeKey = "mockhome";
public void SetSparkHome(string sparkHome)
{
stringConfDictionary[MockHomeKey] = sparkHome;
}
public void Set(string key, string value)
{
stringConfDictionary[key] = value;
}
public int GetInt(string key, int defaultValue)
{
if (intConfDictionary.ContainsKey(key))
{
return intConfDictionary[key];
}
return defaultValue;
}
public string Get(string key, string defaultValue)
{
if (stringConfDictionary.ContainsKey(key))
{
return stringConfDictionary[key];
}
return defaultValue;
}
}
}

View file

@ -0,0 +1,237 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
{
internal class MockSparkContextProxy : ISparkContextProxy
{
internal object mockSparkContextReference;
public MockSparkContextProxy(ISparkConfProxy conf)
{
mockSparkContextReference = new object[] { conf };
}
public void AddFile(string filePath)
{ }
public IRDDProxy TextFile(string filePath, int minPartitions)
{
return new MockRddProxy(new object[] { filePath, minPartitions });
}
public void Stop()
{
mockSparkContextReference = null;
}
public IRDDProxy CreateCSharpRdd(IRDDProxy prevJavaRddReference, byte[] command, Dictionary<string, string> environmentVariables, List<string> cSharpIncludes, bool preservePartitioning, List<Broadcast> broadcastVariables, List<byte[]> accumulator)
{
throw new NotImplementedException();
}
public IRDDProxy CreatePairwiseRDD<K, V>(IRDDProxy javaReferenceInByteArrayRdd, int numPartitions)
{
throw new NotImplementedException();
}
public void SetLogLevel(string logLevel)
{
throw new NotImplementedException();
}
public string Version
{
get { throw new NotImplementedException(); }
}
public long StartTime
{
get { throw new NotImplementedException(); }
}
public int DefaultParallelism
{
get { throw new NotImplementedException(); }
}
public int DefaultMinPartitions
{
get { throw new NotImplementedException(); }
}
public IRDDProxy EmptyRDD<T>()
{
throw new NotImplementedException();
}
public IRDDProxy WholeTextFiles(string filePath, int minPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy BinaryFiles(string filePath, int minPartitions)
{
throw new NotImplementedException();
}
public IRDDProxy SequenceFile(string filePath, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, int minSplits, int batchSize)
{
throw new NotImplementedException();
}
public IRDDProxy NewAPIHadoopFile(string filePath, string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
throw new NotImplementedException();
}
public IRDDProxy NewAPIHadoopRDD(string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
throw new NotImplementedException();
}
public IRDDProxy HadoopFile(string filePath, string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
throw new NotImplementedException();
}
public IRDDProxy HadoopRDD(string inputFormatClass, string keyClass, string valueClass, string keyConverterClass, string valueConverterClass, IEnumerable<KeyValuePair<string, string>> conf, int batchSize)
{
throw new NotImplementedException();
}
public IRDDProxy CheckpointFile(string filePath)
{
throw new NotImplementedException();
}
public IRDDProxy Union<T>(IEnumerable<RDD<T>> rdds)
{
throw new NotImplementedException();
}
public void SetCheckpointDir(string directory)
{
throw new NotImplementedException();
}
public void SetJobGroup(string groupId, string description, bool interruptOnCancel)
{
throw new NotImplementedException();
}
public void SetLocalProperty(string key, string value)
{
throw new NotImplementedException();
}
public string GetLocalProperty(string key)
{
throw new NotImplementedException();
}
public string SparkUser
{
get { throw new NotImplementedException(); }
}
public void CancelJobGroup(string groupId)
{
throw new NotImplementedException();
}
public void CancelAllJobs()
{
throw new NotImplementedException();
}
public IRDDProxy CreateUserDefinedCSharpFunction(string name, byte[] command, string returnType)
{
throw new NotImplementedException();
}
public object GetJavaMap<K, V>(IEnumerable<KeyValuePair<K, V>> enumerable)
{
throw new NotImplementedException();
}
public object GetJavaSet<T>(IEnumerable<T> enumerable)
{
throw new NotImplementedException();
}
public object GetJavaList<T>(IEnumerable<T> enumerable)
{
throw new NotImplementedException();
}
public int RunJob(IRDDProxy rdd, IEnumerable<int> partitions, bool allowLocal)
{
throw new NotImplementedException();
}
public IRDDProxy CreateCSharpRdd(IRDDProxy prevJavaRddReference, byte[] command, Dictionary<string, string> environmentVariables, List<string> pythonIncludes, bool preservePartitioning, List<Broadcast<dynamic>> broadcastVariables, List<byte[]> accumulator)
{
throw new NotImplementedException();
}
public IStatusTrackerProxy StatusTracker
{
get { throw new NotImplementedException(); }
}
public void Accumulator(string host, int port)
{
throw new NotImplementedException();
}
public IColumnProxy CreateColumnFromName(string name)
{
throw new NotImplementedException();
}
public IColumnProxy CreateFunction(string name, object self)
{
throw new NotImplementedException();
}
public IColumnProxy CreateBinaryMathFunction(string name, object self, object other)
{
throw new NotImplementedException();
}
public IColumnProxy CreateWindowFunction(string name)
{
throw new NotImplementedException();
}
IBroadcastProxy ISparkContextProxy.ReadBroadcastFromFile(string path, out long broadcastId)
{
throw new NotImplementedException();
}
public ISqlContextProxy CreateSqlContext()
{
return new MockSqlContextProxy(this);
}
public IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices)
{
throw new NotImplementedException();
}
}
}
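
Stop above enables state-based verification: instead of counting calls, the mock nulls mockSparkContextReference and TestSparkContextStop asserts on that field. A minimal sketch of the same idea with hypothetical names:

internal class StatefulMock
{
    internal object Reference = new object();

    public void Stop()
    {
        Reference = null; // a test asserts Reference is null to prove Stop was forwarded
    }
}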

View file

@ -0,0 +1,55 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
namespace AdapterTest.Mocks
{
internal class MockSqlContextProxy : ISqlContextProxy
{
internal object mockSqlContextReference;
private ISparkContextProxy mockSparkContextProxy;
public ISparkContextProxy SparkContextProxy
{
get { return mockSparkContextProxy; }
}
public MockSqlContextProxy(ISparkContextProxy scProxy)
{
mockSqlContextReference = new object();
mockSparkContextProxy = scProxy;
}
public IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary<string, string> options)
{
throw new NotImplementedException();
}
public IDataFrameProxy JsonFile(string path)
{
return new MockDataFrameProxy(new object[] { path }, this);
}
public IDataFrameProxy TextFile(string path, StructType schema, string delimiter)
{
return new MockDataFrameProxy(new object[] { path, schema, delimiter }, this);
}
public IDataFrameProxy TextFile(string path, string delimiter, bool hasHeader, bool inferSchema)
{
return new MockDataFrameProxy(new object[] { path, delimiter, hasHeader, inferSchema }, this);
}
public IDataFrameProxy Sql(string query)
{
return new MockDataFrameProxy(new object[] { query }, this);
}
}
}

View file

@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Linq;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between RDD and its proxy
/// </summary>
[TestClass]
public class RDDTest
{
//TODO - complete impl
[TestMethod]
public void TestMap()
{
var sparkContext = new SparkContext(null);
var rdd = sparkContext.TextFile(@"c:\path\to\rddinput.txt");
var rdd2 = rdd.Map(s => s.ToLower() + ".com");
Assert.IsTrue(rdd2.GetType() == typeof(PipelinedRDD<string>));
var pipelinedRdd = rdd2 as PipelinedRDD<string>;
var func = pipelinedRdd.func;
var result = func(1, new string[] { "ABC" });
var output = result.First();
Assert.AreEqual("ABC".ToLower() + ".com", output);
var pipelinedRdd2 = rdd2.Map(s => "HTTP://" + s) as PipelinedRDD<string>;
var func2 = pipelinedRdd2.func;
var result2 = func2(1, new string[] { "ABC" });
var output2 = result2.First();
Assert.AreEqual("HTTP://" + ("ABC".ToLower() + ".com"), output2); // ToLower() and ".com" are applied before the "HTTP://" prefix because func2 wraps func
}
[TestMethod]
public void TestTextFile()
{
var sparkContext = new SparkContext(null);
var rdd = sparkContext.TextFile(@"c:\path\to\rddinput.txt");
var paramValuesToTextFileMethod = (rdd.RddProxy as MockRddProxy).mockRddReference as object[];
Assert.AreEqual(@"c:\path\to\rddinput.txt", paramValuesToTextFileMethod[0]);
Assert.AreEqual(0, int.Parse(paramValuesToTextFileMethod[1].ToString())); //checking default minPartitions
}
[TestMethod]
public void TestUnion()
{
var sparkContext = new SparkContext(null);
var rdd = sparkContext.TextFile(@"c:\path\to\rddinput.txt");
var rdd2 = sparkContext.TextFile(@"c:\path\to\rddinput2.txt");
var unionRdd = rdd.Union(rdd2);
var paramValuesToUnionMethod = ((unionRdd.RddProxy as MockRddProxy).mockRddReference as object[]);
var paramValuesToTextFileMethodInRdd1 = (paramValuesToUnionMethod[0] as MockRddProxy).mockRddReference as object[];
Assert.AreEqual(@"c:\path\to\rddinput.txt", paramValuesToTextFileMethodInRdd1[0]);
var paramValuesToTextFileMethodInRdd2 = (paramValuesToUnionMethod[1] as MockRddProxy).mockRddReference as object[];
Assert.AreEqual(@"c:\path\to\rddinput2.txt", paramValuesToTextFileMethodInRdd2[0]);
}
}
}
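
TestMap depends on how PipelinedRDD composes functions: each Map wraps the previous function, so earlier transformations run first. A standalone sketch of that wrapping, assuming the behavior described by the comment in the test:

using System;
using System.Collections.Generic;
using System.Linq;

internal static class PipelineCompositionSketch
{
    internal static void Run()
    {
        // first transformation: lower-case and append ".com"
        Func<int, IEnumerable<string>, IEnumerable<string>> func =
            (pid, input) => input.Select(s => s.ToLower() + ".com");

        // a second Map wraps the first, so func runs before the prefix is added
        Func<int, IEnumerable<string>, IEnumerable<string>> func2 =
            (pid, input) => func(pid, input).Select(s => "HTTP://" + s);

        Console.WriteLine(func2(1, new[] { "ABC" }).First()); // HTTP://abc.com
    }
}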

View file

@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Configuration;
using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Proxy.Ipc;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
[TestClass]
public class SparkCLRTestEnvironment
{
[AssemblyInitialize()]
public static void Initialize(TestContext context)
{
SparkCLREnvironment.SparkCLRProxy = new MockSparkCLRProxy();
}
}
}

View file

@ -0,0 +1,42 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Proxy.Ipc;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between SparkConf and its proxy
/// </summary>
[TestClass]
public class SparkConfTest
{
//TODO - complete impl
[TestMethod]
public void TestSparkConfMethods()
{
var sparkConf = new SparkConf();
sparkConf.SetMaster("masterUrl");
Assert.AreEqual("masterUrl", sparkConf.Get(MockSparkConfProxy.MockMasterKey, ""));
sparkConf.SetAppName("app name ");
Assert.AreEqual("app name ", sparkConf.Get(MockSparkConfProxy.MockAppNameKey, ""));
sparkConf.SetSparkHome(@"c:\path\to\sparkfolder");
Assert.AreEqual(@"c:\path\to\sparkfolder", sparkConf.Get(MockSparkConfProxy.MockHomeKey, ""));
Assert.AreEqual("default value", sparkConf.Get("non existent key", "default value"));
Assert.AreEqual(3, sparkConf.GetInt("non existent key", 3));
}
}
}

View file

@ -0,0 +1,65 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Text;
using System.Collections.Generic;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Proxy.Ipc;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between SparkContext and its proxy
/// </summary>
[TestClass]
public class SparkContextTest
{
//TODO - complete impl
[TestMethod]
public void TestSparkContextConstructor()
{
var sparkContext = new SparkContext("masterUrl", "appName");
Assert.IsNotNull((sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference);
var paramValuesToConstructor = (sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference as object[];
Assert.AreEqual("masterUrl", (paramValuesToConstructor[0] as MockSparkConfProxy).stringConfDictionary["mockmaster"]);
Assert.AreEqual("appName", (paramValuesToConstructor[0] as MockSparkConfProxy).stringConfDictionary["mockappName"]);
sparkContext = new SparkContext("masterUrl", "appName", "sparkhome");
Assert.IsNotNull((sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference);
paramValuesToConstructor = (sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference as object[];
Assert.AreEqual("masterUrl", (paramValuesToConstructor[0] as MockSparkConfProxy).stringConfDictionary["mockmaster"]);
Assert.AreEqual("appName", (paramValuesToConstructor[0] as MockSparkConfProxy).stringConfDictionary["mockappName"]);
Assert.AreEqual("sparkhome", (paramValuesToConstructor[0] as MockSparkConfProxy).stringConfDictionary["mockhome"]);
sparkContext = new SparkContext(null);
Assert.IsNotNull((sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference);
paramValuesToConstructor = (sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference as object[];
Assert.IsNotNull(paramValuesToConstructor[0]); //because SparkContext constructor creates a default SparkConf
}
[TestMethod]
public void TestSparkContextStop()
{
var sparkContext = new SparkContext(null);
Assert.IsNotNull((sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference);
sparkContext.Stop();
Assert.IsNull((sparkContext.SparkContextProxy as MockSparkContextProxy).mockSparkContextReference);
}
[TestMethod]
public void TestSparkContextTextFile()
{
var sparkContext = new SparkContext(null);
var rdd = sparkContext.TextFile(@"c:\path\to\rddinput.txt", 8);
var paramValuesToTextFileMethod = (rdd.RddProxy as MockRddProxy).mockRddReference as object[];
Assert.AreEqual(@"c:\path\to\rddinput.txt", paramValuesToTextFileMethod[0]);
Assert.AreEqual(8, paramValuesToTextFileMethod[1]);
}
}
}

View file

@ -0,0 +1,58 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between SqlContext and its proxies
/// </summary>
[TestClass]
public class SqlContextTest
{
//TODO - complete impl
[TestMethod]
public void TestSqlContextConstructor()
{
var sqlContext = new SqlContext(new SparkContext("", ""));
Assert.IsNotNull((sqlContext.SqlContextProxy as MockSqlContextProxy).mockSqlContextReference);
}
[TestMethod]
public void TestJsonFile()
{
var sqlContext = new SqlContext(new SparkContext("", ""));
var dataFrame = sqlContext.JsonFile(@"c:\path\to\input.json");
var paramValuesToJsonFileMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference as object[];
Assert.AreEqual(@"c:\path\to\input.json", paramValuesToJsonFileMethod[0]);
}
[TestMethod]
public void TestTextFile()
{
var sqlContext = new SqlContext(new SparkContext("", ""));
var dataFrame = sqlContext.TextFile(@"c:\path\to\input.txt");
var paramValuesToTextFileMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference as object[];
Assert.AreEqual(@"c:\path\to\input.txt", paramValuesToTextFileMethod[0]);
Assert.AreEqual(@",", paramValuesToTextFileMethod[1]);
Assert.IsFalse(bool.Parse(paramValuesToTextFileMethod[2].ToString()));
Assert.IsFalse(bool.Parse(paramValuesToTextFileMethod[3].ToString()));
sqlContext = new SqlContext(new SparkContext("", ""));
dataFrame = sqlContext.TextFile(@"c:\path\to\input.txt", "|", true, true);
paramValuesToTextFileMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference as object[];
Assert.AreEqual(@"c:\path\to\input.txt", paramValuesToTextFileMethod[0]);
Assert.AreEqual(@"|", paramValuesToTextFileMethod[1]);
Assert.IsTrue(bool.Parse(paramValuesToTextFileMethod[2].ToString()));
Assert.IsTrue(bool.Parse(paramValuesToTextFileMethod[3].ToString()));
}
}
}

View file

@ -331,15 +331,16 @@ namespace Microsoft.Spark.CSharp.Samples
Console.WriteLine(kv);
}
//TODO: implement PairRDDFunctions.SampleByKey
//[Sample]
internal static void PairRDDSampleByKeySample()
{
var fractions = new Dictionary<string, double> { { "a", 0.2 }, { "b", 0.1 } };
var rdd = SparkCLRSamples.SparkContext.Parallelize(fractions.Keys.ToArray(), 2).Cartesian(SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 1000), 2));
var sample = rdd.Map(t => new KeyValuePair<string, int>(t.Item1, t.Item2)).SampleByKey(false, fractions, 2).GroupByKey().Collect();
//internal static void PairRDDSampleByKeySample()
//{
// var fractions = new Dictionary<string, double> { { "a", 0.2 }, { "b", 0.1 } };
// var rdd = SparkCLRSamples.SparkContext.Parallelize(fractions.Keys.ToArray(), 2).Cartesian(SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 1000), 2));
// var sample = rdd.Map(t => new KeyValuePair<string, int>(t.Item1, t.Item2)).SampleByKey(false, fractions, 2).GroupByKey().Collect();
Console.WriteLine(sample);
}
// Console.WriteLine(sample);
//}
[Sample]
internal static void PairRDDSubtractByKeySample()

View file

@ -23,12 +23,10 @@ namespace Microsoft.Spark.CSharp.Samples
{
ProcessArguments(args);
using (SparkCLREnvironment.Initialize())
{
SparkContext = CreateSparkContext();
RunSamples();
SparkContext.Stop();
}
SparkContext = CreateSparkContext();
SparkContext.SetCheckpointDir(Path.GetTempPath());
RunSamples();
SparkContext.Stop();
}
// Creates and returns a context

View file

@ -14,7 +14,7 @@ namespace Microsoft.Spark.CSharp.Samples
{
class RDDSamples
{
//[Sample]
[Sample]
internal static void RDDCheckpointSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(Enumerable.Range(0, 100), 4);
@ -234,7 +234,7 @@ namespace Microsoft.Spark.CSharp.Samples
rdd.SaveAsTextFile(path);
}
//[Sample]
[Sample]
internal static void RDDCartesianSample()
{
var rdd = SparkCLRSamples.SparkContext.Parallelize(new int[] { 1, 2 }, 1);

View file

@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 2013
VisualStudioVersion = 12.0.30501.0
VisualStudioVersion = 12.0.21005.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adapter", "Adapter\Microsoft.Spark.CSharp\Adapter.csproj", "{CE999A96-F42B-4E80-B208-709D7F49A77C}"
EndProject
@ -13,6 +13,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples", "Samples\Microsof
{82C9D3B2-E4FB-4713-B980-948C1E96A10A} = {82C9D3B2-E4FB-4713-B980-948C1E96A10A}
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AdapterTest", "AdapterTest\AdapterTest.csproj", "{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -31,6 +33,10 @@ Global
{913E6A56-9839-4379-8B3C-855BA9341663}.Debug|Any CPU.Build.0 = Debug|Any CPU
{913E6A56-9839-4379-8B3C-855BA9341663}.Release|Any CPU.ActiveCfg = Release|Any CPU
{913E6A56-9839-4379-8B3C-855BA9341663}.Release|Any CPU.Build.0 = Release|Any CPU
{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D5C2C46E-3FEC-473B-8ABA-3B0FC5A7319C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View file

@ -9,6 +9,7 @@ import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
/**
* RDD used for forking an external C# process and pipe in & out the data
@ -30,7 +31,7 @@ class CSharpRDD(
}
object CSharpRDD {
def createRDDFromArray(sc: SparkContext, arr: Array[Array[Byte]], numSlices: Int): RDD[Array[Byte]] = {
sc.parallelize(arr, numSlices)
def createRDDFromArray(sc: SparkContext, arr: Array[Array[Byte]], numSlices: Int): JavaRDD[Array[Byte]] = {
JavaRDD.fromRDD(sc.parallelize(arr, numSlices))
}
}