diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj index 4daf4aa..d887daf 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Adapter.csproj @@ -157,6 +157,7 @@ + diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkSessionProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkSessionProxy.cs index 56f869c..11a58cd 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkSessionProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/ISparkSessionProxy.cs @@ -10,12 +10,15 @@ using Microsoft.Spark.CSharp.Sql; namespace Microsoft.Spark.CSharp.Proxy { - internal interface IUdfRegistration { } + internal interface IUdfRegistrationProxy + { + void RegisterFunction(string name, byte[] command, string returnType); + } interface ISparkSessionProxy { ISqlContextProxy SqlContextProxy { get; } - IUdfRegistration Udf { get; } + IUdfRegistrationProxy Udf { get; } ICatalogProxy GetCatalog(); IDataFrameReaderProxy Read(); ISparkSessionProxy NewSession(); diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkSessionIpcProxy.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkSessionIpcProxy.cs index d134c08..febfd3b 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkSessionIpcProxy.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Proxy/Ipc/SparkSessionIpcProxy.cs @@ -17,18 +17,13 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc private readonly JvmObjectReference jvmSparkSessionReference; private readonly ISqlContextProxy sqlContextProxy; - private readonly IUdfRegistration udfRegistration; + private readonly IUdfRegistrationProxy udfRegistrationProxy; - public IUdfRegistration Udf + public IUdfRegistrationProxy Udf { get { - if (udfRegistration == null) - { - //TODO implementation needed - } - - return udfRegistration; + return udfRegistrationProxy; } } @@ -46,6 +41,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc { this.jvmSparkSessionReference = jvmSparkSessionReference; sqlContextProxy = new SqlContextIpcProxy(GetSqlContextReference()); + udfRegistrationProxy = new UdfRegistrationIpcProxy(sqlContextProxy); } private JvmObjectReference GetSqlContextReference() @@ -98,4 +94,19 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "stop"); } } + + [ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not reqiured + internal class UdfRegistrationIpcProxy : IUdfRegistrationProxy + { + private readonly ISqlContextProxy sqlContextProxy; + internal UdfRegistrationIpcProxy(ISqlContextProxy sqlContextProxy) + { + this.sqlContextProxy = sqlContextProxy; + } + + public void RegisterFunction(string name, byte[] command, string returnType) + { + sqlContextProxy.RegisterFunction(name, command, returnType); + } + } } diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SparkSession.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SparkSession.cs index 3ff8a8a..55b5846 100644 --- a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SparkSession.cs +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/SparkSession.cs @@ -48,6 +48,11 @@ namespace Microsoft.Spark.CSharp.Sql get { return sparkContext; } } + public UdfRegistration Udf + { + get { return new UdfRegistration(sparkSessionProxy.Udf); } + } + /// /// Builder for SparkSession /// diff --git a/csharp/Adapter/Microsoft.Spark.CSharp/Sql/UdfRegistration.cs b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/UdfRegistration.cs new file mode 100644 index 0000000..b9c5008 --- /dev/null +++ b/csharp/Adapter/Microsoft.Spark.CSharp/Sql/UdfRegistration.cs @@ -0,0 +1,254 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Spark.CSharp.Core; +using Microsoft.Spark.CSharp.Proxy; +using Microsoft.Spark.CSharp.Services; + +namespace Microsoft.Spark.CSharp.Sql +{ + /// + /// Used for registering User Defined Functions. SparkSession.Udf is used to access instance of this type. + /// + public class UdfRegistration + { + private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(UdfRegistration)); + + private IUdfRegistrationProxy udfRegistrationProxy; + + internal UdfRegistration(IUdfRegistrationProxy udfRegistrationProxy) + { + this.udfRegistrationProxy = udfRegistrationProxy; + } + + //TODO - the following section is a copy of the same functionality in SQLContext..refactoring needed + #region UDF Registration + /// + /// Register UDF with no input argument, e.g: + /// SqlContext.RegisterFunction<bool>("MyFilter", () => true); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter()"); + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 1 input argument, e.g: + /// SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)"); + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 2 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)"); + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 3 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)"); + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 4 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg3 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)"); + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 5 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 6 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 7 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... && arg7 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 8 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 9 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... && arg9 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + + /// + /// Register UDF with 10 input arguments, e.g: + /// SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); + /// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)"); + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + /// + public void RegisterFunction(string name, Func f) + { + logger.LogInfo("Name of the function to register {0}, method info", name, f.Method); + Func, IEnumerable> udfHelper = new UdfHelper(f).Execute; + udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT))); + } + #endregion + } +} diff --git a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML index 0f4c49c..0fde906 100644 --- a/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML +++ b/csharp/Adapter/documentation/Microsoft.Spark.CSharp.Adapter.Doc.XML @@ -7675,6 +7675,171 @@ The Json object used to construct a StructType A new StructType instance + + + Register UDF with no input argument, e.g: + SqlContext.RegisterFunction<bool>("MyFilter", () => true); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter()"); + + + + + + + + Register UDF with 1 input argument, e.g: + SqlContext.RegisterFunction<bool, string>("MyFilter", (arg1) => arg1 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)"); + + + + + + + + + Register UDF with 2 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null && arg2 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)"); + + + + + + + + + + Register UDF with 3 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null && arg2 != null && arg3 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)"); + + + + + + + + + + + Register UDF with 4 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null && arg2 != null && ... && arg3 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)"); + + + + + + + + + + + + Register UDF with 5 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null && arg2 != null && ... && arg5 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)"); + + + + + + + + + + + + + Register UDF with 6 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null && arg2 != null && ... && arg6 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)"); + + + + + + + + + + + + + + Register UDF with 7 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null && arg2 != null && ... && arg7 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)"); + + + + + + + + + + + + + + + Register UDF with 8 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null && arg2 != null && ... && arg8 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)"); + + + + + + + + + + + + + + + + Register UDF with 9 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null && arg2 != null && ... && arg9 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)"); + + + + + + + + + + + + + + + + + Register UDF with 10 input arguments, e.g: + SqlContext.RegisterFunction<bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null && arg2 != null && ... && arg10 != null); + sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)"); + + + + + + + + + + + + + + + An input stream that always returns the same RDD on each timestep. Useful for testing. diff --git a/csharp/AdapterTest/AdapterTest.csproj b/csharp/AdapterTest/AdapterTest.csproj index ca95b87..c32ed7a 100644 --- a/csharp/AdapterTest/AdapterTest.csproj +++ b/csharp/AdapterTest/AdapterTest.csproj @@ -119,6 +119,7 @@ + diff --git a/csharp/AdapterTest/Mocks/MockSparkSessionProxy.cs b/csharp/AdapterTest/Mocks/MockSparkSessionProxy.cs index da695c3..b7cf2ca 100644 --- a/csharp/AdapterTest/Mocks/MockSparkSessionProxy.cs +++ b/csharp/AdapterTest/Mocks/MockSparkSessionProxy.cs @@ -13,7 +13,7 @@ namespace AdapterTest.Mocks class MockSparkSessionProxy : ISparkSessionProxy { public ISqlContextProxy SqlContextProxy { get { return new MockSqlContextProxy(new MockSparkContextProxy(new MockSparkConfProxy()));} } - public IUdfRegistration Udf { get; } + public IUdfRegistrationProxy Udf { get; } public ICatalogProxy GetCatalog() { throw new NotImplementedException(); diff --git a/csharp/AdapterTest/UdfRegistrationTest.cs b/csharp/AdapterTest/UdfRegistrationTest.cs new file mode 100644 index 0000000..918fe9d --- /dev/null +++ b/csharp/AdapterTest/UdfRegistrationTest.cs @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +using System; +using Microsoft.Spark.CSharp.Proxy; +using Microsoft.Spark.CSharp.Sql; +using Moq; +using NUnit.Framework; + +namespace AdapterTest +{ + [TestFixture] + public class UdfRegistrationTest + { + [Test] + public void TestRegisterFunction() + { + Mock mockUdfRegistrationProxy = new Mock(); + mockUdfRegistrationProxy.Setup(m => m.RegisterFunction(It.IsAny(), It.IsAny(), It.IsAny())); + + var udfRegistration = new UdfRegistration(mockUdfRegistrationProxy.Object); + + udfRegistration.RegisterFunction("Func0", () => "Func0"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func0", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func1", s => "Func1"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func1", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func2", (s1, s2) => "Func2"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func2", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func3", (s1, s2, s3) => "Func3"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func3", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func4", (s1, s2, s3, s4) => "Func4"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func4", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func5", (s1, s2, s3, s4, s5) => "Func5"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func5", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func6", (s1, s2, s3, s4, s5, s6) => "Func6"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func6", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func7", (s1, s2, s3, s4, s5, s6, s7) => "Func7"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func7", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func8", (s1, s2, s3, s4, s5, s6, s7, s8) => "Func8"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func8", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func9", (s1, s2, s3, s4, s5, s6, s7, s8, s9) => "Func9"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func9", It.IsAny(), "string")); + + udfRegistration.RegisterFunction("Func10", (s1, s2, s3, s4, s5, s6, s7, s8, s9, s10) => "Func10"); + mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func10", It.IsAny(), "string")); + } + } +} diff --git a/csharp/Samples/Microsoft.Spark.CSharp/SparkSessionSamples.cs b/csharp/Samples/Microsoft.Spark.CSharp/SparkSessionSamples.cs index f628e1c..7fd1dd3 100644 --- a/csharp/Samples/Microsoft.Spark.CSharp/SparkSessionSamples.cs +++ b/csharp/Samples/Microsoft.Spark.CSharp/SparkSessionSamples.cs @@ -185,5 +185,31 @@ namespace Microsoft.Spark.CSharp.Samples Assert.AreEqual(schemaPeople.Json, dataFramePeople.Schema.Json); } } + + [Sample] + internal static void SparkSessionUdfSample() + { + GetSparkSession().Udf.RegisterFunction("FullAddress", (city, state) => city + " " + state); + GetSparkSession().Udf.RegisterFunction("PeopleFilter", (name, age) => name == "Bill" && age > 80); + + var peopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson)); + var functionAppliedDF = peopleDataFrame.SelectExpr("name", "age * 2 as age", + "FullAddress(address.city, address.state) as address") + .Where("PeopleFilter(name, age)"); + + functionAppliedDF.ShowSchema(); + functionAppliedDF.Show(); + + if (SparkCLRSamples.Configuration.IsValidationEnabled) + { + var collected = functionAppliedDF.Collect().ToArray(); + CollectionAssert.AreEquivalent(new[] { "name", "age", "address" }, + functionAppliedDF.Schema.Fields.Select(f => f.Name).ToArray()); + Assert.AreEqual(1, collected.Length); + Assert.AreEqual("Bill", collected[0].Get("name")); + Assert.AreEqual(86, collected[0].Get("age")); + Assert.AreEqual("Seattle Washington", collected[0].Get("address")); + } + } } }