added UDF support to SparkSession

skaarthik 2017-01-28 01:03:17 -08:00
Parent b95520eff8
Commit a1b74ad8a7
10 changed files with 534 additions and 11 deletions
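
For orientation, the pattern this commit enables is: register a C# lambda by name through SparkSession.Udf, then call it from SQL expressions. A minimal sketch using only API surface that appears in this diff (the session variable, column names, and input path are placeholders, not taken from the commit; note that the first type parameter of RegisterFunction is the return type):

// Sketch only: "spark" is an already-constructed SparkSession; the column
// names and JSON path are placeholders.
spark.Udf.RegisterFunction<string, string, string>(
    "FullName", (first, last) => first + " " + last);

// Registered functions are usable from SQL expression strings.
var people = spark.Read().Json("people.json");
var withNames = people.SelectExpr("FullName(firstName, lastName) as fullName");
withNames.Show();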

View file

@@ -157,6 +157,7 @@
<Compile Include="Sql\SparkSession.cs" />
<Compile Include="Sql\SqlContext.cs" />
<Compile Include="Sql\Types.cs" />
<Compile Include="Sql\UdfRegistration.cs" />
<Compile Include="Sql\UserDefinedFunction.cs" />
<Compile Include="Streaming\ConstantInputDStream.cs" />
<Compile Include="Streaming\DStream.cs" />

View file

@@ -10,12 +10,15 @@ using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Proxy
{
internal interface IUdfRegistration { }
internal interface IUdfRegistrationProxy
{
void RegisterFunction(string name, byte[] command, string returnType);
}
interface ISparkSessionProxy
{
ISqlContextProxy SqlContextProxy { get; }
IUdfRegistration Udf { get; }
IUdfRegistrationProxy Udf { get; }
ICatalogProxy GetCatalog();
IDataFrameReaderProxy Read();
ISparkSessionProxy NewSession();

View file

@@ -17,18 +17,13 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
private readonly JvmObjectReference jvmSparkSessionReference;
private readonly ISqlContextProxy sqlContextProxy;
private readonly IUdfRegistration udfRegistration;
private readonly IUdfRegistrationProxy udfRegistrationProxy;
public IUdfRegistration Udf
public IUdfRegistrationProxy Udf
{
get
{
if (udfRegistration == null)
{
//TODO implementation needed
}
return udfRegistration;
return udfRegistrationProxy;
}
}
@@ -46,6 +41,7 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
this.jvmSparkSessionReference = jvmSparkSessionReference;
sqlContextProxy = new SqlContextIpcProxy(GetSqlContextReference());
udfRegistrationProxy = new UdfRegistrationIpcProxy(sqlContextProxy);
}
private JvmObjectReference GetSqlContextReference()
@@ -98,4 +94,19 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "stop");
}
}
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class UdfRegistrationIpcProxy : IUdfRegistrationProxy
{
private readonly ISqlContextProxy sqlContextProxy;
internal UdfRegistrationIpcProxy(ISqlContextProxy sqlContextProxy)
{
this.sqlContextProxy = sqlContextProxy;
}
public void RegisterFunction(string name, byte[] command, string returnType)
{
sqlContextProxy.RegisterFunction(name, command, returnType);
}
}
}
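
Because the new seam is a single-method interface, it can also be exercised without a JVM (or Moq); a minimal hand-rolled fake, shown here as an illustration only and assuming the test assembly can see the internal interface the way the existing AdapterTest mocks do:

using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;

// Records registrations instead of forwarding them over IPC to the JVM.
internal class RecordingUdfRegistrationProxy : IUdfRegistrationProxy
{
    public readonly List<string> Registered = new List<string>();

    public void RegisterFunction(string name, byte[] command, string returnType)
    {
        Registered.Add(name + " -> " + returnType);
    }
}

A UdfRegistration constructed over such a fake should surface each RegisterFunction<...> call as one recorded (name, returnType) pair, which is essentially what the Moq-based UdfRegistrationTest added later in this commit verifies.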

View file

@@ -48,6 +48,11 @@ namespace Microsoft.Spark.CSharp.Sql
get { return sparkContext; }
}
public UdfRegistration Udf
{
get { return new UdfRegistration(sparkSessionProxy.Udf); }
}
/// <summary>
/// Builder for SparkSession
/// </summary>

View file

@@ -0,0 +1,254 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Services;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// Used for registering User Defined Functions. SparkSession.Udf is used to access an instance of this type.
/// </summary>
public class UdfRegistration
{
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(UdfRegistration));
private IUdfRegistrationProxy udfRegistrationProxy;
internal UdfRegistration(IUdfRegistrationProxy udfRegistrationProxy)
{
this.udfRegistrationProxy = udfRegistrationProxy;
}
//TODO - the following section is a copy of the same functionality in SqlContext; refactoring needed
#region UDF Registration
/// <summary>
/// Register UDF with no input argument, e.g:
/// SqlContext.RegisterFunction&lt;bool>("MyFilter", () => true);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter()");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT>(string name, Func<RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 1 input argument, e.g:
/// SqlContext.RegisterFunction&lt;bool, string>("MyFilter", (arg1) => arg1 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1>(string name, Func<A1, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 2 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null &amp;&amp; arg2 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2>(string name, Func<A1, A2, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 3 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; arg3 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3>(string name, Func<A1, A2, A3, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 4 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg4 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4>(string name, Func<A1, A2, A3, A4, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 5 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg5 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5>(string name, Func<A1, A2, A3, A4, A5, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 6 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg6 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <typeparam name="A6"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5, A6>(string name, Func<A1, A2, A3, A4, A5, A6, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5, A6>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 7 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg7 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <typeparam name="A6"></typeparam>
/// <typeparam name="A7"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5, A6, A7>(string name, Func<A1, A2, A3, A4, A5, A6, A7, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5, A6, A7>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 8 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg8 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <typeparam name="A6"></typeparam>
/// <typeparam name="A7"></typeparam>
/// <typeparam name="A8"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5, A6, A7, A8>(string name, Func<A1, A2, A3, A4, A5, A6, A7, A8, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5, A6, A7, A8>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 9 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg9 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <typeparam name="A6"></typeparam>
/// <typeparam name="A7"></typeparam>
/// <typeparam name="A8"></typeparam>
/// <typeparam name="A9"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5, A6, A7, A8, A9>(string name, Func<A1, A2, A3, A4, A5, A6, A7, A8, A9, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5, A6, A7, A8, A9>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
/// <summary>
/// Register UDF with 10 input arguments, e.g:
/// SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg10 != null);
/// sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)");
/// </summary>
/// <typeparam name="RT"></typeparam>
/// <typeparam name="A1"></typeparam>
/// <typeparam name="A2"></typeparam>
/// <typeparam name="A3"></typeparam>
/// <typeparam name="A4"></typeparam>
/// <typeparam name="A5"></typeparam>
/// <typeparam name="A6"></typeparam>
/// <typeparam name="A7"></typeparam>
/// <typeparam name="A8"></typeparam>
/// <typeparam name="A9"></typeparam>
/// <typeparam name="A10"></typeparam>
/// <param name="name"></param>
/// <param name="f"></param>
public void RegisterFunction<RT, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10>(string name, Func<A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, RT> f)
{
logger.LogInfo("Name of the function to register {0}, method info", name, f.Method);
Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> udfHelper = new UdfHelper<RT, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10>(f).Execute;
udfRegistrationProxy.RegisterFunction(name, SparkContext.BuildCommand(new CSharpWorkerFunc(udfHelper), SerializedMode.Row, SerializedMode.Row), Functions.GetReturnType(typeof(RT)));
}
#endregion
}
}
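
Every overload above follows the same shape: the typed lambda is wrapped into an untyped, row-wise delegate (Func&lt;int, IEnumerable&lt;dynamic>, IEnumerable&lt;dynamic>>) and serialized via SparkContext.BuildCommand. The following is a hypothetical adapter, not the real UdfHelper, included only to illustrate that conversion for the one-argument case; the assumption that each incoming item can be indexed by argument position is mine:

using System;
using System.Collections.Generic;
using System.Linq;

internal static class UdfShapeIllustration
{
    // Illustration only: applies f to each incoming argument "row",
    // matching the delegate signature handed to BuildCommand above.
    internal static Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> Adapt<A1, RT>(Func<A1, RT> f)
    {
        return (partitionIndex, rows) => rows.Select(row => (dynamic)f((A1)row[0]));
    }
}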

View file

@@ -7675,6 +7675,171 @@
<param name="json">The Json object used to construct a StructType</param>
<returns>A new StructType instance</returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``1(System.String,System.Func{``0})">
<summary>
Register UDF with no input argument, e.g:
SqlContext.RegisterFunction&lt;bool>("MyFilter", () => true);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter()");
</summary>
<typeparam name="RT"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``2(System.String,System.Func{``1,``0})">
<summary>
Register UDF with 1 input argument, e.g:
SqlContext.RegisterFunction&lt;bool, string>("MyFilter", (arg1) => arg1 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``3(System.String,System.Func{``1,``2,``0})">
<summary>
Register UDF with 2 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string>("MyFilter", (arg1, arg2) => arg1 != null &amp;&amp; arg2 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``4(System.String,System.Func{``1,``2,``3,``0})">
<summary>
Register UDF with 3 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, string>("MyFilter", (arg1, arg2, arg3) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; arg3 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, columnName3)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``5(System.String,System.Func{``1,``2,``3,``4,``0})">
<summary>
Register UDF with 4 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg4) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg4 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName4)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``6(System.String,System.Func{``1,``2,``3,``4,``5,``0})">
<summary>
Register UDF with 5 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg5) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg5 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName5)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``7(System.String,System.Func{``1,``2,``3,``4,``5,``6,``0})">
<summary>
Register UDF with 6 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg6) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg6 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName6)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<typeparam name="A6"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``8(System.String,System.Func{``1,``2,``3,``4,``5,``6,``7,``0})">
<summary>
Register UDF with 7 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg7) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg7 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName7)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<typeparam name="A6"></typeparam>
<typeparam name="A7"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``9(System.String,System.Func{``1,``2,``3,``4,``5,``6,``7,``8,``0})">
<summary>
Register UDF with 8 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg8) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg8 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName8)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<typeparam name="A6"></typeparam>
<typeparam name="A7"></typeparam>
<typeparam name="A8"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``10(System.String,System.Func{``1,``2,``3,``4,``5,``6,``7,``8,``9,``0})">
<summary>
Register UDF with 9 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg9) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg9 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName9)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<typeparam name="A6"></typeparam>
<typeparam name="A7"></typeparam>
<typeparam name="A8"></typeparam>
<typeparam name="A9"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.UdfRegistration.RegisterFunction``11(System.String,System.Func{``1,``2,``3,``4,``5,``6,``7,``8,``9,``10,``0})">
<summary>
Register UDF with 10 input arguments, e.g:
SqlContext.RegisterFunction&lt;bool, string, string, ..., string>("MyFilter", (arg1, arg2, ..., arg10) => arg1 != null &amp;&amp; arg2 != null &amp;&amp; ... &amp;&amp; arg10 != null);
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1, columnName2, ..., columnName10)");
</summary>
<typeparam name="RT"></typeparam>
<typeparam name="A1"></typeparam>
<typeparam name="A2"></typeparam>
<typeparam name="A3"></typeparam>
<typeparam name="A4"></typeparam>
<typeparam name="A5"></typeparam>
<typeparam name="A6"></typeparam>
<typeparam name="A7"></typeparam>
<typeparam name="A8"></typeparam>
<typeparam name="A9"></typeparam>
<typeparam name="A10"></typeparam>
<param name="name"></param>
<param name="f"></param>
</member>
<member name="T:Microsoft.Spark.CSharp.Streaming.ConstantInputDStream`1">
<summary>
An input stream that always returns the same RDD on each timestep. Useful for testing.

View file

@@ -119,6 +119,7 @@
<Compile Include="PairRDDTest.cs" />
<Compile Include="ComparableRDDTest.cs" />
<Compile Include="DoubleRDDTest.cs" />
<Compile Include="UdfRegistrationTest.cs" />
<Compile Include="UserDefinedFunctionTest.cs" />
<Compile Include="WeakObjectManagerTest.cs" />
</ItemGroup>

View file

@@ -13,7 +13,7 @@ namespace AdapterTest.Mocks
class MockSparkSessionProxy : ISparkSessionProxy
{
public ISqlContextProxy SqlContextProxy { get { return new MockSqlContextProxy(new MockSparkContextProxy(new MockSparkConfProxy()));} }
public IUdfRegistration Udf { get; }
public IUdfRegistrationProxy Udf { get; }
public ICatalogProxy GetCatalog()
{
throw new NotImplementedException();

View file

@@ -0,0 +1,57 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Moq;
using NUnit.Framework;
namespace AdapterTest
{
[TestFixture]
public class UdfRegistrationTest
{
[Test]
public void TestRegisterFunction()
{
Mock<IUdfRegistrationProxy> mockUdfRegistrationProxy = new Mock<IUdfRegistrationProxy>();
mockUdfRegistrationProxy.Setup(m => m.RegisterFunction(It.IsAny<string>(), It.IsAny<byte[]>(), It.IsAny<string>()));
var udfRegistration = new UdfRegistration(mockUdfRegistrationProxy.Object);
udfRegistration.RegisterFunction("Func0", () => "Func0");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func0", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string>("Func1", s => "Func1");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func1", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string>("Func2", (s1, s2) => "Func2");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func2", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string>("Func3", (s1, s2, s3) => "Func3");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func3", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string>("Func4", (s1, s2, s3, s4) => "Func4");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func4", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string>("Func5", (s1, s2, s3, s4, s5) => "Func5");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func5", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string, string>("Func6", (s1, s2, s3, s4, s5, s6) => "Func6");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func6", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string, string, string>("Func7", (s1, s2, s3, s4, s5, s6, s7) => "Func7");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func7", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string, string, string, string>("Func8", (s1, s2, s3, s4, s5, s6, s7, s8) => "Func8");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func8", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string, string, string, string, string>("Func9", (s1, s2, s3, s4, s5, s6, s7, s8, s9) => "Func9");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func9", It.IsAny<byte[]>(), "string"));
udfRegistration.RegisterFunction<string, string, string, string, string, string, string, string, string, string, string>("Func10", (s1, s2, s3, s4, s5, s6, s7, s8, s9, s10) => "Func10");
mockUdfRegistrationProxy.Verify(m => m.RegisterFunction("Func10", It.IsAny<byte[]>(), "string"));
}
}
}

View file

@@ -185,5 +185,31 @@ namespace Microsoft.Spark.CSharp.Samples
Assert.AreEqual(schemaPeople.Json, dataFramePeople.Schema.Json);
}
}
[Sample]
internal static void SparkSessionUdfSample()
{
GetSparkSession().Udf.RegisterFunction<string, string, string>("FullAddress", (city, state) => city + " " + state);
GetSparkSession().Udf.RegisterFunction<bool, string, int>("PeopleFilter", (name, age) => name == "Bill" && age > 80);
var peopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
var functionAppliedDF = peopleDataFrame.SelectExpr("name", "age * 2 as age",
"FullAddress(address.city, address.state) as address")
.Where("PeopleFilter(name, age)");
functionAppliedDF.ShowSchema();
functionAppliedDF.Show();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
var collected = functionAppliedDF.Collect().ToArray();
CollectionAssert.AreEquivalent(new[] { "name", "age", "address" },
functionAppliedDF.Schema.Fields.Select(f => f.Name).ToArray());
Assert.AreEqual(1, collected.Length);
Assert.AreEqual("Bill", collected[0].Get("name"));
Assert.AreEqual(86, collected[0].Get("age"));
Assert.AreEqual("Seattle Washington", collected[0].Get("address"));
}
}
}
}