From cd104bf31cd2ee049632d506ce30bfc79585d03c Mon Sep 17 00:00:00 2001 From: guwang Date: Thu, 17 Dec 2015 09:48:02 +0800 Subject: [PATCH 1/5] [Spark 1.4.1] - Refactor DataFrame schema and create data type classes, add CreateDataFrame of SqlContext --- .../Microsoft.Spark.CSharp/Adapter.csproj | 2 +- .../Interop/Ipc/SerDe.cs | 67 ++++ .../Proxy/ISparkCLRProxy.cs | 2 - .../Proxy/ISqlContextProxy.cs | 1 + .../Proxy/Ipc/SparkCLRIpcProxy.cs | 27 -- .../Proxy/Ipc/SparkContextIpcProxy.cs | 1 - .../Proxy/Ipc/SqlContextIpcProxy.cs | 13 +- .../Microsoft.Spark.CSharp/Sql/DataFrame.cs | 22 +- .../Adapter/Microsoft.Spark.CSharp/Sql/Row.cs | 224 ++++------- .../Microsoft.Spark.CSharp/Sql/SqlContext.cs | 9 +- .../Microsoft.Spark.CSharp/Sql/Struct.cs | 135 ------- .../Microsoft.Spark.CSharp/Sql/Types.cs | 366 ++++++++++++++++++ csharp/AdapterTest/DataFrameTest.cs | 102 ++--- csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs | 11 - .../AdapterTest/Mocks/MockSqlContextProxy.cs | 5 + .../DataFrameSamples.cs | 183 ++++++--- .../Microsoft.Spark.CSharp/Samples.csproj | 4 + .../Microsoft.Spark.CSharp/packages.config | 1 + .../spark/sql/api/csharp/SQLUtils.scala | 51 ++- 19 files changed, 737 insertions(+), 489 deletions(-) delete mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Sql/Struct.cs create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj index eef8436..500f36a 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj @@ -109,7 +109,7 @@ - + diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs index 4e1b136..42f47c1 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs @@ -2,8 +2,13 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections.Generic; +using System.Linq; using System.Text; using System.IO; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Newtonsoft.Json.Serialization; namespace Microsoft.Spark.CSharp.Interop.Ipc { @@ -232,4 +237,66 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc Write(s, buffer); } } + + /// + /// Json.NET Serialization/Deserialization helper class. + /// + public static class JsonSerDe + { + /* + * Note: the Scala side uses JSortedObject when parsing JSON, so the properties in a JObject need to be sorted. + */ + + /// + /// Extension method to sort items in a JSON object by keys. + /// + /// + /// + public static JObject SortProperties(this JObject jObject) + { + JObject sortedJObject = new JObject(); + foreach (var property in jObject.Properties().OrderBy(p => p.Name)) + { + if (property.Value is JObject) + { + var propJObject = property.Value as JObject; + sortedJObject.Add(property.Name, propJObject.SortProperties()); + } + else if (property.Value is JArray) + { + var propJArray = property.Value as JArray; + sortedJObject.Add(property.Name, propJArray.SortProperties()); + } + else + { + sortedJObject.Add(property.Name, property.Value); + } + } + return sortedJObject; + } + + /// + /// Extension method to sort items in a JSON array by keys.
+ /// + public static JArray SortProperties(this JArray jArray) + { + JArray sortedJArray = new JArray(); + if (jArray.Count == 0) return jArray; + + foreach (var item in jArray) + { + if (item is JObject) + { + var sortedItem = ((JObject)item).SortProperties(); + sortedJArray.Add(sortedItem); + } + else if (item is JArray) + { + var sortedItem = ((JArray)item).SortProperties(); + sortedJArray.Add(sortedItem); + } + else + { + // keep non-container items as-is so primitive values are not silently dropped + sortedJArray.Add(item); + } + } + return sortedJArray; + } + } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs index 1958b0c..403e86d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkCLRProxy.cs @@ -16,8 +16,6 @@ namespace Microsoft.Spark.CSharp.Proxy ISparkContextProxy SparkContextProxy { get; } ISparkConfProxy CreateSparkConf(bool loadDefaults = true); ISparkContextProxy CreateSparkContext(ISparkConfProxy conf); - IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable); - IStructTypeProxy CreateStructType(List fields); IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string deserializer); IDStreamProxy CreateCSharpTransformed2DStream(IDStreamProxy jdstream, IDStreamProxy jother, byte[] func, string deserializer, string deserializerOther); IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string deserializer); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISqlContextProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISqlContextProxy.cs index e49204c..b84c86e 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISqlContextProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISqlContextProxy.cs @@ -13,6 +13,7 @@ namespace Microsoft.Spark.CSharp.Proxy { internal interface ISqlContextProxy { + IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy); IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary options); IDataFrameProxy JsonFile(string path); IDataFrameProxy TextFile(string path, StructType schema, string delimiter); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs index e4b0276..e0b791d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkCLRIpcProxy.cs @@ -61,33 +61,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return sparkContextProxy; } - public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable) - { - return new StructFieldIpcProxy( - new JvmObjectReference( - JvmBridge.CallStaticJavaMethod( - "org.apache.spark.sql.api.csharp.SQLUtils", "createStructField", - new object[] { name, dataType, isNullable }).ToString() - ) - ); - } - - public IStructTypeProxy CreateStructType(List fields) - { - var fieldsReference = fields.Select(s => (s.StructFieldProxy as StructFieldIpcProxy).JvmStructFieldReference).ToList().Cast(); - - var seq = - new JvmObjectReference( - JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", - "toSeq", new object[] { fieldsReference }).ToString()); - - return new StructTypeIpcProxy( - new JvmObjectReference( - JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createStructType", new object[] { seq }).ToString() - ) - ); - } - public
IDStreamProxy CreateCSharpDStream(IDStreamProxy jdstream, byte[] func, string deserializer) { var jvmDStreamReference = SparkCLRIpcProxy.JvmBridge.CallConstructor("org.apache.spark.streaming.api.csharp.CSharpDStream", diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs index 70681b7..51f05e0 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs @@ -121,7 +121,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return new RDDIpcProxy(jvmRddReference); } - //TODO - this implementation is slow. Replace with call to createRDDFromArray() in CSharpRDD public IRDDProxy Parallelize(IEnumerable values, int numSlices) { var jvmRddReference = new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.csharp.CSharpRDD", "createRDDFromArray", new object[] { jvmSparkContextReference, values, numSlices })); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SqlContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SqlContextIpcProxy.cs index 051c214..d66a925 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SqlContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SqlContextIpcProxy.cs @@ -21,6 +21,17 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc this.jvmSqlContextReference = jvmSqlContextReference; } + public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy) + { + var rdd = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "byteArrayRDDToAnyArrayRDD", + new object[] { (rddProxy as RDDIpcProxy).JvmRddReference }).ToString()); + + return new DataFrameIpcProxy( + new JvmObjectReference( + SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "applySchemaToPythonRDD", + new object[] { rdd, (structTypeProxy as StructTypeIpcProxy).JvmStructTypeReference }).ToString()), this); + } + public IDataFrameProxy ReadDataFrame(string path, StructType schema, Dictionary options) { //TODO parameter Dictionary options is not used right now - it is meant to be passed on to data sources @@ -44,7 +55,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc new JvmObjectReference( SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod( "org.apache.spark.sql.api.csharp.SQLUtils", "loadTextFile", - new object[] {jvmSqlContextReference, path, delimiter, (schema.StructTypeProxy as StructTypeIpcProxy).JvmStructTypeReference}).ToString() + new object[] { jvmSqlContextReference, path, delimiter, schema.Json}).ToString() ), this ); } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs index d570532..082cd05 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs @@ -24,9 +24,8 @@ namespace Microsoft.Spark.CSharp.Sql private readonly IDataFrameProxy dataFrameProxy; [NonSerialized] private readonly SparkContext sparkContext; - [NonSerialized] + private StructType schema; - private RowSchema rowSchema; [NonSerialized] private RDD rdd; @@ -39,14 +38,14 @@ namespace Microsoft.Spark.CSharp.Sql { get { + if (schema == null) + { + schema = new StructType(dataFrameProxy.GetSchema()); + } if (rdd == null) { - if (rowSchema == null) - { - rowSchema = 
RowSchema.ParseRowSchemaFromJson(Schema.ToJson()); - } IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp(); - rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row).Map(item => (Row)new RowImpl(item, rowSchema)); + rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row).Map(item => (Row)new RowImpl(item, schema)); } return rdd; } @@ -130,7 +129,7 @@ namespace Microsoft.Spark.CSharp.Sql /// public void ShowSchema() { - List nameTypeList = Schema.Fields.Select(structField => string.Format("{0}:{1}", structField.Name, structField.DataType.SimpleString())).ToList(); + var nameTypeList = Schema.Fields.Select(structField => structField.SimpleString); Console.WriteLine(string.Join(", ", nameTypeList)); } @@ -139,18 +138,13 @@ namespace Microsoft.Spark.CSharp.Sql /// public IEnumerable Collect() { - if (rowSchema == null) - { - rowSchema = RowSchema.ParseRowSchemaFromJson(Schema.ToJson()); - } - IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp(); RDD rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); int port = rddProxy.CollectAndServe(); foreach (var item in rdd.Collect(port)) { - yield return new RowImpl(item, rowSchema); + yield return new RowImpl(item, Schema); } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs index 0555510..aeb92f0 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs @@ -2,12 +2,9 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; +using System.Collections; using System.Collections.Generic; using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; using Microsoft.Spark.CSharp.Services; @@ -31,24 +28,24 @@ namespace Microsoft.Spark.CSharp.Sql /// /// Schema for the row. /// - public abstract RowSchema GetSchema(); + public abstract StructType GetSchema(); /// /// Returns the value at position i. /// - public abstract object Get(int i); + public abstract dynamic Get(int i); /// /// Returns the value of a given columnName. /// - public abstract object Get(string columnName); + public abstract dynamic Get(string columnName); /// /// Returns the value at position i, the return value will be cast to type T. 
/// public T GetAs(int i) { - object o = Get(i); + dynamic o = Get(i); try { T result = (T)o; @@ -66,7 +63,7 @@ namespace Microsoft.Spark.CSharp.Sql /// public T GetAs(string columnName) { - object o = Get(columnName); + dynamic o = Get(columnName); try { T result = (T)o; @@ -80,154 +77,23 @@ namespace Microsoft.Spark.CSharp.Sql } } - /// - /// Schema of Row - /// - [Serializable] - public class RowSchema - { - public string type; - public List columns; - - private readonly Dictionary columnName2Index = new Dictionary(); - - public RowSchema(string type) - { - this.type = type; - this.columns = new List(); - } - - public RowSchema(string type, List cols) - { - int index = 0; - foreach (var col in cols) - { - string columnName = col.name; - - if (string.IsNullOrEmpty(columnName)) - { - throw new Exception(string.Format("Null column name at pos: {0}", index)); - } - - if (columnName2Index.ContainsKey(columnName)) - { - throw new Exception(string.Format("duplicate column name ({0}) in pos ({1}) and ({2})", - columnName, columnName2Index[columnName], index)); - } - columnName2Index[columnName] = index; - index++; - } - - this.type = type; - this.columns = cols; - } - - internal int GetIndexByColumnName(string ColumnName) - { - if (!columnName2Index.ContainsKey(ColumnName)) - { - throw new Exception(string.Format("unknown ColumnName: {0}", ColumnName)); - } - - return columnName2Index[ColumnName]; - } - - public override string ToString() - { - string result; - - if (columns.Any()) - { - List cols = new List(); - foreach (var col in columns) - { - cols.Add(col.ToString()); - } - - result = "{" + - string.Format("type: {0}, columns: [{1}]", type, string.Join(", ", cols.ToArray())) + - "}"; - } - else - { - result = type; - } - - return result; - } - - internal static RowSchema ParseRowSchemaFromJson(string json) - { - JObject joType = JObject.Parse(json); - string type = joType["type"].ToString(); - - List columns = new List(); - List jtFields = joType["fields"].Children().ToList(); - foreach (JToken jtField in jtFields) - { - ColumnSchema col = ColumnSchema.ParseColumnSchemaFromJson(jtField.ToString()); - columns.Add(col); - } - - return new RowSchema(type, columns); - } - - } - - /// - /// Schema for column - /// - [Serializable] - public class ColumnSchema - { - public string name; - public RowSchema type; - public bool nullable; - - public override string ToString() - { - string str = string.Format("name: {0}, type: {1}, nullable: {2}", name, type, nullable); - return "{" + str + "}"; - } - - internal static ColumnSchema ParseColumnSchemaFromJson(string json) - { - ColumnSchema col = new ColumnSchema(); - JObject joField = JObject.Parse(json); - col.name = joField["name"].ToString(); - col.nullable = (bool)(joField["nullable"]); - - JToken jtType = joField["type"]; - if (jtType.Type == JTokenType.String) - { - col.type = new RowSchema(joField["type"].ToString()); - } - else - { - col.type = RowSchema.ParseRowSchemaFromJson(joField["type"].ToString()); - } - - return col; - } - } - [Serializable] internal class RowImpl : Row { - private readonly RowSchema schema; - private readonly object[] values; + private readonly StructType schema; + private readonly dynamic[] values; private readonly int columnCount; - internal RowImpl(object data, RowSchema schema) + internal RowImpl(dynamic data, StructType schema) { - if (data is object[]) + if (data is dynamic[]) { - values = data as object[]; + values = data as dynamic[]; } - else if (data is List) + else if (data is List) { - values = (data as 
List).ToArray(); + values = (data as List).ToArray(); } else { @@ -237,7 +103,7 @@ namespace Microsoft.Spark.CSharp.Sql this.schema = schema; columnCount = values.Count(); - int schemaColumnCount = this.schema.columns.Count(); + int schemaColumnCount = this.schema.Fields.Count(); if (columnCount != schemaColumnCount) { throw new Exception(string.Format("column count inferred from data ({0}) and schema ({1}) mismatch", columnCount, schemaColumnCount)); @@ -251,12 +117,12 @@ namespace Microsoft.Spark.CSharp.Sql return columnCount; } - public override RowSchema GetSchema() + public override StructType GetSchema() { return schema; } - public override object Get(int i) + public override dynamic Get(int i) { if (i >= columnCount) { @@ -266,9 +132,9 @@ namespace Microsoft.Spark.CSharp.Sql return values[i]; } - public override object Get(string columnName) + public override dynamic Get(string columnName) { - int index = schema.GetIndexByColumnName(columnName); + int index = schema.Fields.FindIndex(f => f.Name == columnName); // case sensitive return Get(index); } @@ -293,19 +159,63 @@ namespace Microsoft.Spark.CSharp.Sql private void Initialize() { + int index = 0; - foreach (var col in schema.columns) + foreach (var field in schema.Fields) { - if (col.type.columns.Any()) // this column itself is a sub-row + if (field.DataType is ArrayType) { - object value = values[index]; + Func convertArrayTypeToStructTypeFunc = (dataType, length) => + { + StructField[] fields = new StructField[length]; + for(int i = 0; i < length ; i++) + { + fields[i] = new StructField(string.Format("_array_{0}", i), dataType); + } + return new StructType(fields); + }; + var elementType = (field.DataType as ArrayType).ElementType; + + // Note: When creating object from json, PySpark converts Json array to Python List (https://github.com/apache/spark/blob/branch-1.4/python/pyspark/sql/types.py, _create_cls(dataType)), + // then Pyrolite unpickler converts Python List to C# ArrayList (https://github.com/irmen/Pyrolite/blob/v4.10/README.txt). So values[index] should be of type ArrayList; + // In case Python changes its implementation, which means value is not of type ArrayList, try cast to object[] because Pyrolite unpickler convert Python Tuple to C# object[]. + object[] valueOfArray = values[index] is ArrayList ? (values[index] as ArrayList).ToArray() : values[index] as object[]; + if (valueOfArray == null) + { + throw new ArgumentException("Cannot parse data of ArrayType: " + field.Name); + } + + values[index] = new RowImpl(valueOfArray, elementType as StructType ?? 
convertArrayTypeToStructTypeFunc(elementType, valueOfArray.Length)).values; + } + else if (field.DataType is MapType) + { + //TODO + } + else if (field.DataType is StructType) + { + dynamic value = values[index]; if (value != null) { - RowImpl subRow = new RowImpl(values[index], col.type); + var subRow = new RowImpl(values[index], field.DataType as StructType); values[index] = subRow; } } - + else if (field.DataType is DecimalType) + { + //TODO + } + else if (field.DataType is DateType) + { + //TODO + } + else if (field.DataType is StringType) + { + if (values[index] != null) values[index] = values[index].ToString(); + } + else + { + values[index] = values[index]; + } index++; } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs index ac760fd..36ec602 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs @@ -8,6 +8,7 @@ using System.Text; using System.Threading.Tasks; using Microsoft.Spark.CSharp.Core; using Microsoft.Spark.CSharp.Interop; +using Microsoft.Spark.CSharp.Interop.Ipc; using Microsoft.Spark.CSharp.Proxy; namespace Microsoft.Spark.CSharp.Sql @@ -39,9 +40,13 @@ namespace Microsoft.Spark.CSharp.Sql return new DataFrame(sqlContextProxy.ReadDataFrame(path, schema, options), sparkContext); } - public DataFrame CreateDataFrame(RDD rdd, StructType schema) + public DataFrame CreateDataFrame(RDD rdd, StructType schema) { - throw new NotImplementedException(); + // pickle RDD, convert to RDD + var rddRow = rdd.Map(r => r); + rddRow.serializedMode = SerializedMode.Row; + + return new DataFrame(sqlContextProxy.CreateDataFrame(rddRow.RddProxy, schema.StructTypeProxy), sparkContext); } /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Struct.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Struct.cs deleted file mode 100644 index e75e555..0000000 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Struct.cs +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for full license information. 
- -using System.Collections.Generic; -using System.Linq; -using Microsoft.Spark.CSharp.Interop; -using Microsoft.Spark.CSharp.Proxy; -using Microsoft.Spark.CSharp.Proxy.Ipc; - -namespace Microsoft.Spark.CSharp.Sql -{ - /// - /// Schema of DataFrame - /// - public class StructType - { - private readonly IStructTypeProxy structTypeProxy; - - internal IStructTypeProxy StructTypeProxy - { - get - { - return structTypeProxy; - } - } - - public List Fields //TODO - avoid calling method everytime - { - get - { - var structTypeFieldJvmObjectReferenceList = - structTypeProxy.GetStructTypeFields(); - var structFieldList = new List(structTypeFieldJvmObjectReferenceList.Count); - structFieldList.AddRange( - structTypeFieldJvmObjectReferenceList.Select( - structTypeFieldJvmObjectReference => new StructField(structTypeFieldJvmObjectReference))); - return structFieldList; - } - } - - public string ToJson() - { - return structTypeProxy.ToJson(); - } - - - internal StructType(IStructTypeProxy structTypeProxy) - { - this.structTypeProxy = structTypeProxy; - } - - public static StructType CreateStructType(List structFields) - { - return new StructType(SparkCLREnvironment.SparkCLRProxy.CreateStructType(structFields)); - } - } - - /// - /// Schema for DataFrame column - /// - public class StructField - { - private readonly IStructFieldProxy structFieldProxy; - - internal IStructFieldProxy StructFieldProxy - { - get - { - return structFieldProxy; - } - } - - public string Name - { - get - { - return structFieldProxy.GetStructFieldName(); - } - } - - public DataType DataType - { - get - { - return new DataType(structFieldProxy.GetStructFieldDataType()); - } - } - - public bool IsNullable - { - get - { - return structFieldProxy.GetStructFieldIsNullable(); - } - } - - internal StructField(IStructFieldProxy strucFieldProxy) - { - structFieldProxy = strucFieldProxy; - } - - public static StructField CreateStructField(string name, string dataType, bool isNullable) - { - return new StructField(SparkCLREnvironment.SparkCLRProxy.CreateStructField(name, dataType, isNullable)); - } - } - - public class DataType - { - private readonly IStructDataTypeProxy structDataTypeProxy; - - internal IStructDataTypeProxy StructDataTypeProxy - { - get - { - return structDataTypeProxy; - } - } - - internal DataType(IStructDataTypeProxy structDataTypeProxy) - { - this.structDataTypeProxy = structDataTypeProxy; - } - - public override string ToString() - { - return structDataTypeProxy.GetDataTypeString(); - } - - public string SimpleString() - { - return structDataTypeProxy.GetDataTypeSimpleString(); - } - } -} \ No newline at end of file diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs new file mode 100644 index 0000000..a4e9274 --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs @@ -0,0 +1,366 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text.RegularExpressions; +using Microsoft.Spark.CSharp.Interop.Ipc; +using Microsoft.Spark.CSharp.Proxy; +using Microsoft.Spark.CSharp.Proxy.Ipc; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Spark.CSharp.Sql +{ + [Serializable] + public abstract class DataType + { + /// + /// Trim "Type" in the end from class name, ToLower() to align with Scala. 
+ /// + public string TypeName + { + get { return NormalizeTypeName(GetType().Name); } + } + + /// + /// return TypeName by default, subclass can override it + /// + public virtual string SimpleString + { + get { return TypeName; } + } + + /// + /// return only type: TypeName by default, subclass can override it + /// + internal virtual object JsonValue { get { return TypeName; } } + + public string Json + { + get + { + var jObject = JsonValue is JObject ? ((JObject)JsonValue).SortProperties() : JsonValue; + return JsonConvert.SerializeObject(jObject, Formatting.None); + } + } + + protected static DataType ParseDataTypeFromJson(string json) + { + return ParseDataTypeFromJson(JToken.Parse(json)); + } + + protected static DataType ParseDataTypeFromJson(JToken json) + { + if (json is JObject) + { + JToken type; + if (((JObject)json).TryGetValue("type", out type)) + { + if (type.Type == JTokenType.Object) // {name: address, type: {type: struct,...},...} + { + var typeJObject = (JObject)type; + if (typeJObject.TryGetValue("type", out type)) + { + Type complexType; + if ((complexType = ComplexTypes.FirstOrDefault(ct => NormalizeTypeName(ct.Name) == type.ToString())) != default(Type)) + { + return ((ComplexType)Activator.CreateInstance(complexType, BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance + , null, new object[] { typeJObject }, null)); // create new instance of ComplexType + } + if (type.ToString() == "udt") + { + // TODO + } + } + throw new ArgumentException(string.Format("Could not parse data type: {0}", type)); + } + else // {name: age, type: bigint,...} // TODO: validate more JTokenType other than Object + { + return ParseAtomicType(type); + } + } + } + else + { + return ParseAtomicType(json); + } + + throw new ArgumentException(string.Format("Could not parse data type: {0}", json)); + } + + private static AtomicType ParseAtomicType(JToken type) + { + Type atomicType; + if ((atomicType = AtomicTypes.FirstOrDefault(at => NormalizeTypeName(at.Name) == type.ToString())) != default(Type)) + { + return (AtomicType)Activator.CreateInstance(atomicType); // create new instance of AtomicType + } + + Match fixedDecimal = DecimalType.FixedDecimal.Match(type.ToString()); + if (fixedDecimal.Success) + { + return new DecimalType(int.Parse(fixedDecimal.Groups[1].Value), int.Parse(fixedDecimal.Groups[2].Value)); + } + + throw new ArgumentException(string.Format("Could not parse data type: {0}", type)); + } + + [NonSerialized] + private static readonly Type[] AtomicTypes = typeof(AtomicType).Assembly.GetTypes().Where(type => + type.IsSubclassOf(typeof(AtomicType))).ToArray(); + + [NonSerialized] + private static readonly Type[] ComplexTypes = typeof(ComplexType).Assembly.GetTypes().Where(type => + type.IsSubclassOf(typeof(ComplexType))).ToArray(); + + [NonSerialized] + private static readonly Func NormalizeTypeName = s => s.Substring(0, s.Length - 4).ToLower(); // trim "Type" at the end of type name + + + } + + [Serializable] + public class AtomicType : DataType + { + } + + [Serializable] + public abstract class ComplexType : DataType + { + public abstract DataType FromJson(JObject json); + public DataType FromJson(string json) + { + return FromJson(JObject.Parse(json)); + } + } + + + [Serializable] + public class NullType : AtomicType { } + + [Serializable] + public class StringType : AtomicType { } + + [Serializable] + public class BinaryType : AtomicType { } + + [Serializable] + public class BooleanType : AtomicType { } + + [Serializable] + public class DateType : AtomicType { } 
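+
+ // Illustrative usage sketch (comment only, assuming just the classes defined in this file):
+ // the atomic types derive their JSON names from the class names ("IntegerType" -> "integer"),
+ // so a schema built here round-trips to the JVM side as the key-sorted JSON produced by DataType.Json.
+ //
+ //   var schema = new StructType(new[]
+ //   {
+ //       new StructField("name", new StringType()),
+ //       new StructField("age", new IntegerType(), false),
+ //       new StructField("tags", new ArrayType(new StringType()))
+ //   });
+ //   // schema.SimpleString == "struct<name:string,age:integer,tags:array<string>>"
+ //   // schema.Json is what Scala's DataType.fromJson parses (see SQLUtils.createSchema below)
+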
+ + [Serializable] + public class TimestampType : AtomicType { } + + [Serializable] + public class DoubleType : AtomicType { } + + [Serializable] + public class FloatType : AtomicType { } + + [Serializable] + public class ByteType : AtomicType { } + + [Serializable] + public class IntegerType : AtomicType { } + + [Serializable] + public class LongType : AtomicType { } + + [Serializable] + public class ShortType : AtomicType { } + + [Serializable] + public class DecimalType : AtomicType + { + public static Regex FixedDecimal = new Regex(@"decimal\((\d+),\s(\d+)\)"); + private int? precision, scale; + public DecimalType(int? precision = null, int? scale = null) + { + this.precision = precision; + this.scale = scale; + } + + internal override object JsonValue + { + get { throw new NotImplementedException(); } + } + + public DataType FromJson(JObject json) + { + throw new NotImplementedException(); + } + } + + [Serializable] + public class ArrayType : ComplexType + { + public DataType ElementType { get { return elementType; } } + public bool ContainsNull { get { return containsNull; } } + + public ArrayType(DataType elementType, bool containsNull = true) + { + this.elementType = elementType; + this.containsNull = containsNull; + } + + internal ArrayType(JObject json) + { + FromJson(json); + } + + public override string SimpleString + { + get { return string.Format("array<{0}>", elementType.SimpleString); } + } + + internal override object JsonValue + { + get + { + return new JObject( + new JProperty("type", TypeName), + new JProperty("elementType", elementType.JsonValue), + new JProperty("containsNull", containsNull)); + } + } + + public override sealed DataType FromJson(JObject json) + { + elementType = ParseDataTypeFromJson(json["elementType"]); + containsNull = (bool)json["containsNull"]; + return this; + } + + private DataType elementType; + private bool containsNull; + } + + [Serializable] + public class MapType : ComplexType + { + internal override object JsonValue + { + get { throw new NotImplementedException(); } + } + + public override DataType FromJson(JObject json) + { + throw new NotImplementedException(); + } + } + + [Serializable] + public class StructField : ComplexType + { + public string Name { get { return name; } } + public DataType DataType { get { return dataType; } } + public bool Nullable { get { return nullable; } } + public JObject Metadata { get { return metadata; } } + + public StructField(string name, DataType dataType, bool nullable = true, JObject metadata = null) + { + this.name = name; + this.dataType = dataType; + this.nullable = nullable; + this.metadata = metadata ?? 
new JObject(); + } + + internal StructField(JObject json) + { + FromJson(json); + } + + public override string SimpleString { get { return string.Format(@"{0}:{1}", name, dataType.SimpleString); } } + + internal override object JsonValue + { + get + { + return new JObject( + new JProperty("name", name), + new JProperty("type", dataType.JsonValue), + new JProperty("nullable", nullable), + new JProperty("metadata", metadata)); + } + } + + public override sealed DataType FromJson(JObject json) + { + name = json["name"].ToString(); + dataType = ParseDataTypeFromJson(json); + nullable = (bool)json["nullable"]; + metadata = (JObject)json["metadata"]; + return this; + } + + private string name; + private DataType dataType; + private bool nullable; + [NonSerialized] + private JObject metadata; + } + + [Serializable] + public class StructType : ComplexType + { + public List Fields { get { return fields; } } + + internal IStructTypeProxy StructTypeProxy + { + get + { + return structTypeProxy ?? + new StructTypeIpcProxy( + new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createSchema", + new object[] { Json }).ToString())); + } + } + + public StructType(IEnumerable fields) + { + this.fields = fields.ToList(); + } + + internal StructType(JObject json) + { + FromJson(json); + } + + internal StructType(IStructTypeProxy structTypeProxy) + { + this.structTypeProxy = structTypeProxy; + var jsonSchema = (structTypeProxy as StructTypeIpcProxy).ToJson(); + FromJson(jsonSchema); + } + + public override string SimpleString + { + get { return string.Format(@"struct<{0}>", string.Join(",", fields.Select(f => f.SimpleString))); } + } + + internal override object JsonValue + { + get + { + return new JObject( + new JProperty("type", TypeName), + new JProperty("fields", fields.Select(f => f.JsonValue).ToArray())); + } + } + + public override sealed DataType FromJson(JObject json) + { + var fieldsJObjects = json["fields"].Select(f => (JObject)f); + fields = fieldsJObjects.Select(fieldJObject => (new StructField(fieldJObject))).ToList(); + return this; + } + + [NonSerialized] + private readonly IStructTypeProxy structTypeProxy; + + private List fields; + } + +} diff --git a/csharp/AdapterTest/DataFrameTest.cs b/csharp/AdapterTest/DataFrameTest.cs index d1d5e6e..10e3959 100644 --- a/csharp/AdapterTest/DataFrameTest.cs +++ b/csharp/AdapterTest/DataFrameTest.cs @@ -381,65 +381,65 @@ namespace AdapterTest Assert.AreEqual(expectedResultDataFrameProxy, actualResultDataFrame.DataFrameProxy); } - [Test] - public void TestRdd() - { - const string jsonSchema = @" - { - ""type"" : ""struct"", - ""fields"" : [{ - ""name"" : ""age"", - ""type"" : ""long"", - ""nullable"" : true, - ""metadata"" : { } - }, { - ""name"" : ""id"", - ""type"" : ""string"", - ""nullable"" : true, - ""metadata"" : { } - }, { - ""name"" : ""name"", - ""type"" : ""string"", - ""nullable"" : true, - ""metadata"" : { } - } ] - }"; +// [Test] +// public void TestRdd() +// { +// const string jsonSchema = @" +// { +// ""type"" : ""struct"", +// ""fields"" : [{ +// ""name"" : ""age"", +// ""type"" : ""long"", +// ""nullable"" : true, +// ""metadata"" : { } +// }, { +// ""name"" : ""id"", +// ""type"" : ""string"", +// ""nullable"" : true, +// ""metadata"" : { } +// }, { +// ""name"" : ""name"", +// ""type"" : ""string"", +// ""nullable"" : true, +// ""metadata"" : { } +// } ] +// }"; - Mock mockStructTypeProxy = new Mock(); - mockStructTypeProxy.Setup(m => m.ToJson()).Returns(jsonSchema); - 
mockDataFrameProxy.Setup(m => m.GetSchema()).Returns(mockStructTypeProxy.Object); +// Mock mockStructTypeProxy = new Mock(); +// mockStructTypeProxy.Setup(m => m.ToJson()).Returns(jsonSchema); +// mockDataFrameProxy.Setup(m => m.GetSchema()).Returns(mockStructTypeProxy.Object); - var rows = new object[] - { - new RowImpl(new object[] - { - 34, - "123", - "Bill" - }, - RowSchema.ParseRowSchemaFromJson(jsonSchema)) - }; +// var rows = new object[] +// { +// new RowImpl(new object[] +// { +// 34, +// "123", +// "Bill" +// }, +// RowSchema.ParseRowSchemaFromJson(jsonSchema)) +// }; - mockDataFrameProxy.Setup(m => m.JavaToCSharp()).Returns(new MockRddProxy(rows)); +// mockDataFrameProxy.Setup(m => m.JavaToCSharp()).Returns(new MockRddProxy(rows)); - var sc = new SparkContext(null); - var dataFrame = new DataFrame(mockDataFrameProxy.Object, sc); +// var sc = new SparkContext(null); +// var dataFrame = new DataFrame(mockDataFrameProxy.Object, sc); - // act - var rdd = dataFrame.Rdd; +// // act +// var rdd = dataFrame.Rdd; - Assert.IsNotNull(rdd); - mockDataFrameProxy.Verify(m => m.JavaToCSharp(), Times.Once); - mockStructTypeProxy.Verify(m => m.ToJson(), Times.Once); +// Assert.IsNotNull(rdd); +// mockDataFrameProxy.Verify(m => m.JavaToCSharp(), Times.Once); +// mockStructTypeProxy.Verify(m => m.ToJson(), Times.Once); - mockDataFrameProxy.Reset(); - mockStructTypeProxy.Reset(); +// mockDataFrameProxy.Reset(); +// mockStructTypeProxy.Reset(); - rdd = dataFrame.Rdd; - Assert.IsNotNull(rdd); - mockDataFrameProxy.Verify(m => m.JavaToCSharp(), Times.Never); - mockStructTypeProxy.Verify(m => m.ToJson(), Times.Never); - } +// rdd = dataFrame.Rdd; +// Assert.IsNotNull(rdd); +// mockDataFrameProxy.Verify(m => m.JavaToCSharp(), Times.Never); +// mockStructTypeProxy.Verify(m => m.ToJson(), Times.Never); +// } [Test] public void TestIsLocal() diff --git a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs index 2413819..cc39c13 100644 --- a/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSparkCLRProxy.cs @@ -44,17 +44,6 @@ namespace AdapterTest.Mocks return new MockSparkContextProxy(conf); } - - public IStructFieldProxy CreateStructField(string name, string dataType, bool isNullable) - { - throw new NotImplementedException(); - } - - public IStructTypeProxy CreateStructType(List fields) - { - throw new NotImplementedException(); - } - public ISparkContextProxy SparkContextProxy { get { throw new NotImplementedException(); } diff --git a/csharp/AdapterTest/Mocks/MockSqlContextProxy.cs b/csharp/AdapterTest/Mocks/MockSqlContextProxy.cs index 4366410..cb74842 100644 --- a/csharp/AdapterTest/Mocks/MockSqlContextProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSqlContextProxy.cs @@ -27,6 +27,11 @@ namespace AdapterTest.Mocks mockSparkContextProxy = scProxy; } + public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy) + { + throw new NotImplementedException(); + } + public IDataFrameProxy ReadDataFrame(string path, StructType schema, System.Collections.Generic.Dictionary options) { throw new NotImplementedException(); diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs index b8ca932..bb61eac 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs @@ -26,6 +26,70 @@ namespace Microsoft.Spark.CSharp.Samples return sqlContext ?? 
(sqlContext = new SqlContext(SparkCLRSamples.SparkContext)); } + /// + /// Sample to create DataFrame + /// + [Sample] + internal static void DFCreateDataFrameSample() + { + var rddPeople = SparkCLRSamples.SparkContext.Parallelize(new List + { + new object[] { "123", "Bill", 43, new object[]{"Columbus", "Ohio"}, new string[]{"Tel1", "Tel2"} }, + new object[] { "456", "Steve", 34, new object[]{"Seattle", "Washington"}, new string[]{"Tel3", "Tel4"} } + }); + + + var schemaPeople = new StructType(new List + { + new StructField("id", new StringType()), + new StructField("name", new StringType()), + new StructField("age", new IntegerType()), + new StructField("address", new StructType(new List + { + new StructField("city", new StringType()), + new StructField("state", new StringType()) + })), + new StructField("phone numbers", new ArrayType(new StringType())) + }); + var dataFramePeople = GetSqlContext().CreateDataFrame(rddPeople, schemaPeople); + Console.WriteLine("------ Schema of People Data Frame:\r\n"); + dataFramePeople.ShowSchema(); + Console.WriteLine(); + foreach (var people in dataFramePeople.Collect()) + { + string id = people.Get("id"); + string name = people.Get("name"); + int age = people.Get("age"); + Row address = people.Get("address"); + string city = address.Get("city"); + string state = address.Get("state"); + object[] phoneNumbers = people.Get("phone numbers"); + Console.WriteLine("id:{0}, name:{1}, age:{2}, address:(city:{3},state:{4}), phoneNumbers:[{5},{6}]\r\n", id, name, age, city, state, phoneNumbers[0], phoneNumbers[1]); + } + + var rddRequestsLog = SparkCLRSamples.SparkContext.TextFile(SparkCLRSamples.Configuration.GetInputDataPath(RequestsLog), 1).Map(r => r.Split(',').Select(s => (object)s).ToArray()); + + var schemaRequestsLog = new StructType(new List + { + new StructField("guid", new StringType(), false), + new StructField("datacenter", new StringType(), false), + new StructField("abtestid", new StringType(), false), + new StructField("traffictype", new StringType(), false), + }); + var dataFrameRequestsLog = GetSqlContext().CreateDataFrame(rddRequestsLog, schemaRequestsLog); + Console.WriteLine("------ Schema of RequestsLog Data Frame:"); + dataFrameRequestsLog.ShowSchema(); + Console.WriteLine(); + foreach (var request in dataFrameRequestsLog.Collect()) + { + string guid = request.Get("guid"); + string datacenter = request.Get("datacenter"); + string abtestid = request.Get("abtestid"); + string traffictype = request.Get("traffictype"); + Console.WriteLine("guid:{0}, datacenter:{1}, abtestid:{2}, traffictype:{3}\r\n", guid, datacenter, abtestid, traffictype); + } + } + /// /// Sample to show schema of DataFrame /// @@ -41,10 +105,10 @@ namespace Microsoft.Spark.CSharp.Samples /// Sample to get schema of DataFrame in json format /// [Sample] - internal static void DFGetSchemaToJsonSample() + internal static void DFGetSchemaJsonSample() { var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson)); - string json = peopleDataFrame.Schema.ToJson(); + string json = peopleDataFrame.Schema.Json; Console.WriteLine("schema in json format: {0}", json); } @@ -55,7 +119,7 @@ namespace Microsoft.Spark.CSharp.Samples internal static void DFCollectSample() { var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson)); - IEnumerable rows = peopleDataFrame.Collect(); + var rows = peopleDataFrame.Collect().ToArray(); Console.WriteLine("peopleDataFrame:"); int i = 0; @@ -121,15 +185,13 
@@ namespace Microsoft.Spark.CSharp.Samples [Sample] internal static void DFTextFileLoadDataFrameSample() { - var requestsSchema = StructType.CreateStructType( - new List - { - StructField.CreateStructField("guid", "string", false), - StructField.CreateStructField("datacenter", "string", false), - StructField.CreateStructField("abtestid", "string", false), - StructField.CreateStructField("traffictype", "string", false), - } - ); + var requestsSchema = new StructType(new List + { + new StructField("guid", new StringType(), false), + new StructField("datacenter", new StringType(), false), + new StructField("abtestid", new StringType(), false), + new StructField("traffictype", new StringType(), false), + }); var requestsDateFrame = GetSqlContext().TextFile(SparkCLRSamples.Configuration.GetInputDataPath(RequestsLog), requestsSchema); requestsDateFrame.RegisterTempTable("requests"); @@ -153,18 +215,17 @@ namespace Microsoft.Spark.CSharp.Samples private static DataFrame GetMetricsDataFrame() { - var metricsSchema = StructType.CreateStructType( - new List + var metricsSchema = new StructType( + new[] { - StructField.CreateStructField("unknown", "string", false), - StructField.CreateStructField("date", "string", false), - StructField.CreateStructField("time", "string", false), - StructField.CreateStructField("guid", "string", false), - StructField.CreateStructField("lang", "string", false), - StructField.CreateStructField("country", "string", false), - StructField.CreateStructField("latency", "integer", false) - } - ); + new StructField("unknown", new StringType(), false), + new StructField("date", new StringType(), false), + new StructField("time", new StringType(), false), + new StructField("guid", new StringType(), false), + new StructField("lang", new StringType(), false), + new StructField("country", new StringType(), false), + new StructField("latency", new StringType(), false), + }); return GetSqlContext() @@ -235,9 +296,9 @@ namespace Microsoft.Spark.CSharp.Samples { var name = peopleDataFrameSchemaField.Name; var dataType = peopleDataFrameSchemaField.DataType; - var stringVal = dataType.ToString(); - var simpleStringVal = dataType.SimpleString(); - var isNullable = peopleDataFrameSchemaField.IsNullable; + var stringVal = dataType.TypeName; + var simpleStringVal = dataType.SimpleString; + var isNullable = peopleDataFrameSchemaField.Nullable; Console.WriteLine("Name={0}, DT.string={1}, DT.simplestring={2}, DT.isNullable={3}", name, stringVal, simpleStringVal, isNullable); } } @@ -387,7 +448,7 @@ namespace Microsoft.Spark.CSharp.Samples var singleValueReplaced = peopleDataFrame.Replace("Bill", "Bill.G"); singleValueReplaced.Show(); - + var multiValueReplaced = peopleDataFrame.ReplaceAll(new List { 14, 34 }, new List { 44, 54 }); multiValueReplaced.Show(); @@ -852,7 +913,7 @@ namespace Microsoft.Spark.CSharp.Samples Console.WriteLine("peopleDataFrame:"); var count = 0; - RowSchema schema = null; + StructType schema = null; Row firstRow = null; foreach (var row in rows) { @@ -938,43 +999,43 @@ namespace Microsoft.Spark.CSharp.Samples /// Verify the schema of people dataframe. 
/// /// RowSchema of people DataFrame - internal static void VerifySchemaOfPeopleDataFrame(RowSchema schema) + internal static void VerifySchemaOfPeopleDataFrame(StructType schema) { Assert.IsNotNull(schema); - Assert.AreEqual("struct", schema.type); - Assert.IsNotNull(schema.columns); - Assert.AreEqual(4, schema.columns.Count); + Assert.AreEqual("struct", schema.TypeName); + Assert.IsNotNull(schema.Fields); + Assert.AreEqual(4, schema.Fields.Count); // name - var nameColSchema = schema.columns.Find(c => c.name.Equals("name")); + var nameColSchema = schema.Fields.Find(c => c.Name.Equals("name")); Assert.IsNotNull(nameColSchema); - Assert.AreEqual("name", nameColSchema.name); - Assert.IsTrue(nameColSchema.nullable); - Assert.AreEqual("string", nameColSchema.type.ToString()); + Assert.AreEqual("name", nameColSchema.Name); + Assert.IsTrue(nameColSchema.Nullable); + Assert.AreEqual("string", nameColSchema.DataType.TypeName); // id - var idColSchema = schema.columns.Find(c => c.name.Equals("id")); + var idColSchema = schema.Fields.Find(c => c.Name.Equals("id")); Assert.IsNotNull(idColSchema); - Assert.AreEqual("id", idColSchema.name); - Assert.IsTrue(idColSchema.nullable); - Assert.AreEqual("string", nameColSchema.type.ToString()); + Assert.AreEqual("id", idColSchema.Name); + Assert.IsTrue(idColSchema.Nullable); + Assert.AreEqual("string", nameColSchema.DataType.TypeName); // age - var ageColSchema = schema.columns.Find(c => c.name.Equals("age")); + var ageColSchema = schema.Fields.Find(c => c.Name.Equals("age")); Assert.IsNotNull(ageColSchema); - Assert.AreEqual("age", ageColSchema.name); - Assert.IsTrue(ageColSchema.nullable); - Assert.AreEqual("long", ageColSchema.type.ToString()); + Assert.AreEqual("age", ageColSchema.Name); + Assert.IsTrue(ageColSchema.Nullable); + Assert.AreEqual("long", ageColSchema.DataType.TypeName); // address - var addressColSchema = schema.columns.Find(c => c.name.Equals("address")); + var addressColSchema = schema.Fields.Find(c => c.Name.Equals("address")); Assert.IsNotNull(addressColSchema); - Assert.AreEqual("address", addressColSchema.name); - Assert.IsTrue(addressColSchema.nullable); - Assert.IsNotNull(addressColSchema.type); - Assert.AreEqual("struct", addressColSchema.type.type); - Assert.IsNotNull(addressColSchema.type.columns.Find(c => c.name.Equals("state"))); - Assert.IsNotNull(addressColSchema.type.columns.Find(c => c.name.Equals("city"))); + Assert.AreEqual("address", addressColSchema.Name); + Assert.IsTrue(addressColSchema.Nullable); + Assert.IsNotNull(addressColSchema.DataType); + Assert.AreEqual("struct", addressColSchema.DataType.TypeName); + Assert.IsNotNull(((StructType)addressColSchema.DataType).Fields.Find(c => c.Name.Equals("state"))); + Assert.IsNotNull(((StructType)addressColSchema.DataType).Fields.Find(c => c.Name.Equals("city"))); } /// @@ -1032,7 +1093,7 @@ namespace Microsoft.Spark.CSharp.Samples internal static void DFRddSample() { var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson)); - peopleDataFrame.Show(); + //peopleDataFrame.Show(); var dfCount = peopleDataFrame.Count(); var peopleRdd = peopleDataFrame.Rdd; @@ -1100,25 +1161,25 @@ namespace Microsoft.Spark.CSharp.Samples if (x == null && y == null) return true; if (x == null && y != null || x != null && y == null) return false; - foreach (var col in x.GetSchema().columns) + foreach (var col in x.GetSchema().Fields) { - if (!y.GetSchema().columns.Any(c => c.ToString() == col.ToString())) return false; + if 
(!y.GetSchema().Fields.Any(c => c.Name == col.Name)) return false; - if (col.type.columns.Any()) + if (col.DataType is StructType) { - if (!IsRowEqual(x.GetAs(col.name), y.GetAs(col.name), columnsComparer)) return false; + if (!IsRowEqual(x.GetAs(col.Name), y.GetAs(col.Name), columnsComparer)) return false; } - else if (x.Get(col.name) == null && y.Get(col.name) != null || x.Get(col.name) != null && y.Get(col.name) == null) return false; - else if (x.Get(col.name) != null && y.Get(col.name) != null) + else if (x.Get(col.Name) == null && y.Get(col.Name) != null || x.Get(col.Name) != null && y.Get(col.Name) == null) return false; + else if (x.Get(col.Name) != null && y.Get(col.Name) != null) { Func colComparer; - if (columnsComparer != null && columnsComparer.TryGetValue(col.name, out colComparer)) + if (columnsComparer != null && columnsComparer.TryGetValue(col.Name, out colComparer)) { - if (!colComparer(x.Get(col.name), y.Get(col.name))) return false; + if (!colComparer(x.Get(col.Name), y.Get(col.Name))) return false; } else { - if (x.Get(col.name).ToString() != y.Get(col.name).ToString()) return false; + if (x.Get(col.Name).ToString() != y.Get(col.Name).ToString()) return false; } } } diff --git a/csharp/Samples/Microsoft.Spark.CSharp/Samples.csproj b/csharp/Samples/Microsoft.Spark.CSharp/Samples.csproj index e867663..3fec50d 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/Samples.csproj +++ b/csharp/Samples/Microsoft.Spark.CSharp/Samples.csproj @@ -33,6 +33,10 @@ 4 + + ..\..\packages\Newtonsoft.Json.7.0.1\lib\net45\Newtonsoft.Json.dll + True + ..\..\packages\NUnit.3.0.1\lib\net45\nunit.framework.dll True diff --git a/csharp/Samples/Microsoft.Spark.CSharp/packages.config b/csharp/Samples/Microsoft.Spark.CSharp/packages.config index b183023..4abe7e9 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/packages.config +++ b/csharp/Samples/Microsoft.Spark.CSharp/packages.config @@ -1,4 +1,5 @@  + \ No newline at end of file diff --git a/scala/src/main/org/apache/spark/sql/api/csharp/SQLUtils.scala b/scala/src/main/org/apache/spark/sql/api/csharp/SQLUtils.scala index 6d9a426..74bfe1e 100644 --- a/scala/src/main/org/apache/spark/sql/api/csharp/SQLUtils.scala +++ b/scala/src/main/org/apache/spark/sql/api/csharp/SQLUtils.scala @@ -3,14 +3,16 @@ package org.apache.spark.sql.api.csharp -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.io.{ByteArrayOutputStream, DataOutputStream} import org.apache.spark.SparkContext import org.apache.spark.api.csharp.SerDe import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.api.python.SerDeUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{DataType, FloatType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, FloatType, StructType} import org.apache.spark.sql._ +import java.util.{ArrayList => JArrayList} /** * Utility functions for DataFrame in SparkCLR @@ -31,10 +33,6 @@ object SQLUtils { arr.toSeq } - def createStructType(fields : Seq[StructField]): StructType = { - StructType(fields) - } - def getSQLDataType(dataType: String): DataType = { dataType match { case "byte" => org.apache.spark.sql.types.ByteType @@ -54,17 +52,6 @@ object SQLUtils { } } - def createStructField(name: String, dataType: String, nullable: Boolean): StructField = { - val dtObj = getSQLDataType(dataType) - StructField(name, dtObj, nullable) - } - - def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = { - val num = 
schema.fields.size - val rowRDD = rdd.map(bytesToRow(_, schema)) - sqlContext.createDataFrame(rowRDD, schema) - } - def dfToRowRDD(df: DataFrame): RDD[Array[Byte]] = { df.map(r => rowToCSharpBytes(r)) } @@ -77,14 +64,6 @@ object SQLUtils { } } - private[this] def bytesToRow(bytes: Array[Byte], schema: StructType): Row = { - val bis = new ByteArrayInputStream(bytes) - val dis = new DataInputStream(bis) - val num = SerDe.readInt(dis) - Row.fromSeq((0 until num).map { i => - doConversion(SerDe.readObject(dis), schema.fields(i).dataType) - }.toSeq) - } private[this] def rowToCSharpBytes(row: Row): Array[Byte] = { val bos = new ByteArrayOutputStream() @@ -176,8 +155,11 @@ object SQLUtils { dfReader.load(path) } - def loadTextFile(sqlContext: SQLContext, path: String, delimiter: String, schema: StructType) : DataFrame = { + def loadTextFile(sqlContext: SQLContext, path: String, delimiter: String, schemaJson: String) : DataFrame = { val stringRdd = sqlContext.sparkContext.textFile(path) + + val schema = createSchema(schemaJson) + val rowRdd = stringRdd.map{s => val columns = s.split(delimiter) columns.length match { @@ -217,4 +199,21 @@ object SQLUtils { sqlContext.createDataFrame(rowRdd, schema) } + + def createSchema(schemaJson: String) : StructType = { + DataType.fromJson(schemaJson).asInstanceOf[StructType] + } + + def byteArrayRDDToAnyArrayRDD(jrdd: JavaRDD[Array[Byte]]) : RDD[Array[_ >: AnyRef]] = { + // JavaRDD[Array[Byte]] -> JavaRDD[Any] + val jrddAny = SerDeUtil.pythonToJava(jrdd, true) + + // JavaRDD[Any] -> RDD[Array[_]] + jrddAny.rdd.map { + case objs: JArrayList[_] => + objs.toArray + case obj if obj.getClass.isArray => + obj.asInstanceOf[Array[_]].toArray + } + } } From e30ecfe23aff2d03aa2f722ac53d0f6cfa77ecb5 Mon Sep 17 00:00:00 2001 From: guwang Date: Thu, 17 Dec 2015 23:11:22 +0800 Subject: [PATCH 2/5] [Spark 1.5.2] Refactor DataFrame schema and create data type classes, add CreateDataFrame of SqlContext --- .../Microsoft.Spark.CSharp/Adapter.csproj | 4 + .../Core/IRDDCollector.cs | 16 ++ .../Microsoft.Spark.CSharp/Core/RDD.cs | 60 +---- .../Core/RDDCollector.cs | 62 +++++ .../Interop/Ipc/SerDe.cs | 3 + .../Microsoft.Spark.CSharp/Proxy/IRDDProxy.cs | 1 + .../Proxy/ISparkContextProxy.cs | 2 +- .../Proxy/Ipc/DataFrameIpcProxy.cs | 2 +- .../Proxy/Ipc/RDDIpcProxy.cs | 11 + .../Proxy/Ipc/SparkContextIpcProxy.cs | 4 +- .../Microsoft.Spark.CSharp/Sql/DataFrame.cs | 37 +-- .../Microsoft.Spark.CSharp/Sql/PythonSerDe.cs | 32 +++ .../Adapter/Microsoft.Spark.CSharp/Sql/Row.cs | 15 +- .../Sql/RowConstructor.cs | 95 +++++++ .../Microsoft.Spark.CSharp/Sql/Types.cs | 39 ++- .../DataFrameSamples.cs | 15 +- .../Worker/Microsoft.Spark.CSharp/Worker.cs | 237 ++++++++++++++---- scala/pom.xml | 4 +- 18 files changed, 497 insertions(+), 142 deletions(-) create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Core/IRDDCollector.cs create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Core/RDDCollector.cs create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Sql/PythonSerDe.cs create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Sql/RowConstructor.cs diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj index 500f36a..8a2cf1d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj @@ -64,11 +64,13 @@ + + @@ -106,8 +108,10 @@ + + diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Core/IRDDCollector.cs 
b/csharp/Adapter/Microsoft.Spark.CSharp/Core/IRDDCollector.cs new file mode 100644 index 0000000..55ec480 --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Core/IRDDCollector.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; + +namespace Microsoft.Spark.CSharp.Core +{ + /// + /// Interface for collect operation on RDD + /// + interface IRDDCollector + { + IEnumerable Collect(int port, SerializedMode serializedMode, Type type); + } +} \ No newline at end of file diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs index 1b85924..59bf42e 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDD.cs @@ -5,7 +5,6 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; -using System.Threading.Tasks; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; using System.Reflection; @@ -13,7 +12,9 @@ using System.IO; using System.Net.Sockets; using Microsoft.Spark.CSharp.Proxy; using Microsoft.Spark.CSharp.Interop.Ipc; +using Microsoft.Spark.CSharp.Sql; using Razorvine.Pickle; +using Razorvine.Pickle.Objects; namespace Microsoft.Spark.CSharp.Core { @@ -276,7 +277,7 @@ namespace Microsoft.Spark.CSharp.Core /// public RDD Distinct(int numPartitions = 0) { - return Map(x => new KeyValuePair(x, 0)).ReduceByKey((x,y) => x, numPartitions).Map(x => x.Key); + return Map(x => new KeyValuePair(x, 0)).ReduceByKey((x, y) => x, numPartitions).Map(x => x.Key); } /// @@ -410,7 +411,7 @@ namespace Microsoft.Spark.CSharp.Core else { const double delta = 0.00005; - var gamma = - Math.Log(delta) / total; + var gamma = -Math.Log(delta) / total; return Math.Min(1, fraction + gamma + Math.Sqrt(gamma * gamma + 2 * gamma * fraction)); } } @@ -569,45 +570,10 @@ namespace Microsoft.Spark.CSharp.Core int port = RddProxy.CollectAndServe(); return Collect(port).Cast().ToArray(); } - + internal IEnumerable Collect(int port) { - IFormatter formatter = new BinaryFormatter(); - Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); - sock.Connect("127.0.0.1", port); - - using (NetworkStream s = new NetworkStream(sock)) - { - byte[] buffer; - while ((buffer = SerDe.ReadBytes(s)) != null && buffer.Length > 0) - { - if (serializedMode == SerializedMode.Byte) - { - MemoryStream ms = new MemoryStream(buffer); - yield return formatter.Deserialize(ms); - } - else if (serializedMode == SerializedMode.String) - { - yield return Encoding.UTF8.GetString(buffer); - } - else if (serializedMode == SerializedMode.Pair) - { - MemoryStream ms = new MemoryStream(buffer); - MemoryStream ms2 = new MemoryStream(SerDe.ReadBytes(s)); - - ConstructorInfo ci = typeof(T).GetConstructors()[0]; - yield return ci.Invoke(new object[] { formatter.Deserialize(ms), formatter.Deserialize(ms2) }); - } - else if (serializedMode == SerializedMode.Row) - { - Unpickler unpickler = new Unpickler(); - foreach (var item in (unpickler.loads(buffer) as object[])) - { - yield return item; - } - } - } - } + return RddProxy.RDDCollector.Collect(port, serializedMode, typeof(T)); } /// @@ -836,7 +802,7 @@ namespace Microsoft.Spark.CSharp.Core int left = num - items.Count; IEnumerable partitions = Enumerable.Range(partsScanned, Math.Min(numPartsToTry, totalParts - partsScanned)); 
var mappedRDD = MapPartitions(new TakeHelper(left).Execute); - int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, partitions, true); + int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, partitions); IEnumerable res = Collect(port).Cast(); items.AddRange(res); @@ -890,7 +856,7 @@ namespace Microsoft.Spark.CSharp.Core { return Map>(v => new KeyValuePair(v, default(T))).SubtractByKey ( - other.Map>(v => new KeyValuePair(v, default(T))), + other.Map>(v => new KeyValuePair(v, default(T))), numPartitions ) .Keys(); @@ -993,7 +959,7 @@ namespace Microsoft.Spark.CSharp.Core int[] starts = new int[num]; if (num > 1) { - var nums = MapPartitions(iter => new [] {iter.Count()}).Collect(); + var nums = MapPartitions(iter => new[] { iter.Count() }).Collect(); for (int i = 0; i < nums.Length - 1; i++) starts[i + 1] = starts[i] + nums[i]; } @@ -1067,10 +1033,10 @@ namespace Microsoft.Spark.CSharp.Core /// public IEnumerable ToLocalIterator() { - foreach(int partition in Enumerable.Range(0, GetNumPartitions())) + foreach (int partition in Enumerable.Range(0, GetNumPartitions())) { var mappedRDD = MapPartitions(iter => iter); - int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, Enumerable.Range(partition, 1), true); + int port = sparkContext.SparkContextProxy.RunJob(mappedRDD.RddProxy, Enumerable.Range(partition, 1)); foreach (T row in Collect(port)) yield return row; } @@ -1360,7 +1326,7 @@ namespace Microsoft.Spark.CSharp.Core internal KeyValuePair Execute(T input) { - return new KeyValuePair(func(input), input); + return new KeyValuePair(func(input), input); } } [Serializable] @@ -1407,7 +1373,7 @@ namespace Microsoft.Spark.CSharp.Core else if (y.Value) return x; else - return new KeyValuePair(func(x.Key, y.Key), false); + return new KeyValuePair(func(x.Key, y.Key), false); } } [Serializable] diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDDCollector.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDDCollector.cs new file mode 100644 index 0000000..29b93a3 --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Core/RDDCollector.cs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. 
+ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Sockets; +using System.Reflection; +using System.Runtime.Serialization; +using System.Runtime.Serialization.Formatters.Binary; +using System.Text; +using Microsoft.Spark.CSharp.Interop.Ipc; +using Microsoft.Spark.CSharp.Sql; + +namespace Microsoft.Spark.CSharp.Core +{ + /// + /// Used for collect operation on RDD + /// + class RDDCollector : IRDDCollector + { + public IEnumerable Collect(int port, SerializedMode serializedMode, Type type) + { + IFormatter formatter = new BinaryFormatter(); + Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Connect("127.0.0.1", port); + + using (NetworkStream s = new NetworkStream(sock)) + { + byte[] buffer; + while ((buffer = SerDe.ReadBytes(s)) != null && buffer.Length > 0) + { + if (serializedMode == SerializedMode.Byte) + { + MemoryStream ms = new MemoryStream(buffer); + yield return formatter.Deserialize(ms); + } + else if (serializedMode == SerializedMode.String) + { + yield return Encoding.UTF8.GetString(buffer); + } + else if (serializedMode == SerializedMode.Pair) + { + MemoryStream ms = new MemoryStream(buffer); + MemoryStream ms2 = new MemoryStream(SerDe.ReadBytes(s)); + + ConstructorInfo ci = type.GetConstructors()[0]; + yield return ci.Invoke(new object[] { formatter.Deserialize(ms), formatter.Deserialize(ms2) }); + } + else if (serializedMode == SerializedMode.Row) + { + var unpickledObjects = PythonSerDe.GetUnpickledObjects(buffer); + foreach (var item in unpickledObjects) + { + yield return (item as RowConstructor).GetRow(); + } + } + } + } + } + } +} diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs index 42f47c1..be413bc 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs @@ -29,6 +29,9 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc /// public class SerDe //TODO - add ToBytes() for other types { + public static long totalReadNum = 0; + public static long totalWriteNum = 0; + public static byte[] ToBytes(bool value) { return new[] { System.Convert.ToByte(value) }; diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IRDDProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IRDDProxy.cs index 40db645..a5e85f2 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IRDDProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/IRDDProxy.cs @@ -12,6 +12,7 @@ namespace Microsoft.Spark.CSharp.Proxy { internal interface IRDDProxy { + IRDDCollector RDDCollector { get; set; } StorageLevel GetStorageLevel(); void Cache(); void Persist(StorageLevelType storageLevelType); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs index 0ccd3cb..cfd412f 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkContextProxy.cs @@ -48,7 +48,7 @@ namespace Microsoft.Spark.CSharp.Proxy void CancelJobGroup(string groupId); void CancelAllJobs(); IStatusTrackerProxy StatusTracker { get; } - int RunJob(IRDDProxy rdd, IEnumerable partitions, bool allowLocal); + int RunJob(IRDDProxy rdd, IEnumerable partitions); IBroadcastProxy ReadBroadcastFromFile(string path, out long broadcastId); IRDDProxy CreateCSharpRdd(IRDDProxy prefvJavaRddReference, byte[] command, Dictionary environmentVariables, 
List pythonIncludes, bool preservePartitioning, List broadcastVariables, List accumulator); IRDDProxy CreatePairwiseRDD(IRDDProxy javaReferenceInByteArrayRdd, int numPartitions); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameIpcProxy.cs index 7ed0178..0a25cc6 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/DataFrameIpcProxy.cs @@ -76,7 +76,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod( jvmDataFrameReference, "showString", - new object[] { numberOfRows /*, truncate*/ }).ToString(); //1.4.1 does not support second param + new object[] { numberOfRows, truncate }).ToString(); } public bool IsLocal() diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/RDDIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/RDDIpcProxy.cs index 8f0d1b4..084a14d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/RDDIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/RDDIpcProxy.cs @@ -23,6 +23,17 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc get { return jvmRddReference; } } + private IRDDCollector rddCollector; + public IRDDCollector RDDCollector + { + get { return rddCollector ?? (rddCollector = new RDDCollector()); } + + set + { + rddCollector = value; + } + } + public string Name { get diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs index 51f05e0..ae68d94 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkContextIpcProxy.cs @@ -306,10 +306,10 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc } - public int RunJob(IRDDProxy rdd, IEnumerable partitions, bool allowLocal) + public int RunJob(IRDDProxy rdd, IEnumerable partitions) { var jpartitions = GetJavaList(partitions); - return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "runJob", new object[] { jvmSparkContextReference, (rdd as RDDIpcProxy).JvmRddReference, jpartitions, allowLocal }).ToString()); + return int.Parse(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.api.python.PythonRDD", "runJob", new object[] { jvmSparkContextReference, (rdd as RDDIpcProxy).JvmRddReference, jpartitions }).ToString()); } public IBroadcastProxy ReadBroadcastFromFile(string path, out long broadcastId) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs index 082cd05..d9c533b 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs @@ -28,29 +28,40 @@ namespace Microsoft.Spark.CSharp.Sql private StructType schema; [NonSerialized] private RDD rdd; + [NonSerialized] + private IRDDProxy rddProxy; [NonSerialized] private bool? 
isLocal; [NonSerialized] - private Random random = new Random(); + private readonly Random random = new Random(); public RDD Rdd { get { - if (schema == null) - { - schema = new StructType(dataFrameProxy.GetSchema()); - } if (rdd == null) { - IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp(); - rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row).Map(item => (Row)new RowImpl(item, schema)); + rddProxy = dataFrameProxy.JavaToCSharp(); + rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); } return rdd; } } + private IRDDProxy RddProxy + { + get + { + if (rddProxy == null) + { + rddProxy = dataFrameProxy.JavaToCSharp(); + rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); + } + return rddProxy; + } + } + public bool IsLocal { get @@ -138,14 +149,8 @@ namespace Microsoft.Spark.CSharp.Sql /// public IEnumerable Collect() { - IRDDProxy rddProxy = dataFrameProxy.JavaToCSharp(); - RDD rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); - - int port = rddProxy.CollectAndServe(); - foreach (var item in rdd.Collect(port)) - { - yield return new RowImpl(item, Schema); - } + int port = RddProxy.CollectAndServe(); + return Rdd.Collect(port).Cast(); } /// @@ -164,7 +169,7 @@ namespace Microsoft.Spark.CSharp.Sql public RDD ToJSON() { var stringRddReference = dataFrameProxy.ToJSON(); - return new RDD(stringRddReference, sparkContext); + return new RDD(stringRddReference, sparkContext, SerializedMode.String); } /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/PythonSerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/PythonSerDe.cs new file mode 100644 index 0000000..ccdcd7f --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/PythonSerDe.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. 
+
+using Razorvine.Pickle;
+using Razorvine.Pickle.Objects;
+
+namespace Microsoft.Spark.CSharp.Sql
+{
+    ///
+    /// Used for SerDe of Python objects
+    ///
+    class PythonSerDe
+    {
+        static PythonSerDe()
+        {
+            //custom picklers used in PySpark implementation
+            Unpickler.registerConstructor("pyspark.sql.types", "_parse_datatype_json_string", new StringConstructor());
+            Unpickler.registerConstructor("pyspark.sql.types", "_create_row_inbound_converter", new RowConstructor());
+        }
+
+        ///
+        /// Unpickles objects from byte[]
+        ///
+        ///
+        ///
+        internal static object[] GetUnpickledObjects(byte[] buffer)
+        {
+            var unpickler = new Unpickler(); //not making any assumptions about the implementation and hence not a class member
+            return (unpickler.loads(buffer) as object[]);
+        }
+    }
+}
diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs
index aeb92f0..e1be0c1 100644
--- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs
@@ -4,9 +4,12 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
-
+using System.Text;
 using Microsoft.Spark.CSharp.Services;
+using Newtonsoft.Json.Linq;
+using Razorvine.Pickle;
 
 namespace Microsoft.Spark.CSharp.Sql
 {
@@ -81,10 +84,19 @@ namespace Microsoft.Spark.CSharp.Sql
     internal class RowImpl : Row
     {
         private readonly StructType schema;
+        public dynamic[] Values { get { return values; } }
         private readonly dynamic[] values;
         private readonly int columnCount;
 
+        public object this[int index]
+        {
+            get
+            {
+                return Get(index);
+            }
+        }
+
         internal RowImpl(dynamic data, StructType schema)
         {
             if (data is dynamic[])
@@ -220,4 +232,5 @@ namespace Microsoft.Spark.CSharp.Sql
             }
         }
     }
+
 }
diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/RowConstructor.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/RowConstructor.cs
new file mode 100644
index 0000000..f453b82
--- /dev/null
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/RowConstructor.cs
@@ -0,0 +1,95 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using Razorvine.Pickle;
+
+namespace Microsoft.Spark.CSharp.Sql
+{
+    ///
+    /// Used by Unpickler to unpickle pickled objects. It is also used to construct a Row (C# representation of pickled objects).
+    /// Note this implementation is not thread-safe. Collect or RDD conversion where unpickling is done is not expected to be multithreaded.
+    ///
+    public class RowConstructor : IObjectConstructor
+    {
+        //construction is done using multiple RowConstructor objects with multiple calls to construct method with first call always
+        //done using schema as the args. Using static variables to store the schema and if it is set (that is if the first call is made)
+        ///
+        /// Schema of the DataFrame currently being processed
+        ///
+        private static string currentSchema;
+
+        ///
+        /// Indicates if Schema is already set during construction of this type
+        ///
+        private static bool isCurrentSchemaSet;
+
+        ///
+        /// Arguments used to construct this type
+        ///
+        internal object[] Values;
+
+        ///
+        /// Schema of the values
+        ///
+        internal string Schema;
+
+        public override string ToString()
+        {
+            return string.Format("{{{0}}}", string.Join(",", Values));
+        }
+
+        ///
+        /// Used by Unpickler - do not use to construct Row. Use GetRow() method
+        ///
+        ///
+        ///
+        public object construct(object[] args)
+        {
+            if (!isCurrentSchemaSet) //first call always includes schema and schema is always in args[0]
+            {
+                currentSchema = args[0].ToString();
+                isCurrentSchemaSet = true;
+            }
+
+            return new RowConstructor { Values = args, Schema = currentSchema };
+        }
+
+        ///
+        /// Used to construct a Row
+        ///
+        ///
+        public Row GetRow()
+        {
+            var schema = DataType.ParseDataTypeFromJson(Schema) as StructType;
+            var row = new RowImpl(GetValues(Values), schema);
+
+            //Resetting schema here so that rows from multiple DataFrames can be processed in the same AppDomain
+            //next row will have schema - so resetting is fine
+            isCurrentSchemaSet = false;
+            currentSchema = null;
+
+            return row;
+        }
+
+        //removes objects of type RowConstructor and replaces them with actual values
+        private object[] GetValues(object[] arguments)
+        {
+            var values = new object[arguments.Length];
+            int i = 0;
+            foreach (var argument in arguments)
+            {
+                if (argument != null && argument.GetType() == typeof(RowConstructor))
+                {
+                    values[i++] = (argument as RowConstructor).Values;
+                }
+                else
+                {
+                    values[i++] = argument;
+                }
+
+            }
+
+            return values;
+        }
+    }
+}
diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs
index a4e9274..03d71aa 100644
--- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs
+++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs
@@ -1,13 +1,16 @@
 using System;
 using System.Collections.Generic;
+using System.IO;
 using System.Linq;
 using System.Reflection;
+using System.Text;
 using System.Text.RegularExpressions;
 using Microsoft.Spark.CSharp.Interop.Ipc;
 using Microsoft.Spark.CSharp.Proxy;
 using Microsoft.Spark.CSharp.Proxy.Ipc;
 using Newtonsoft.Json;
 using Newtonsoft.Json.Linq;
+using Razorvine.Pickle;
 
 namespace Microsoft.Spark.CSharp.Sql
 {
@@ -44,43 +47,33 @@ namespace Microsoft.Spark.CSharp.Sql
         }
     }
 
-        protected static DataType ParseDataTypeFromJson(string json)
+        public static DataType ParseDataTypeFromJson(string json)
         {
             return ParseDataTypeFromJson(JToken.Parse(json));
         }
 
         protected static DataType ParseDataTypeFromJson(JToken json)
         {
-            if (json is JObject)
+            if (json.Type == JTokenType.Object) // {name: address, type: {type: struct,...},...}
             {
                 JToken type;
-                if (((JObject)json).TryGetValue("type", out type))
+                var typeJObject = (JObject)json;
+                if (typeJObject.TryGetValue("type", out type))
                 {
-                    if (type.Type == JTokenType.Object) // {name: address, type: {type: struct,...},...}
+                    Type complexType;
+                    if ((complexType = ComplexTypes.FirstOrDefault(ct => NormalizeTypeName(ct.Name) == type.ToString())) != default(Type))
                     {
-                        var typeJObject = (JObject)type;
-                        if (typeJObject.TryGetValue("type", out type))
-                        {
-                            Type complexType;
-                            if ((complexType = ComplexTypes.FirstOrDefault(ct => NormalizeTypeName(ct.Name) == type.ToString())) != default(Type))
-                            {
-                                return ((ComplexType)Activator.CreateInstance(complexType, BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance
-                                    , null, new object[] { typeJObject }, null)); // create new instance of ComplexType
-                            }
-                            if (type.ToString() == "udt")
-                            {
-                                // TODO
-                            }
-                        }
-                        throw new ArgumentException(string.Format("Could not parse data type: {0}", type));
+                        return ((ComplexType)Activator.CreateInstance(complexType, BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance
+                            , null, new object[] { typeJObject }, null)); // create new instance of ComplexType
                     }
-                    else // {name: age, type: bigint,...} // TODO: validate more
JTokenType other than Object + if (type.ToString() == "udt") { - return ParseAtomicType(type); + // TODO } } + throw new ArgumentException(string.Format("Could not parse data type: {0}", type)); } - else + else // {name: age, type: bigint,...} // TODO: validate more JTokenType other than Object { return ParseAtomicType(json); } @@ -289,7 +282,7 @@ namespace Microsoft.Spark.CSharp.Sql public override sealed DataType FromJson(JObject json) { name = json["name"].ToString(); - dataType = ParseDataTypeFromJson(json); + dataType = ParseDataTypeFromJson(json["type"]); nullable = (bool)json["nullable"]; metadata = (JObject)json["metadata"]; return this; diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs index bb61eac..a7aa75e 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs @@ -32,13 +32,6 @@ namespace Microsoft.Spark.CSharp.Samples [Sample] internal static void DFCreateDataFrameSample() { - var rddPeople = SparkCLRSamples.SparkContext.Parallelize(new List - { - new object[] { "123", "Bill", 43, new object[]{"Columbus", "Ohio"}, new string[]{"Tel1", "Tel2"} }, - new object[] { "456", "Steve", 34, new object[]{"Seattle", "Washington"}, new string[]{"Tel3", "Tel4"} } - }); - - var schemaPeople = new StructType(new List { new StructField("id", new StringType()), @@ -51,6 +44,14 @@ namespace Microsoft.Spark.CSharp.Samples })), new StructField("phone numbers", new ArrayType(new StringType())) }); + + var rddPeople = SparkCLRSamples.SparkContext.Parallelize( + new List + { + new object[] { "123", "Bill", 43, new object[]{ "Columbus", "Ohio" }, new string[]{ "Tel1", "Tel2" } }, + new object[] { "456", "Steve", 34, new object[]{ "Seattle", "Washington" }, new string[]{ "Tel3", "Tel4" } } + }); + var dataFramePeople = GetSqlContext().CreateDataFrame(rddPeople, schemaPeople); Console.WriteLine("------ Schema of People Data Frame:\r\n"); dataFramePeople.ShowSchema(); diff --git a/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs b/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs index ffb0f55..d267ed5 100644 --- a/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs +++ b/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs @@ -6,16 +6,19 @@ using System.Collections; using System.Collections.Generic; using System.IO; using System.Net; +using System.Diagnostics; +using System.Linq; using System.Net.Sockets; using System.Reflection; using System.Runtime.Serialization; using System.Runtime.Serialization.Formatters.Binary; - +using System.Text; using Microsoft.Spark.CSharp.Core; using Microsoft.Spark.CSharp.Interop.Ipc; using Microsoft.Spark.CSharp.Services; - +using Microsoft.Spark.CSharp.Sql; using Razorvine.Pickle; +using Razorvine.Pickle.Objects; namespace Microsoft.Spark.CSharp { @@ -123,6 +126,10 @@ namespace Microsoft.Spark.CSharp if (lengthOCommandByteArray > 0) { + Stopwatch commandProcessWatch = new Stopwatch(); + Stopwatch funcProcessWatch = new Stopwatch(); + commandProcessWatch.Start(); + string deserializerMode = SerDe.ReadString(s); logger.LogInfo("Deserializer mode: " + deserializerMode); @@ -139,11 +146,31 @@ namespace Microsoft.Spark.CSharp DateTime initTime = DateTime.UtcNow; int count = 0; - foreach (var message in func(splitIndex, GetIterator(s, deserializerMode))) + + // here we use low level API because we need to get perf metrics + WorkerInputEnumerator inputEnumerator = new WorkerInputEnumerator(s, deserializerMode); + IEnumerable 
inputEnumerable = Enumerable.Cast(inputEnumerator); + funcProcessWatch.Start(); + IEnumerable outputEnumerable = func(splitIndex, inputEnumerable); + var outputEnumerator = outputEnumerable.GetEnumerator(); + funcProcessWatch.Stop(); + while (true) { - if (object.ReferenceEquals(null, message)) + funcProcessWatch.Start(); + bool hasNext = outputEnumerator.MoveNext(); + funcProcessWatch.Stop(); + if (!hasNext) { - continue; + break; + } + + funcProcessWatch.Start(); + var message = outputEnumerator.Current; + funcProcessWatch.Stop(); + + if (object.ReferenceEquals(null, message)) + { + continue; } byte[] buffer; @@ -200,6 +227,13 @@ namespace Microsoft.Spark.CSharp SerDe.Write(s, 0L); //shuffle.MemoryBytesSpilled SerDe.Write(s, 0L); //shuffle.DiskBytesSpilled + + commandProcessWatch.Stop(); + + // log statistics + inputEnumerator.LogStatistic(); + logger.LogInfo(string.Format("func process time: {0}", funcProcessWatch.ElapsedMilliseconds)); + logger.LogInfo(string.Format("command process time: {0}", commandProcessWatch.ElapsedMilliseconds)); } else { @@ -224,7 +258,7 @@ namespace Microsoft.Spark.CSharp int end = SerDe.ReadInt(s); // check end of stream - if (end == (int)SpecialLengths.END_OF_DATA_SECTION || end == (int)SpecialLengths.END_OF_STREAM) + if (end == (int)SpecialLengths.END_OF_STREAM) { SerDe.Write(s, (int)SpecialLengths.END_OF_STREAM); logger.LogInfo("END_OF_STREAM: " + (int)SpecialLengths.END_OF_STREAM); @@ -237,6 +271,10 @@ namespace Microsoft.Spark.CSharp } s.Flush(); + // log bytes read and write + logger.LogInfo(string.Format("total read bytes: {0}", SerDe.totalReadNum)); + logger.LogInfo(string.Format("total write bytes: {0}", SerDe.totalWriteNum)); + // wait for server to complete, otherwise server gets 'connection reset' exception // Use SerDe.ReadBytes() to detect java side has closed socket properly // ReadBytes() will block until the socket is closed @@ -280,48 +318,163 @@ namespace Microsoft.Spark.CSharp { return (long)(dt - UnixTimeEpoch).TotalMilliseconds; } + } - private static IEnumerable GetIterator(Stream s, string serializedMode) + // Get worker input data from input stream + internal class WorkerInputEnumerator : IEnumerator, IEnumerable + { + private static readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(WorkerInputEnumerator)); + + private Stream inputStream; + private string deserializedMode; + + // cache deserialized object read from input stream + private object[] items = null; + private int pos = 0; + + IFormatter formatter = new BinaryFormatter(); + private Stopwatch watch = new Stopwatch(); + + public WorkerInputEnumerator(Stream inputStream, string deserializedMode) { - logger.LogInfo("Serialized mode in GetIterator: " + serializedMode); - IFormatter formatter = new BinaryFormatter(); - int messageLength; - while ((messageLength = SerDe.ReadInt(s)) != (int)SpecialLengths.END_OF_DATA_SECTION) + this.inputStream = inputStream; + this.deserializedMode = deserializedMode; + } + + public bool MoveNext() + { + watch.Start(); + bool hasNext; + + if ((items != null) && (pos < items.Length)) { - if (messageLength > 0 || serializedMode == "Pair") + hasNext = true; + } + else + { + int messageLength = SerDe.ReadInt(inputStream); + if (messageLength == (int)SpecialLengths.END_OF_DATA_SECTION) { - byte[] buffer = messageLength > 0 ? 
SerDe.ReadBytes(s, messageLength) : null; - switch (serializedMode) - { - case "String": - yield return SerDe.ToString(buffer); - break; - - case "Row": - Unpickler unpickler = new Unpickler(); - foreach (var item in (unpickler.loads(buffer) as object[])) - { - yield return item; - } - break; - - case "Pair": - messageLength = SerDe.ReadInt(s); - if (messageLength > 0) - { - yield return new KeyValuePair(buffer, SerDe.ReadBytes(s, messageLength)); - } - break; - - case "Byte": - default: - var ms = new MemoryStream(buffer); - dynamic message = formatter.Deserialize(ms); - yield return message; - break; - } + hasNext = false; + logger.LogInfo("END_OF_DATA_SECTION"); + } + else if ((messageLength > 0) || (messageLength == (int)SpecialLengths.NULL)) + { + items = GetNext(messageLength); + Debug.Assert(items != null); + Debug.Assert(items.Any()); + pos = 0; + hasNext = true; + } + else + { + throw new Exception(string.Format("unexpected messageLength: {0}", messageLength)); } } + + watch.Stop(); + return hasNext; + } + + public object Current + { + get + { + int currPos = pos; + pos++; + return items[currPos]; + } + } + + public void Reset() + { + throw new NotImplementedException(); + } + + public IEnumerator GetEnumerator() + { + return this; + } + + public void LogStatistic() + { + logger.LogInfo(string.Format("total elapsed time: {0}", watch.ElapsedMilliseconds)); + } + + private object[] GetNext(int messageLength) + { + object[] result = null; + switch (deserializedMode) + { + case "String": + { + result = new object[1]; + if (messageLength > 0) + { + byte[] buffer = SerDe.ReadBytes(inputStream, messageLength); + result[0] = SerDe.ToString(buffer); + } + else + { + result[0] = null; + } + break; + } + + case "Row": + { + Debug.Assert(messageLength > 0); + byte[] buffer = SerDe.ReadBytes(inputStream, messageLength); + var unpickledObjects = PythonSerDe.GetUnpickledObjects(buffer); + var rows = unpickledObjects.Select(item => (item as RowConstructor).GetRow()).ToList(); + result = rows.Cast().ToArray(); + break; + } + + case "Pair": + { + byte[] pairKey = (messageLength > 0) ? 
SerDe.ReadBytes(inputStream, messageLength) : null; + byte[] pairValue = null; + + int valueLength = SerDe.ReadInt(inputStream); + if (valueLength > 0) + { + pairValue = SerDe.ReadBytes(inputStream, valueLength); + } + else if (valueLength == (int)SpecialLengths.NULL) + { + pairValue = null; + } + else + { + throw new Exception(string.Format("unexpected valueLength: {0}", valueLength)); + } + + result = new object[1]; + result[0] = new KeyValuePair(pairKey, pairValue); + break; + } + + case "Byte": + default: + { + result = new object[1]; + if (messageLength > 0) + { + byte[] buffer = SerDe.ReadBytes(inputStream, messageLength); + var ms = new MemoryStream(buffer); + result[0] = formatter.Deserialize(ms); + } + else + { + result[0] = null; + } + + break; + } + } + + return result; } } } diff --git a/scala/pom.xml b/scala/pom.xml index 8ee39a0..a6cfb65 100644 --- a/scala/pom.xml +++ b/scala/pom.xml @@ -19,7 +19,7 @@ 1.5 UTF-8 2.10.4 - 1.4.1 + 1.5.2 2.10 @@ -90,7 +90,7 @@ com.databricks spark-csv_2.10 - 1.2.0 + 1.3.0 From 5b6799702ddbecd0f2a7a639de359fb8194afac5 Mon Sep 17 00:00:00 2001 From: guwang Date: Fri, 18 Dec 2015 19:21:21 +0800 Subject: [PATCH 3/5] address CodeFlow comments --- .../Interop/Ipc/JsonSerDe.cs | 68 ++++++++++++++++ .../Interop/Ipc/SerDe.cs | 61 -------------- .../Microsoft.Spark.CSharp/Sql/DataFrame.cs | 10 ++- .../Adapter/Microsoft.Spark.CSharp/Sql/Row.cs | 3 + .../Microsoft.Spark.CSharp/Sql/Types.cs | 13 +-- .../DataFrameSamples.cs | 81 +++++++++++++++---- 6 files changed, 148 insertions(+), 88 deletions(-) create mode 100644 csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs new file mode 100644 index 0000000..ff9a907 --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs @@ -0,0 +1,68 @@ +using System.Linq; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Spark.CSharp.Interop.Ipc +{ + /// + /// Json.NET Serialization/Deserialization helper class. + /// + public static class JsonSerDe + { + /* + * Note: Scala side uses JSortedObject when parse Json, so the properties in JObject need to be sorted. + */ + + /// + /// Extend method to sort items in a JSON object by keys. + /// + /// + /// + public static JObject SortProperties(this JObject jObject) + { + JObject sortedJObject = new JObject(); + foreach (var property in jObject.Properties().OrderBy(p => p.Name)) + { + if (property.Value is JObject) + { + var propJObject = property.Value as JObject; + sortedJObject.Add(property.Name, propJObject.SortProperties()); + } + else if (property.Value is JArray) + { + var propJArray = property.Value as JArray; + sortedJObject.Add(property.Name, propJArray.SortProperties()); + } + else + { + sortedJObject.Add(property.Name, property.Value); + } + } + return sortedJObject; + } + + /// + /// Extend method to sort items in a JSON array by keys. 
+ /// + public static JArray SortProperties(this JArray jArray) + { + JArray sortedJArray = new JArray(); + if (jArray.Count == 0) return jArray; + + foreach (var item in jArray) + { + if (item is JObject) + { + var sortedItem = ((JObject)item).SortProperties(); + sortedJArray.Add(sortedItem); + } + else if (item is JArray) + { + var sortedItem = ((JArray)item).SortProperties(); + sortedJArray.Add(sortedItem); + } + } + return sortedJArray; + } + } + +} diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs index be413bc..ed1ec7d 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs @@ -241,65 +241,4 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc } } - /// - /// Json.NET Serialization/Deserialization helper class. - /// - public static class JsonSerDe - { - /* - * Note: Scala side uses JSortedObject when parse Json, so the properties in JObject need to be sorted. - */ - - /// - /// Extend method to sort items in a JSON object by keys. - /// - /// - /// - public static JObject SortProperties(this JObject jObject) - { - JObject sortedJObject = new JObject(); - foreach (var property in jObject.Properties().OrderBy(p => p.Name)) - { - if (property.Value is JObject) - { - var propJObject = property.Value as JObject; - sortedJObject.Add(property.Name, propJObject.SortProperties()); - } - else if (property.Value is JArray) - { - var propJArray = property.Value as JArray; - sortedJObject.Add(property.Name, propJArray.SortProperties()); - } - else - { - sortedJObject.Add(property.Name, property.Value); - } - } - return sortedJObject; - } - - /// - /// Extend method to sort items in a JSON array by keys. 
- /// - public static JArray SortProperties(this JArray jArray) - { - JArray sortedJArray = new JArray(); - if(jArray.Count == 0) return jArray; - - foreach (var item in jArray) - { - if (item is JObject) - { - var sortedItem = ((JObject)item).SortProperties(); - sortedJArray.Add(sortedItem); - } - else if (item is JArray) - { - var sortedItem = ((JArray)item).SortProperties(); - sortedJArray.Add(sortedItem); - } - } - return sortedJArray; - } - } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs index d9c533b..0c38f5c 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/DataFrame.cs @@ -43,7 +43,8 @@ namespace Microsoft.Spark.CSharp.Sql if (rdd == null) { rddProxy = dataFrameProxy.JavaToCSharp(); - rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); + rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); + rdd = rdd.Map(r => r); } return rdd; } @@ -57,6 +58,7 @@ namespace Microsoft.Spark.CSharp.Sql { rddProxy = dataFrameProxy.JavaToCSharp(); rdd = new RDD(rddProxy, sparkContext, SerializedMode.Row); + rdd = rdd.Map(r => r); } return rddProxy; } @@ -154,12 +156,12 @@ namespace Microsoft.Spark.CSharp.Sql } /// - /// Converts the DataFrame to RDD of byte[] + /// Converts the DataFrame to RDD of Row /// /// resulting RDD - public RDD ToRDD() //RDD created using byte representation of GenericRow objects + public RDD ToRDD() //RDD created using byte representation of Row objects { - return new RDD(dataFrameProxy.ToRDD(), sparkContext); + return Rdd; } /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs index e1be0c1..2f94f21 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Row.cs @@ -202,6 +202,7 @@ namespace Microsoft.Spark.CSharp.Sql else if (field.DataType is MapType) { //TODO + throw new NotImplementedException(); } else if (field.DataType is StructType) { @@ -215,10 +216,12 @@ namespace Microsoft.Spark.CSharp.Sql else if (field.DataType is DecimalType) { //TODO + throw new NotImplementedException(); } else if (field.DataType is DateType) { //TODO + throw new NotImplementedException(); } else if (field.DataType is StringType) { diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs index 03d71aa..d3e76b8 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs @@ -69,6 +69,7 @@ namespace Microsoft.Spark.CSharp.Sql if (type.ToString() == "udt") { // TODO + throw new NotImplementedException(); } } throw new ArgumentException(string.Format("Could not parse data type: {0}", type)); @@ -249,14 +250,14 @@ namespace Microsoft.Spark.CSharp.Sql { public string Name { get { return name; } } public DataType DataType { get { return dataType; } } - public bool Nullable { get { return nullable; } } + public bool IsNullable { get { return isNullable; } } public JObject Metadata { get { return metadata; } } - public StructField(string name, DataType dataType, bool nullable = true, JObject metadata = null) + public StructField(string name, DataType dataType, bool isNullable = true, JObject metadata = null) { this.name = name; this.dataType = dataType; - this.nullable = nullable; + this.isNullable = isNullable; this.metadata = metadata ?? 
new JObject();
         }
 
@@ -274,7 +275,7 @@ namespace Microsoft.Spark.CSharp.Sql
                 return new JObject(
                     new JProperty("name", name),
                     new JProperty("type", dataType.JsonValue),
-                    new JProperty("nullable", nullable),
+                    new JProperty("nullable", isNullable),
                     new JProperty("metadata", metadata));
             }
         }
 
@@ -283,14 +284,14 @@ namespace Microsoft.Spark.CSharp.Sql
         {
             name = json["name"].ToString();
             dataType = ParseDataTypeFromJson(json["type"]);
-            nullable = (bool)json["nullable"];
+            isNullable = (bool)json["nullable"];
             metadata = (JObject)json["metadata"];
             return this;
         }
 
         private string name;
         private DataType dataType;
-        private bool nullable;
+        private bool isNullable;
         [NonSerialized]
         private JObject metadata;
     }
diff --git a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs
index a7aa75e..79ec009 100644
--- a/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs
+++ b/csharp/Samples/Microsoft.Spark.CSharp/DataFrameSamples.cs
@@ -4,7 +4,9 @@
 using System;
 using System.Collections.Generic;
 using System.Collections.ObjectModel;
+using System.IO;
 using System.Linq;
+using System.Text.RegularExpressions;
 using Microsoft.Spark.CSharp.Core;
 using Microsoft.Spark.CSharp.Sql;
 using NUnit.Framework;
@@ -27,7 +29,7 @@ namespace Microsoft.Spark.CSharp.Samples
         }
 
         ///
-        /// Sample to create DataFrame
+        /// Sample to create DataFrame. The RDD is generated from SparkContext Parallelize; the schema is created via object construction.
         ///
         [Sample]
         internal static void DFCreateDataFrameSample()
@@ -56,7 +58,8 @@ namespace Microsoft.Spark.CSharp.Samples
             Console.WriteLine("------ Schema of People Data Frame:\r\n");
             dataFramePeople.ShowSchema();
             Console.WriteLine();
-            foreach (var people in dataFramePeople.Collect())
+            var collected = dataFramePeople.Collect().ToArray();
+            foreach (var people in collected)
             {
                 string id = people.Get("id");
                 string name = people.Get("name");
@@ -68,20 +71,58 @@ namespace Microsoft.Spark.CSharp.Samples
                 Console.WriteLine("id:{0}, name:{1}, age:{2}, address:(city:{3},state:{4}), phoneNumbers:[{5},{6}]\r\n", id, name, age, city, state, phoneNumbers[0], phoneNumbers[1]);
             }
 
+            if (SparkCLRSamples.Configuration.IsValidationEnabled)
+            {
+                Assert.AreEqual(2, dataFramePeople.Rdd.Count());
+                Assert.AreEqual(schemaPeople.Json, dataFramePeople.Schema.Json);
+            }
+        }
+
+        ///
+        /// Sample to create DataFrame. The RDD is generated from SparkContext TextFile; the schema is created from Json.
+ /// + [Sample] + internal static void DFCreateDataFrameSample2() + { var rddRequestsLog = SparkCLRSamples.SparkContext.TextFile(SparkCLRSamples.Configuration.GetInputDataPath(RequestsLog), 1).Map(r => r.Split(',').Select(s => (object)s).ToArray()); - var schemaRequestsLog = new StructType(new List - { - new StructField("guid", new StringType(), false), - new StructField("datacenter", new StringType(), false), - new StructField("abtestid", new StringType(), false), - new StructField("traffictype", new StringType(), false), - }); - var dataFrameRequestsLog = GetSqlContext().CreateDataFrame(rddRequestsLog, schemaRequestsLog); + const string schemaRequestsLogJson = @"{ + ""fields"": [{ + ""metadata"": {}, + ""name"": ""guid"", + ""nullable"": false, + ""type"": ""string"" + }, + { + ""metadata"": {}, + ""name"": ""datacenter"", + ""nullable"": false, + ""type"": ""string"" + }, + { + ""metadata"": {}, + ""name"": ""abtestid"", + ""nullable"": false, + ""type"": ""string"" + }, + { + ""metadata"": {}, + ""name"": ""traffictype"", + ""nullable"": false, + ""type"": ""string"" + }], + ""type"": ""struct"" + }"; + + // create schema from parsing Json + StructType requestsLogSchema = DataType.ParseDataTypeFromJson(schemaRequestsLogJson) as StructType; + var dataFrameRequestsLog = GetSqlContext().CreateDataFrame(rddRequestsLog, requestsLogSchema); + Console.WriteLine("------ Schema of RequestsLog Data Frame:"); dataFrameRequestsLog.ShowSchema(); Console.WriteLine(); - foreach (var request in dataFrameRequestsLog.Collect()) + var collected = dataFrameRequestsLog.Collect().ToArray(); + foreach (var request in collected) { string guid = request.Get("guid"); string datacenter = request.Get("datacenter"); @@ -89,6 +130,12 @@ namespace Microsoft.Spark.CSharp.Samples string traffictype = request.Get("traffictype"); Console.WriteLine("guid:{0}, datacenter:{1}, abtestid:{2}, traffictype:{3}\r\n", guid, datacenter, abtestid, traffictype); } + + if (SparkCLRSamples.Configuration.IsValidationEnabled) + { + Assert.AreEqual(10, collected.Length); + Assert.AreEqual(Regex.Replace(schemaRequestsLogJson, @"\s", string.Empty), Regex.Replace(dataFrameRequestsLog.Schema.Json, @"\s", string.Empty)); + } } /// @@ -299,7 +346,7 @@ namespace Microsoft.Spark.CSharp.Samples var dataType = peopleDataFrameSchemaField.DataType; var stringVal = dataType.TypeName; var simpleStringVal = dataType.SimpleString; - var isNullable = peopleDataFrameSchemaField.Nullable; + var isNullable = peopleDataFrameSchemaField.IsNullable; Console.WriteLine("Name={0}, DT.string={1}, DT.simplestring={2}, DT.isNullable={3}", name, stringVal, simpleStringVal, isNullable); } } @@ -1011,28 +1058,28 @@ namespace Microsoft.Spark.CSharp.Samples var nameColSchema = schema.Fields.Find(c => c.Name.Equals("name")); Assert.IsNotNull(nameColSchema); Assert.AreEqual("name", nameColSchema.Name); - Assert.IsTrue(nameColSchema.Nullable); + Assert.IsTrue(nameColSchema.IsNullable); Assert.AreEqual("string", nameColSchema.DataType.TypeName); // id var idColSchema = schema.Fields.Find(c => c.Name.Equals("id")); Assert.IsNotNull(idColSchema); Assert.AreEqual("id", idColSchema.Name); - Assert.IsTrue(idColSchema.Nullable); + Assert.IsTrue(idColSchema.IsNullable); Assert.AreEqual("string", nameColSchema.DataType.TypeName); // age var ageColSchema = schema.Fields.Find(c => c.Name.Equals("age")); Assert.IsNotNull(ageColSchema); Assert.AreEqual("age", ageColSchema.Name); - Assert.IsTrue(ageColSchema.Nullable); + Assert.IsTrue(ageColSchema.IsNullable); Assert.AreEqual("long", 
ageColSchema.DataType.TypeName); // address var addressColSchema = schema.Fields.Find(c => c.Name.Equals("address")); Assert.IsNotNull(addressColSchema); Assert.AreEqual("address", addressColSchema.Name); - Assert.IsTrue(addressColSchema.Nullable); + Assert.IsTrue(addressColSchema.IsNullable); Assert.IsNotNull(addressColSchema.DataType); Assert.AreEqual("struct", addressColSchema.DataType.TypeName); Assert.IsNotNull(((StructType)addressColSchema.DataType).Fields.Find(c => c.Name.Equals("state"))); @@ -1094,7 +1141,7 @@ namespace Microsoft.Spark.CSharp.Samples internal static void DFRddSample() { var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson)); - //peopleDataFrame.Show(); + peopleDataFrame.Show(); var dfCount = peopleDataFrame.Count(); var peopleRdd = peopleDataFrame.Rdd; From 8d53f9800af2a5525f27a4aa79630378ddbeb8d8 Mon Sep 17 00:00:00 2001 From: guwang Date: Mon, 21 Dec 2015 10:12:55 +0800 Subject: [PATCH 4/5] Address code flow comments part2 --- csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj | 1 + csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj index 8a2cf1d..70b2ab0 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj @@ -76,6 +76,7 @@ + diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs index ed1ec7d..a8473e8 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/SerDe.cs @@ -2,13 +2,8 @@ // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; -using System.Collections.Generic; -using System.Linq; using System.Text; using System.IO; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using Newtonsoft.Json.Serialization; namespace Microsoft.Spark.CSharp.Interop.Ipc { @@ -240,5 +235,4 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc Write(s, buffer); } } - } From ebd55c3ce8091c4d60643d726327977e6070945b Mon Sep 17 00:00:00 2001 From: guwang Date: Mon, 21 Dec 2015 13:51:50 +0800 Subject: [PATCH 5/5] address CodeFlow comments --- .../Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs | 10 +++++----- .../Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs | 5 ++++- csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs | 8 ++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs index ff9a907..0ba0aea 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Interop/Ipc/JsonSerDe.cs @@ -1,4 +1,7 @@ -using System.Linq; +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System.Linq; using Newtonsoft.Json.Linq; namespace Microsoft.Spark.CSharp.Interop.Ipc @@ -8,10 +11,7 @@ namespace Microsoft.Spark.CSharp.Interop.Ipc /// public static class JsonSerDe { - /* - * Note: Scala side uses JSortedObject when parse Json, so the properties in JObject need to be sorted. 
- */ - + // Note: Scala side uses JSortedObject when parse Json, so the properties in JObject need to be sorted /// /// Extend method to sort items in a JSON object by keys. /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs index e733abb..93aa071 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SqlContext.cs @@ -42,7 +42,10 @@ namespace Microsoft.Spark.CSharp.Sql public DataFrame CreateDataFrame(RDD rdd, StructType schema) { - // pickle RDD, convert to RDD + // Note: This is for pickling RDD, convert to RDD which happens in CSharpWorker. + // The below sqlContextProxy.CreateDataFrame() will call byteArrayRDDToAnyArrayRDD() of SQLUtils.scala which only accept RDD of type RDD[Array[Byte]]. + // In byteArrayRDDToAnyArrayRDD() of SQLUtils.scala, the SerDeUtil.pythonToJava() will be called which is a mapPartitions inside. + // It will be executed until the CSharpWorker finishes Pickling to RDD[Array[Byte]]. var rddRow = rdd.Map(r => r); rddRow.serializedMode = SerializedMode.Row; diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs index d3e76b8..1a34892 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/Types.cs @@ -1,16 +1,16 @@ -using System; +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Reflection; -using System.Text; using System.Text.RegularExpressions; using Microsoft.Spark.CSharp.Interop.Ipc; using Microsoft.Spark.CSharp.Proxy; using Microsoft.Spark.CSharp.Proxy.Ipc; using Newtonsoft.Json; using Newtonsoft.Json.Linq; -using Razorvine.Pickle; namespace Microsoft.Spark.CSharp.Sql {