merging latest changes from master

Kaarthik Sivashanmugam 2016-10-11 15:09:13 -07:00 committed by GitHub
Parents: 013278bf0e fd386b09a4
Commit: da69c1fef8
132 changed files: 3432 additions and 355 deletions

.gitignore (vendored)

@ -11,6 +11,7 @@
*.class
*.dll
*.exe
*.pyc
# Packages #
############
@ -40,6 +41,10 @@ build/dependencies/
*.log
lib/
# Local databases used for Dataset/frames #
###########################################
scala/metastore_db/
# Generated Files #
############
SparkCLRCodeCoverage.xml


@ -112,7 +112,7 @@ Refer to the [docs folder](docs) for design overview and other info on Mobius
|Build & run unit tests |[Build in Windows](notes/windows-instructions.md#building-mobius) |[Build in Linux](notes/linux-instructions.md#building-mobius-in-linux) |
|Run samples (functional tests) in local mode |[Samples in Windows](notes/windows-instructions.md#running-samples) |[Samples in Linux](notes/linux-instructions.md#running-mobius-samples-in-linux) |
|Run examples in local mode |[Examples in Windows](/notes/running-mobius-app.md#running-mobius-examples-in-local-mode) |[Examples in Linux](notes/linux-instructions.md#running-mobius-examples-in-linux) |
|Run Mobius app |<ul><li>[Standalone cluster](notes/running-mobius-app.md#standalone-cluster)</li><li>[YARN cluster](notes/running-mobius-app.md#yarn-cluster)</li></ul> |<ul><li>[Linux cluster](notes/linux-instructions.md#running-mobius-applications-in-linux)</li><li>[Azure HDInsight Spark Cluster](/notes/linux-instructions.md#mobius-in-azure-hdinsight-spark-cluster)</li><li>[AWS EMR Spark Cluster](/notes/linux-instructions.md#mobius-in-amazon-web-services-emr-spark-cluster)</li> |
|Run Mobius app |<ul><li>[Standalone cluster](notes/running-mobius-app.md#standalone-cluster)</li><li>[YARN cluster](notes/running-mobius-app.md#yarn-cluster)</li></ul> |<ul><li>[Linux cluster](notes/linux-instructions.md#running-mobius-applications-in-linux)</li><li>[Azure HDInsight Spark Cluster](/notes/mobius-in-hdinsight.md)</li><li>[AWS EMR Spark Cluster](/notes/linux-instructions.md#mobius-in-amazon-web-services-emr-spark-cluster)</li> |
|Run Mobius Shell |<ul><li>[Local](notes/mobius-shell.md#run-shell)</li><li>[YARN](notes/mobius-shell.md#run-shell)</li></ul> | Not supported yet |
### Useful Links
@ -122,7 +122,7 @@ Refer to the [docs folder](docs) for design overview and other info on Mobius
## Supported Spark Versions
Mobius is built and tested with Apache Spark [1.4.1](https://github.com/Microsoft/Mobius/tree/branch-1.4), [1.5.2](https://github.com/Microsoft/Mobius/tree/branch-1.5) and [1.6.*](https://github.com/Microsoft/Mobius/tree/branch-1.6).
Mobius is built and tested with Apache Spark [1.4.1](https://github.com/Microsoft/Mobius/tree/branch-1.4), [1.5.2](https://github.com/Microsoft/Mobius/tree/branch-1.5), [1.6.*](https://github.com/Microsoft/Mobius/tree/branch-1.6) and [2.0](https://github.com/Microsoft/Mobius/tree/branch-2.0).
## Releases


@ -1,6 +1,11 @@
@setlocal
@echo OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
if "%1" == "csharp" set buildCSharp=true
SET CMDHOME=%~dp0


@ -1,7 +1,49 @@
#!/bin/bash
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
export FWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
[ ! -d "$FWDIR/dependencies" ] && mkdir "$FWDIR/dependencies"
echo "Download Mobius external dependencies"
pushd "$FWDIR/dependencies"
download_dependency() {
LINK=$1
JAR=$2
if [ ! -e $JAR ];
then
echo "Downloading $JAR"
wget -q $LINK -O $JAR
if [ ! -e $JAR ];
then
echo "Cannot download external dependency $JAR from $LINK"
popd
exit 1
fi
fi
}
SPARK_CSV_LINK="http://search.maven.org/remotecontent?filepath=com/databricks/spark-csv_2.10/1.4.0/spark-csv_2.10-1.4.0.jar"
SPARK_CSV_JAR="spark-csv_2.10-1.4.0.jar"
download_dependency $SPARK_CSV_LINK $SPARK_CSV_JAR
COMMONS_CSV_LINK="http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-csv/1.4/commons-csv-1.4.jar"
COMMONS_CSV_JAR="commons-csv-1.4.jar"
download_dependency $COMMONS_CSV_LINK $COMMONS_CSV_JAR
SPARK_STREAMING_KAFKA_LINK="http://search.maven.org/remotecontent?filepath=org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/2.0.0/spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar"
SPARK_STREAMING_KAFKA_JAR="spark-streaming-kafka-0-8-assembly_2.11-2.0.0.jar"
download_dependency $SPARK_STREAMING_KAFKA_LINK $SPARK_STREAMING_KAFKA_JAR
popd
export SPARKCLR_HOME="$FWDIR/runtime"
echo "SPARKCLR_HOME=$SPARKCLR_HOME"
@ -17,6 +59,11 @@ fi
[ ! -d "$SPARKCLR_HOME/lib" ] && mkdir "$SPARKCLR_HOME/lib"
[ ! -d "$SPARKCLR_HOME/samples" ] && mkdir "$SPARKCLR_HOME/samples"
[ ! -d "$SPARKCLR_HOME/scripts" ] && mkdir "$SPARKCLR_HOME/scripts"
[ ! -d "$SPARKCLR_HOME/dependencies" ] && mkdir "$SPARKCLR_HOME/dependencies"
echo "Assemble Mobius external dependencies"
cp $FWDIR/dependencies/* "$SPARKCLR_HOME/dependencies/"
[ $? -ne 0 ] && exit 1
echo "Assemble Mobius Scala components"
pushd "$FWDIR/../scala"
@ -31,7 +78,7 @@ mvn clean -q
# build the package
mvn package -Puber-jar -q
if [ $? -ne 0 ]
if [ $? -ne 0 ];
then
echo "Build Mobius Scala components failed, stop building."
popd


@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
function Get-ScriptDirectory
{
$Invocation = (Get-Variable MyInvocation -Scope 1).Value;


@ -1,4 +1,10 @@
@echo OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
setlocal enabledelayedexpansion
SET CMDHOME=%~dp0
@ -62,8 +68,8 @@ set SPARKCLR_HOME=%CMDHOME%\..\runtime
@rem spark-csv package and its dependency are required for DataFrame operations in Mobius
set SPARKCLR_EXT_PATH=%SPARKCLR_HOME%\dependencies
set SPARKCSV_JAR1PATH=%SPARKCLR_EXT_PATH%\spark-csv_2.10-1.3.0.jar
set SPARKCSV_JAR2PATH=%SPARKCLR_EXT_PATH%\commons-csv-1.1.jar
set SPARKCSV_JAR1PATH=%SPARKCLR_EXT_PATH%\spark-csv_2.10-1.4.0.jar
set SPARKCSV_JAR2PATH=%SPARKCLR_EXT_PATH%\commons-csv-1.4.jar
set SPARKCLR_EXT_JARS=%SPARKCSV_JAR1PATH%,%SPARKCSV_JAR2PATH%
@rem RunSamples.cmd is in local mode, should not load Hadoop or Yarn cluster config. Disable Hadoop/Yarn conf dir.


@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
#
# Input -
# (1) "stage" parameter, accepts either "build" or "run"
@ -342,14 +347,14 @@ function Download-ExternalDependencies
$readMeStream.WriteLine("------------ Dependencies for CSV parsing in Mobius DataFrame API -----------------------------")
# Downloading spark-csv package and its dependency. These packages are required for DataFrame operations in Mobius
$url = "http://search.maven.org/remotecontent?filepath=com/databricks/spark-csv_2.10/1.3.0/spark-csv_2.10-1.3.0.jar"
$output="$scriptDir\..\dependencies\spark-csv_2.10-1.3.0.jar"
$url = "http://search.maven.org/remotecontent?filepath=com/databricks/spark-csv_2.10/1.4.0/spark-csv_2.10-1.4.0.jar"
$output="$scriptDir\..\dependencies\spark-csv_2.10-1.4.0.jar"
Download-File $url $output
Write-Output "[downloadtools.Download-ExternalDependencies] Downloading $url to $scriptDir\..\dependencies"
$readMeStream.WriteLine("$url")
$url = "http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-csv/1.1/commons-csv-1.1.jar"
$output="$scriptDir\..\dependencies\commons-csv-1.1.jar"
$url = "http://search.maven.org/remotecontent?filepath=org/apache/commons/commons-csv/1.4/commons-csv-1.4.jar"
$output="$scriptDir\..\dependencies\commons-csv-1.4.jar"
Download-File $url $output
Write-Output "[downloadtools.Download-ExternalDependencies] Downloading $url to $scriptDir\..\dependencies"
$readMeStream.WriteLine("$url")


@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
$x64items = @(Get-ChildItem "HKLM:SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall")
$x64items + @(Get-ChildItem "HKLM:SOFTWARE\wow6432node\Microsoft\Windows\CurrentVersion\Uninstall") `
| ForEach-object { Get-ItemProperty Microsoft.PowerShell.Core\Registry::$_ } `


@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
$root = (split-path -parent $MyInvocation.MyCommand.Definition) + '\..\..'
# expected tagname: v{version-string}. E.g., "v1.5.2-snapshot-2", "v1.5.2-prerelease-1"


@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
#
# Input -
# "targetPom" parameter, target Pom.xml file


@ -1,5 +1,10 @@
@echo OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
set precheck=ok
if not exist "%JAVA_HOME%\bin\java.exe" (


@ -1,5 +1,10 @@
#!/bin/bash
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
export verbose=
for param in "$@"
@ -68,9 +73,9 @@ fi
export SPARKCLR_HOME="$FWDIR/../runtime"
# spark-csv package and its dependency are required for DataFrame operations in Mobius
export SPARKCLR_EXT_PATH="$SPARKCLR_HOME\dependencies"
export SPARKCSV_JAR1PATH="$SPARKCLR_EXT_PATH\spark-csv_2.10-1.3.0.jar"
export SPARKCSV_JAR2PATH="$SPARKCLR_EXT_PATH\commons-csv-1.1.jar"
export SPARKCLR_EXT_PATH="$SPARKCLR_HOME/dependencies"
export SPARKCSV_JAR1PATH="$SPARKCLR_EXT_PATH/spark-csv_2.10-1.4.0.jar"
export SPARKCSV_JAR2PATH="$SPARKCLR_EXT_PATH/commons-csv-1.4.jar"
export SPARKCLR_EXT_JARS="$SPARKCSV_JAR1PATH,$SPARKCSV_JAR2PATH"
# run-samples.sh is in local mode, should not load Hadoop or Yarn cluster config. Disable Hadoop/Yarn conf dir.


@ -1,8 +1,10 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
# This script takes in "dir" and "target" parameters, zips all files under dir to the target file
#
Param([string]$dir, [string]$target)


@ -1,6 +1,11 @@
@setlocal
@ECHO off
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
SET CMDHOME=%~dp0
@REM Remove trailing backslash \
set CMDHOME=%CMDHOME:~0,-1%


@ -1,4 +1,10 @@
@ECHO OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
FOR /D /R . %%G IN (bin) DO @IF EXIST "%%G" (@echo RMDIR /S /Q "%%G" & rd /s /q "%%G")
FOR /D /R . %%G IN (obj) DO @IF EXIST "%%G" (@echo RMDIR /S /Q "%%G" & rd /s /q "%%G")
FOR /D /R . %%G IN (x64) DO @IF EXIST "%%G" (@echo RMDIR /S /Q "%%G" & rd /s /q "%%G")


@ -102,20 +102,25 @@
<Compile Include="Network\SockDataToken.cs" />
<Compile Include="Network\SocketFactory.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Proxy\ICatalogProxy.cs" />
<Compile Include="Proxy\IDataFrameNaFunctionsProxy.cs" />
<Compile Include="Proxy\IDataFrameProxy.cs" />
<Compile Include="Proxy\IDataFrameReaderProxy.cs" />
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
<Compile Include="Proxy\IDatasetProxy.cs" />
<Compile Include="Proxy\IDStreamProxy.cs" />
<Compile Include="Proxy\IHadoopConfigurationProxy.cs" />
<Compile Include="Proxy\Ipc\CatalogIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameNaFunctionsIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameReaderIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DatasetIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
<Compile Include="Proxy\Ipc\HadoopConfigurationIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkSessionIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SqlContextIpcProxy.cs" />
<Compile Include="Proxy\Ipc\StatusTrackerIpcProxy.cs" />
<Compile Include="Proxy\Ipc\StreamingContextIpcProxy.cs" />
@ -125,6 +130,7 @@
<Compile Include="Proxy\ISparkConfProxy.cs" />
<Compile Include="Proxy\ISparkContextProxy.cs" />
<Compile Include="Proxy\Ipc\SparkConfIpcProxy.cs" />
<Compile Include="Proxy\ISparkSessionProxy.cs" />
<Compile Include="Proxy\ISqlContextProxy.cs" />
<Compile Include="Proxy\IStatusTrackerProxy.cs" />
<Compile Include="Proxy\IStreamingContextProxy.cs" />
@ -134,17 +140,21 @@
<Compile Include="Services\ILoggerService.cs" />
<Compile Include="Services\Log4NetLoggerService.cs" />
<Compile Include="Services\LoggerServiceFactory.cs" />
<Compile Include="Sql\Builder.cs" />
<Compile Include="Sql\Catalog.cs" />
<Compile Include="Sql\Column.cs" />
<Compile Include="Sql\DataFrame.cs" />
<Compile Include="Sql\DataFrameNaFunctions.cs" />
<Compile Include="Sql\DataFrameReader.cs" />
<Compile Include="Sql\DataFrameWriter.cs" />
<Compile Include="Sql\Dataset.cs" />
<Compile Include="Sql\HiveContext.cs" />
<Compile Include="Sql\PythonSerDe.cs" />
<Compile Include="Sql\RowConstructor.cs" />
<Compile Include="Sql\Row.cs" />
<Compile Include="Sql\Functions.cs" />
<Compile Include="Sql\SaveMode.cs" />
<Compile Include="Sql\SparkSession.cs" />
<Compile Include="Sql\SqlContext.cs" />
<Compile Include="Sql\Types.cs" />
<Compile Include="Sql\UserDefinedFunction.cs" />


@ -65,7 +65,9 @@ namespace Microsoft.Spark.CSharp.Configuration
configuration = new SparkCLRConfiguration(appConfig);
runMode = RunMode.CLUSTER;
}
else if (sparkMaster.Equals("yarn-client", StringComparison.OrdinalIgnoreCase) || sparkMaster.Equals("yarn-cluster", StringComparison.OrdinalIgnoreCase))
else if (sparkMaster.Equals("yarn-cluster", StringComparison.OrdinalIgnoreCase) ||
sparkMaster.Equals("yarn-client", StringComparison.OrdinalIgnoreCase) ||
sparkMaster.Equals("yarn", StringComparison.OrdinalIgnoreCase)) //supported in Spark 2.0
{
configuration = new SparkCLRConfiguration(appConfig);
runMode = RunMode.YARN;


@ -22,13 +22,14 @@ namespace Microsoft.Spark.CSharp.Core
public CSharpWorkerFunc(Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> func)
{
this.func = func;
stackTrace = new StackTrace(true).ToString();
stackTrace = new StackTrace(true).ToString().Replace(" at ", " [STACK] ");
}
public CSharpWorkerFunc(Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> func, string innerStackTrace)
: this(func)
{
this.func = func;
stackTrace = new StackTrace(true).ToString() + "\nInner stack trace ...\n" + innerStackTrace;
stackTrace += string.Format(" [STACK] --- Inner stack trace: ---{0}{1}",
Environment.NewLine, innerStackTrace.Replace(" at ", " [STACK] "));
}
public Func<int, IEnumerable<dynamic>, IEnumerable<dynamic>> Func


@ -2,6 +2,8 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;
using Microsoft.Spark.CSharp.Configuration;
using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Proxy;
@ -122,6 +124,29 @@ namespace Microsoft.Spark.CSharp.Core
{
return sparkConfProxy.Get(key, defaultValue);
}
/// <summary>
/// Get all parameters as a list of pairs
/// </summary>
public Dictionary<string, string> GetAll()
{
var configKvp = new Dictionary<string, string>();
var kvpStringCollection = sparkConfProxy.GetSparkConfAsString();
var kvpStringArray = Regex.Split(kvpStringCollection, ";");
foreach (var kvpString in kvpStringArray)
{
if (!string.IsNullOrEmpty(kvpString))
{
var kvpItems = Regex.Split(kvpString, "=");
if (kvpItems.Length == 2 && !string.IsNullOrEmpty(kvpItems[0]) && !string.IsNullOrEmpty(kvpItems[1]))
{
configKvp.Add(kvpItems[0], kvpItems[1]);
}
}
}
return configKvp;
}
}


@ -129,6 +129,7 @@ namespace Microsoft.Spark.CSharp.Core
{
SparkContextProxy = sparkContextProxy;
SparkConf = conf;
_activeSparkContext = this;
}
private SparkContext(string master, string appName, string sparkHome, SparkConf conf)
@ -145,6 +146,25 @@ namespace Microsoft.Spark.CSharp.Core
_activeSparkContext = this;
}
/// <summary>
/// This function may be used to get or instantiate a SparkContext and register it as a
/// singleton object. Because we can only have one active SparkContext per JVM,
/// this is useful when applications may wish to share a SparkContext.
/// Note: This function cannot be used to create multiple SparkContext instances
/// even if multiple contexts are allowed.
/// </summary>
/// <param name="conf"></param>
/// <returns></returns>
public static SparkContext GetOrCreate(SparkConf conf)
{
if (_activeSparkContext == null)
{
_activeSparkContext = new SparkContext(conf);
}
return _activeSparkContext;
}
internal void StartAccumulatorServer()
{
if (accumulatorServer == null)

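For illustration, a minimal driver-side sketch of the GetOrCreate entry point added above (not part of the diff; the app name and master URL are placeholders, and the config keys follow the SparkSession Builder introduced later in this commit):

var conf = new SparkConf();
conf.Set("spark.app.name", "MobiusGetOrCreateExample"); // placeholder app name
conf.Set("spark.master", "local[2]");                   // placeholder master URL
// Reuses the active SparkContext if one exists; otherwise creates a new one from conf.
var sc = SparkContext.GetOrCreate(conf);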

@ -30,5 +30,6 @@ using System.Runtime.InteropServices;
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]


@ -0,0 +1,52 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.Spark.CSharp.Sql.Catalog;
using Column = Microsoft.Spark.CSharp.Sql.Catalog.Column;
namespace Microsoft.Spark.CSharp.Proxy
{
interface ICatalogProxy
{
string CurrentDatabase { get; }
void SetCurrentDatabase(string dbName);
Dataset<Database> ListDatabases();
Dataset<Table> ListTables(string dbName);
Dataset<Function> ListFunctions(string dbName);
Dataset<Column> ListColumns(string tableName);
Dataset<Column> ListColumns(string dbName, string tableName);
void DropTempTable(string tableName);
bool IsCached(string tableName);
void CacheTable(string tableName);
void UnCacheTable(string tableName);
void RefreshTable(string tableName);
void ClearCache();
DataFrame CreateExternalTable(string tableName, string path);
DataFrame CreateExternalTable(string tableName, string path, string source);
DataFrame CreateExternalTable(string tableName, string source, Dictionary<string, string> options);
DataFrame CreateExternalTable(string tableName, string source, StructType schema,
Dictionary<string, string> options);
}
}


@ -0,0 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.Spark.CSharp.Proxy
{
interface IDatasetProxy
{
IDataFrameProxy ToDF();
}
}


@ -18,5 +18,6 @@ namespace Microsoft.Spark.CSharp.Proxy
void Set(string key, string value);
int GetInt(string key, int defaultValue);
string Get(string key, string defaultValue);
string GetSparkConfAsString();
}
}


@ -15,8 +15,8 @@ namespace Microsoft.Spark.CSharp.Proxy
internal interface ISparkContextProxy
{
ISparkConfProxy GetConf();
ISqlContextProxy CreateSqlContext();
ISqlContextProxy CreateHiveContext();
ISparkSessionProxy CreateSparkSession();
IColumnProxy CreateColumnFromName(string name);
IColumnProxy CreateFunction(string name, object self);
IColumnProxy CreateBinaryMathFunction(string name, object self, object other);


@ -0,0 +1,27 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Sql;
namespace Microsoft.Spark.CSharp.Proxy
{
internal interface IUdfRegistration { }
interface ISparkSessionProxy
{
ISqlContextProxy SqlContextProxy { get; }
IUdfRegistration Udf { get; }
ICatalogProxy GetCatalog();
IDataFrameReaderProxy Read();
ISparkSessionProxy NewSession();
IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy);
IDataFrameProxy Table(string tableName);
IDataFrameProxy Sql(string query);
void Stop();
}
}


@ -14,7 +14,6 @@ namespace Microsoft.Spark.CSharp.Proxy
internal interface ISqlContextProxy
{
IDataFrameReaderProxy Read();
ISqlContextProxy NewSession();
string GetConf(string key, string defaultValue);
void SetConf(string key, string value);
IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy);


@ -0,0 +1,154 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.Spark.CSharp.Sql.Catalog;
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class CatalogIpcProxy : ICatalogProxy
{
private readonly JvmObjectReference jvmCatalogReference;
private readonly ISqlContextProxy sqlContextProxy;
internal CatalogIpcProxy(JvmObjectReference jvmCatalogReference, ISqlContextProxy sqlContextProxy)
{
this.jvmCatalogReference = jvmCatalogReference;
this.sqlContextProxy = sqlContextProxy;
}
public string CurrentDatabase
{
get
{
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "currentDatabase").ToString();
}
}
public void CacheTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "cacheTable", new object[] { tableName });
}
public void ClearCache()
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "clearCache");
}
public DataFrame CreateExternalTable(string tableName, string path)
{
return new DataFrame(
new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "createExternalTable",
new object[] {tableName, path}).ToString()), sqlContextProxy), SparkContext.GetActiveSparkContext());
}
public DataFrame CreateExternalTable(string tableName, string source, Dictionary<string, string> options)
{
throw new NotImplementedException(); //TODO - implement
}
public DataFrame CreateExternalTable(string tableName, string path, string source)
{
return new DataFrame(
new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "createExternalTable",
new object[] { tableName, path, source }).ToString()), sqlContextProxy), SparkContext.GetActiveSparkContext());
}
public DataFrame CreateExternalTable(string tableName, string source, StructType schema, Dictionary<string, string> options)
{
throw new NotImplementedException(); //TODO - implement
}
public void DropTempTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "dropTempView", new object[] { tableName });
}
public bool IsCached(string tableName)
{
return
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "isCached",
new object[] {tableName}).ToString().Equals("true", StringComparison.InvariantCultureIgnoreCase);
}
public Dataset<Sql.Catalog.Column> ListColumns(string tableName)
{
return new Dataset<Sql.Catalog.Column>(
new DatasetIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listColumns",
new object[] { tableName })), sqlContextProxy));
}
public Dataset<Sql.Catalog.Column> ListColumns(string dbName, string tableName)
{
return new Dataset<Sql.Catalog.Column>(
new DatasetIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listColumns",
new object[] { dbName, tableName })), sqlContextProxy));
}
public Dataset<Database> ListDatabases()
{
return new Dataset<Database>(
new DatasetIpcProxy(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listDatabases")), sqlContextProxy));
}
public Dataset<Function> ListFunctions(string dbName)
{
return new Dataset<Function>(
new DatasetIpcProxy(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listFunctions", new object[] { dbName })), sqlContextProxy));
}
public Dataset<Table> ListTables(string dbName = null)
{
if (dbName != null)
return new Dataset<Table>(
new DatasetIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listTables",
new object[] {dbName})), sqlContextProxy));
else
return new Dataset<Table>(
new DatasetIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "listTables")),
sqlContextProxy));
}
public void SetCurrentDatabase(string dbName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "setCurrentDatabase", new object[] { dbName });
}
public void UnCacheTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "uncacheTable", new object[] { tableName });
}
public void RefreshTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmCatalogReference, "refreshTable", new object[] { tableName });
}
}
}


@ -0,0 +1,35 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Interop.Ipc;
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class DatasetIpcProxy : IDatasetProxy
{
private readonly JvmObjectReference jvmDatasetReference;
private readonly ISqlContextProxy sqlContextProxy;
internal DatasetIpcProxy(JvmObjectReference jvmDatasetReference, ISqlContextProxy sqlContextProxy)
{
this.jvmDatasetReference = jvmDatasetReference;
this.sqlContextProxy = sqlContextProxy;
}
public IDataFrameProxy ToDF()
{
return new DataFrameIpcProxy(
new JvmObjectReference(
(string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDatasetReference, "toDF")),
sqlContextProxy
);
}
}
}


@ -57,5 +57,10 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
return SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkConfReference, "get", new object[] { key, defaultValue }).ToString();
}
}
public string GetSparkConfAsString()
{
return SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.JvmBridgeUtils", "getSparkConfAsString", new object[] { jvmSparkConfReference }).ToString();
}
}
}


@ -38,10 +38,15 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
this.jvmSparkContextReference = jvmSparkContextReference;
this.jvmJavaContextReference = jvmJavaContextReference;
}
public ISqlContextProxy CreateSqlContext()
public ISparkSessionProxy CreateSparkSession()
{
return new SqlContextIpcProxy(new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "createSQLContext", new object[] { jvmSparkContextReference })));
return
new SparkSessionIpcProxy(
new JvmObjectReference(
(string)
SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils",
"createSparkSession", new object[] {jvmSparkContextReference})));
}
public ISqlContextProxy CreateHiveContext()


@ -0,0 +1,101 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Interop.Ipc;
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
[ExcludeFromCodeCoverage] //IPC calls to JVM validated using validation-enabled samples - unit test coverage not required
internal class SparkSessionIpcProxy : ISparkSessionProxy
{
private readonly JvmObjectReference jvmSparkSessionReference;
private readonly ISqlContextProxy sqlContextProxy;
private readonly IUdfRegistration udfRegistration;
public IUdfRegistration Udf
{
get
{
if (udfRegistration == null)
{
//TODO implementation needed
}
return udfRegistration;
}
}
public ISqlContextProxy SqlContextProxy
{
get { return sqlContextProxy; }
}
public ICatalogProxy GetCatalog()
{
return new CatalogIpcProxy(new JvmObjectReference((string) SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "catalog")), sqlContextProxy);
}
internal SparkSessionIpcProxy(JvmObjectReference jvmSparkSessionReference)
{
this.jvmSparkSessionReference = jvmSparkSessionReference;
sqlContextProxy = new SqlContextIpcProxy(GetSqlContextReference());
}
private JvmObjectReference GetSqlContextReference()
{
return
new JvmObjectReference(
(string) SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "getSqlContext", new object[] { jvmSparkSessionReference }));
}
public ISparkSessionProxy NewSession()
{
return new SparkSessionIpcProxy(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "newSession")));
}
public IDataFrameReaderProxy Read()
{
var javaDataFrameReaderReference = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "read");
return new DataFrameReaderIpcProxy(new JvmObjectReference(javaDataFrameReaderReference.ToString()), sqlContextProxy);
}
public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy)
{
var rdd = new JvmObjectReference(SparkCLRIpcProxy.JvmBridge.CallStaticJavaMethod("org.apache.spark.sql.api.csharp.SQLUtils", "byteArrayRDDToAnyArrayRDD",
new object[] { (rddProxy as RDDIpcProxy).JvmRddReference }).ToString());
return new DataFrameIpcProxy(
new JvmObjectReference(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "applySchemaToPythonRDD",
new object[] { rdd, (structTypeProxy as StructTypeIpcProxy).JvmStructTypeReference }).ToString()), sqlContextProxy);
}
public IDataFrameProxy Sql(string sqlQuery)
{
var javaDataFrameReference = SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "sql", new object[] { sqlQuery });
var javaObjectReferenceForDataFrame = new JvmObjectReference(javaDataFrameReference.ToString());
return new DataFrameIpcProxy(javaObjectReferenceForDataFrame, sqlContextProxy);
}
public IDataFrameProxy Table(string tableName)
{
return new DataFrameIpcProxy(
new JvmObjectReference(
(string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "table",
new object[] { tableName })), sqlContextProxy);
}
public void Stop()
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSparkSessionReference, "stop");
}
}
}


@ -112,12 +112,6 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(judf, "registerPython", new object[] { name, udf });
}
public ISqlContextProxy NewSession()
{
return new SqlContextIpcProxy(
new JvmObjectReference((string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "newSession")));
}
public string GetConf(string key, string defaultValue)
{
return (string)SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmSqlContextReference, "getConf", new object[] { key, defaultValue });


@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Microsoft.Spark.CSharp.Services
{
@ -12,7 +8,20 @@ namespace Microsoft.Spark.CSharp.Services
/// </summary>
public class DefaultLoggerService : ILoggerService
{
internal readonly static DefaultLoggerService Instance = new DefaultLoggerService(typeof (Type));
internal static readonly DefaultLoggerService Instance = new DefaultLoggerService(typeof(Type));
private readonly Type type;
private DefaultLoggerService(Type t)
{
type = t;
}
/// <summary>
/// Gets a value indicating whether logging is enabled for the Debug level.
/// Always return true for the DefaultLoggerService object.
/// </summary>
public bool IsDebugEnabled { get { return true; } }
/// <summary>
/// Get an instance of ILoggerService by a given type of logger
/// </summary>
@ -22,12 +31,6 @@ namespace Microsoft.Spark.CSharp.Services
{
return new DefaultLoggerService(type);
}
private readonly Type type;
private DefaultLoggerService(Type t)
{
type = t;
}
/// <summary>
/// Logs a message at debug level.


@ -7,6 +7,11 @@ namespace Microsoft.Spark.CSharp.Services
/// </summary>
public interface ILoggerService
{
/// <summary>
/// Gets a value indicating whether logging is enabled for the Debug level.
/// </summary>
bool IsDebugEnabled { get; }
/// <summary>
/// Get an instance of ILoggerService by a given type of logger
/// </summary>
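The new IsDebugEnabled flag lets callers skip building expensive debug messages; an illustrative guard (not from this commit; MyMobiusApp and ComputeExpensiveSummary are hypothetical names):

var logger = LoggerServiceFactory.GetLogger(typeof(MyMobiusApp));
if (logger.IsDebugEnabled)
{
    // Only pay for the summary string when debug logging is actually enabled.
    logger.LogDebug("Partition summary: " + ComputeExpensiveSummary());
}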


@ -1,10 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading.Tasks;
using log4net;
using log4net.Config;
@ -35,7 +31,15 @@ namespace Microsoft.Spark.CSharp.Services
public Log4NetLoggerService(Type type)
{
logger = LogManager.GetLogger(type);
log4net.GlobalContext.Properties["pid"] = Process.GetCurrentProcess().Id;
GlobalContext.Properties["pid"] = Process.GetCurrentProcess().Id;
}
/// <summary>
/// Gets a value indicating whether logging is enabled for the Debug level.
/// </summary>
public bool IsDebugEnabled
{
get { return logger.IsDebugEnabled; }
}
/// <summary>


@ -0,0 +1,130 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// The entry point to programming Spark with the Dataset and DataFrame API.
/// </summary>
public class Builder
{
internal Dictionary<string, string> options = new Dictionary<string, string>();
internal Builder() { }
/// <summary>
/// Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
/// run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
/// </summary>
/// <param name="master">Master URL</param>
public Builder Master(string master)
{
Config("spark.master", master);
return this;
}
/// <summary>
/// Sets a name for the application, which will be shown in the Spark web UI.
/// If no application name is set, a randomly generated name will be used.
/// </summary>
/// <param name="appName">Name of the app</param>
public Builder AppName(string appName)
{
Config("spark.app.name", appName);
return this;
}
/// <summary>
/// Sets a config option. Options set using this method are automatically propagated to
/// both SparkConf and SparkSession's own configuration.
/// </summary>
/// <param name="key">Key for the configuration</param>
/// <param name="value">value of the configuration</param>
public Builder Config(string key, string value)
{
options[key] = value;
return this;
}
/// <summary>
/// Sets a config option. Options set using this method are automatically propagated to
/// both SparkConf and SparkSession's own configuration.
/// </summary>
/// <param name="key">Key for the configuration</param>
/// <param name="value">value of the configuration</param>
public Builder Config(string key, bool value)
{
options[key] = value.ToString();
return this;
}
/// <summary>
/// Sets a config option. Options set using this method are automatically propagated to
/// both SparkConf and SparkSession's own configuration.
/// </summary>
/// <param name="key">Key for the configuration</param>
/// <param name="value">value of the configuration</param>
public Builder Config(string key, double value)
{
options[key] = value.ToString();
return this;
}
/// <summary>
/// Sets a config option. Options set using this method are automatically propagated to
/// both SparkConf and SparkSession's own configuration.
/// </summary>
/// <param name="key">Key for the configuration</param>
/// <param name="value">value of the configuration</param>
public Builder Config(string key, long value)
{
options[key] = value.ToString();
return this;
}
/// <summary>
/// Sets a list of config options based on the given SparkConf
/// </summary>
public Builder Config(SparkConf conf)
{
foreach (var keyValuePair in conf.GetAll())
{
options[keyValuePair.Key] = keyValuePair.Value;
}
return this;
}
/// <summary>
/// Enables Hive support, including connectivity to a persistent Hive metastore, support for
/// Hive serdes, and Hive user-defined functions.
/// </summary>
public Builder EnableHiveSupport()
{
return Config("spark.sql.catalogImplementation", "hive");
}
/// <summary>
/// Gets an existing [[SparkSession]] or, if there is no existing one, creates a new
/// one based on the options set in this builder.
/// </summary>
/// <returns></returns>
public SparkSession GetOrCreate()
{
var sparkConf = new SparkConf();
foreach (var option in options)
{
sparkConf.Set(option.Key, option.Value);
}
var sparkContext = SparkContext.GetOrCreate(sparkConf);
return SqlContext.GetOrCreate(sparkContext).SparkSession;
}
}
}
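As a usage sketch (not taken from this commit), a Mobius driver could construct a session through this builder as shown below; the master URL, app name and config setting are placeholders:

var session = SparkSession.Builder()
    .Master("local[2]")                              // placeholder master URL
    .AppName("MobiusBuilderExample")                 // placeholder app name
    .Config("spark.ui.showConsoleProgress", false)   // any SparkConf key/value pair
    .GetOrCreate();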


@ -0,0 +1,350 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
namespace Microsoft.Spark.CSharp.Sql.Catalog
{
/// <summary>
/// Catalog interface for Spark.
/// </summary>
public class Catalog
{
ICatalogProxy catalogProxy;
internal Catalog(ICatalogProxy catalogProxy)
{
this.catalogProxy = catalogProxy;
}
/// <summary>
/// Returns the current default database in this session.
/// </summary>
public string CurrentDatabase
{
get { return catalogProxy.CurrentDatabase; }
}
// TODO Enable these convenience functions if needed
/*
public List<Database> GetDatabasesList()
{
var rows = ListDatabases().Collect();
var list = new List<Database>();
foreach (var row in rows)
{
list.Add(new Database
{
Name = row.Get("name"),
Description = row.Get("description"),
LocationUri = row.Get("locationUri")
});
}
return list;
}
public List<Table> GetTablesList(string dbName = null)
{
var tables = ListTables(dbName).Collect();
//iterate and construct Table
throw new NotImplementedException();
}
public List<Table> GetColumnsList(string tableName, string dbName = null)
{
var tables = ListColumns(tableName, dbName).Collect();
//iterate and construct Column
throw new NotImplementedException();
}
public List<Table> GetFunctionsList(string dbName = null)
{
var tables = ListFunctions(dbName).Collect();
//iterate and construct Table
throw new NotImplementedException();
}
*/
/// <summary>
/// Returns a list of databases available across all sessions.
/// </summary>
/// <returns></returns>
public DataFrame ListDatabases()
{
return catalogProxy.ListDatabases().ToDF();
}
/// <summary>
/// Returns a list of tables in the current database or given database
/// This includes all temporary tables.
/// </summary>
/// <param name="dbName">Optional database name. If not provided, current database is used</param>
public DataFrame ListTables(string dbName = null)
{
return catalogProxy.ListTables(dbName ?? CurrentDatabase).ToDF();
}
/// <summary>
/// Returns a list of columns for the given table in the current database or
/// the given temporary table.
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <param name="dbName">Name of the database. If database is not provided, current database is used</param>
public DataFrame ListColumns(string tableName, string dbName = null)
{
return catalogProxy.ListColumns(tableName, dbName ?? CurrentDatabase).ToDF();
}
/// <summary>
/// Returns a list of functions registered in the specified database.
/// This includes all temporary functions
/// </summary>
/// <param name="dbName">Name of the database. If database is not provided, current database is used</param>
public DataFrame ListFunctions(string dbName = null)
{
return catalogProxy.ListFunctions(dbName ?? CurrentDatabase).ToDF();
}
/// <summary>
/// Sets the current default database in this session.
/// </summary>
/// <param name="dbName">Name of database</param>
public void SetCurrentDatabase(string dbName)
{
catalogProxy.SetCurrentDatabase(dbName);
}
/// <summary>
/// Drops the temporary view with the given view name in the catalog.
/// If the view has been cached before, then it will also be uncached.
/// </summary>
/// <param name="tempViewName">Name of the table</param>
public void DropTempView(string tempViewName)
{
catalogProxy.DropTempTable(tempViewName);
}
/// <summary>
/// Returns true if the table is currently cached in-memory.
/// </summary>
/// <param name="tableName">Name of the table</param>
public bool IsCached(string tableName)
{
return catalogProxy.IsCached(tableName);
}
/// <summary>
/// Caches the specified table in-memory.
/// </summary>
/// <param name="tableName">Name of the table</param>
public void CacheTable(string tableName)
{
catalogProxy.CacheTable(tableName);
}
/// <summary>
/// Removes the specified table from the in-memory cache.
/// </summary>
/// <param name="tableName">Name of the table</param>
public void UnCacheTable(string tableName)
{
catalogProxy.UnCacheTable(tableName);
}
/// <summary>
/// Invalidate and refresh all the cached metadata of the given table. For performance reasons,
/// Spark SQL or the external data source library it uses might cache certain metadata about a
/// table, such as the location of blocks. When those change outside of Spark SQL, users should
/// call this function to invalidate the cache.
/// If this table is cached as an InMemoryRelation, drop the original cached version and make the
/// new version cached lazily.
/// </summary>
/// <param name="tableName">Name of the table</param>
public void RefreshTable(string tableName)
{
catalogProxy.RefreshTable(tableName);
}
/// <summary>
/// Removes all cached tables from the in-memory cache.
/// </summary>
public void ClearCache()
{
catalogProxy.ClearCache();
}
/// <summary>
/// Creates an external table from the given path and returns the corresponding DataFrame.
/// It will use the default data source configured by spark.sql.sources.default.
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <param name="path">Path to table</param>
public DataFrame CreateExternalTable(string tableName, string path)
{
return catalogProxy.CreateExternalTable(tableName, path);
}
/// <summary>
/// Creates an external table from the given path on a data source and returns DataFrame
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <param name="path">Path to table</param>
/// <param name="source">Data source</param>
public DataFrame CreateExternalTable(string tableName, string path, string source)
{
return catalogProxy.CreateExternalTable(tableName, path, source);
}
/// <summary>
/// Creates an external table from the given path based on a data source and a set of options.
/// Then, returns the corresponding DataFrame.
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <param name="source">Data source</param>
/// <param name="options">Options to create table</param>
/// <returns></returns>
public DataFrame CreateExternalTable(string tableName, string source, Dictionary<string, string> options)
{
return catalogProxy.CreateExternalTable(tableName, source, options);
}
/// <summary>
/// Create an external table from the given path based on a data source, a schema and
/// a set of options. Then, returns the corresponding DataFrame.
/// </summary>
/// <param name="tableName">Name of the table</param>
/// <param name="source">Data source</param>
/// <param name="schema">Schema of the table</param>
/// <param name="options">Options to create table</param>
/// <returns></returns>
public DataFrame CreateExternalTable(string tableName, string source, StructType schema, Dictionary<string, string> options)
{
return catalogProxy.CreateExternalTable(tableName, source, schema, options);
}
}
/// <summary>
/// A database in Spark
/// </summary>
public class Database
{
/// <summary>
/// Name of the database
/// </summary>
public string Name { get; internal set; }
/// <summary>
/// Description for the database
/// </summary>
public string Description { get; internal set; }
/// <summary>
/// Location of the database
/// </summary>
public string LocationUri { get; internal set; }
}
/// <summary>
/// A table in Spark
/// </summary>
public class Table
{
/// <summary>
/// Name of the table
/// </summary>
public string Name { get; internal set; }
/// <summary>
/// Name of the database Table belongs to
/// </summary>
public string Database { get; internal set; }
/// <summary>
/// Description of the table
/// </summary>
public string Description { get; internal set; }
/// <summary>
/// Type of the table (table, view)
/// </summary>
public string TableType { get; internal set; }
/// <summary>
/// Whether the table is a temporary table
/// </summary>
public bool IsTemporary { get; internal set; }
}
/// <summary>
/// A column in Spark
/// </summary>
public class Column
{
/// <summary>
/// Name of the column
/// </summary>
public string Name { get; internal set; }
/// <summary>
/// Datatype of the column
/// </summary>
public string DataType { get; internal set; }
/// <summary>
/// Description of the column
/// </summary>
public string Description { get; internal set; }
/// <summary>
/// Whether the column value can be null
/// </summary>
public bool IsNullable { get; internal set; }
/// <summary>
/// Whether the column is a partition column.
/// </summary>
public bool IsPartition { get; internal set; }
/// <summary>
/// Whether the column is a bucket column.
/// </summary>
public bool IsBucket { get; internal set; }
}
/// <summary>
/// A user-defined function in Spark
/// </summary>
public class Function
{
/// <summary>
/// Name of the column
/// </summary>
public string Name { get; internal set; }
/// <summary>
/// Name of the database
/// </summary>
public string Database { get; internal set; }
/// <summary>
/// Description of the function
/// </summary>
public string Description { get; internal set; }
/// <summary>
/// Fully qualified class name of the function
/// </summary>
public string ClassName { get; internal set; }
/// <summary>
/// Whether the function is a temporary function or not.
/// </summary>
public bool IsTemporary { get; internal set; }
}
}
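A brief illustrative sketch of the Catalog API defined above (not part of the commit; assumes a SparkSession named session already exists, and "people" is a placeholder table name):

var catalog = session.Catalog;
catalog.ListTables().Show();            // tables in the current database, returned as a DataFrame
if (catalog.IsCached("people"))
{
    catalog.UnCacheTable("people");     // remove the placeholder table from the in-memory cache
}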


@ -170,11 +170,17 @@ namespace Microsoft.Spark.CSharp.Sql
return Rdd.Collect(port).Cast<Row>();
}
//TODO - add this method if needed to convert Row to collection of T
//public IEnumerable<T> Collect<T>()
//{
// throw new NotImplementedException();
//}
/// <summary>
/// Converts the DataFrame to RDD of Row
/// </summary>
/// <returns>resulting RDD</returns>
public RDD<Row> ToRDD() //RDD created using byte representation of Row objects
/// Converts the DataFrame to RDD of Row
/// </summary>
/// <returns>resulting RDD</returns>
public RDD<Row> ToRDD() //RDD created using byte representation of Row objects
{
return Rdd;
}


@ -0,0 +1,135 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// Dataset is a strongly typed collection of domain-specific objects that can be transformed
/// in parallel using functional or relational operations. Each Dataset also has an untyped view
/// called a DataFrame, which is a Dataset of Row.
/// </summary>
public class Dataset
{
IDatasetProxy datasetProxy;
internal Dataset(IDatasetProxy datasetProxy)
{
this.datasetProxy = datasetProxy;
}
/// <summary>
/// Selects column based on the column name
/// </summary>
/// <param name="columnName">Name of the column</param>
/// <returns></returns>
public Column this[string columnName]
{
get { return ToDF()[columnName]; }
}
private DataFrame dataFrame;
/// <summary>
/// Converts this strongly typed collection of data to generic Dataframe. In contrast to the
/// strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
/// objects that allow fields to be accessed by ordinal or name.
/// </summary>
/// <returns>DataFrame created from Dataset</returns>
public DataFrame ToDF()
{
return dataFrame ?? (dataFrame = new DataFrame(datasetProxy.ToDF(), SparkContext.GetActiveSparkContext()));
}
/// <summary>
/// Prints the schema to the console in a nice tree format.
/// </summary>
public void PrintSchema()
{
ToDF().ShowSchema();
}
/// <summary>
/// Prints the plans (logical and physical) to the console for debugging purposes.
/// </summary>
/// <param name="extended"></param>
public void Explain(bool extended)
{
ToDF().Explain(extended);
}
/// <summary>
/// Prints the physical plan to the console for debugging purposes.
/// </summary>
public void Explain()
{
ToDF().Explain();
}
/// <summary>
/// Returns all column names and their data types as an array.
/// </summary>
public IEnumerable<Tuple<string, string>> DTypes()
{
return ToDF().DTypes();
}
/// <summary>
/// Returns all column names as an array.
/// </summary>
public IEnumerable<string> Columns()
{
return ToDF().Columns();
}
/// <summary>
/// Displays the top 20 rows of Dataset in a tabular form. Strings more than 20 characters
/// will be truncated, and all cells will be aligned right.
/// </summary>
/// <param name="numberOfRows">Number of rows - default is 20</param>
/// <param name="truncate">Indicates if rows with more than 20 characters to be truncated</param>
public void Show(int numberOfRows = 20, bool truncate = true)
{
ToDF().Show(numberOfRows, truncate);
}
/// <summary>
/// Prints schema
/// </summary>
public void ShowSchema()
{
ToDF().ShowSchema();
}
}
/// <summary>
/// Dataset of specific types
/// </summary>
/// <typeparam name="T">Type parameter</typeparam>
public class Dataset<T> : Dataset
{
internal Dataset(IDatasetProxy datasetProxy): base(datasetProxy) {}
/************************************************************
* Would it be useful to expose methods like the following?
* It would offer static type checking at the cost of runtime optimizations
* because C# functionality needs to execute in the CLR
************************************************************
public Dataset<T> Filter(Func<T, bool> func)
{
throw new NotImplementedException();
}
public Dataset<U> Map<U>(Func<T, U> mapFunc)
{
throw new NotImplementedException();
}
*/
}
}


@ -7,7 +7,8 @@ using Microsoft.Spark.CSharp.Proxy;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// A variant of Spark SQL that integrates with data stored in Hive.
/// HiveContext is deprecated. Use SparkSession.Builder().EnableHiveSupport()
/// HiveContext is a variant of Spark SQL that integrates with data stored in Hive.
/// Configuration for Hive is read from hive-site.xml on the classpath.
/// It supports running both SQL and HiveQL commands.
/// </summary>
@ -17,14 +18,22 @@ namespace Microsoft.Spark.CSharp.Sql
/// Creates a HiveContext
/// </summary>
/// <param name="sparkContext"></param>
public HiveContext(SparkContext sparkContext)
public HiveContext(SparkContext sparkContext)
: base(sparkContext, sparkContext.SparkContextProxy.CreateHiveContext())
{
}
{ }
internal HiveContext(SparkContext sparkContext, ISqlContextProxy sqlContextProxy)
: base(sparkContext, sqlContextProxy)
{ }
/// <summary>
/// Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
/// </summary>
/// <param name="sqlQuery"></param>
/// <returns></returns>
public new DataFrame Sql(string sqlQuery)
{
return new DataFrame(SqlContextProxy.Sql(sqlQuery), sparkContext);
}
/// <summary>


@ -0,0 +1,140 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.Remoting.Contexts;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Services;
using Microsoft.Spark.CSharp.Sql.Catalog;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// The entry point to programming Spark with the Dataset and DataFrame API.
/// </summary>
public class SparkSession
{
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(SparkSession));
private ISparkSessionProxy sparkSessionProxy;
private readonly SparkContext sparkContext;
internal ISparkSessionProxy SparkSessionProxy
{
get { return sparkSessionProxy; }
//setter is used only for testing...//TODO - refactor
set { sparkSessionProxy = value; }
}
private Catalog.Catalog catalog;
/// <summary>
/// Interface through which the user may create, drop, alter or query underlying
/// databases, tables, functions etc.
/// </summary>
public Catalog.Catalog Catalog
{
get { return catalog ?? (catalog = new Catalog.Catalog(SparkSessionProxy.GetCatalog())); }
}
internal SparkContext SparkContext
{
get { return sparkContext; }
}
/// <summary>
/// Builder for SparkSession
/// </summary>
public static Builder Builder()
{
return new Builder();
}
internal SparkSession(SparkContext sparkContext)
{
sparkSessionProxy = sparkContext.SparkContextProxy.CreateSparkSession();
this.sparkContext = sparkContext;
}
internal SparkSession(ISparkSessionProxy sparkSessionProxy)
{
this.sparkSessionProxy = sparkSessionProxy;
}
/// <summary>
/// Start a new session in which SQL configurations, temporary tables and registered
/// functions are isolated, but the underlying [[SparkContext]] and cached data are shared.
/// Note: Other than the [[SparkContext]], all shared state is initialized lazily.
/// This method will force the initialization of the shared state to ensure that parent
/// and child sessions are set up with the same shared state. If the underlying catalog
/// implementation is Hive, this will initialize the metastore, which may take some time.
/// </summary>
public SparkSession NewSession()
{
return new SparkSession(sparkSessionProxy.NewSession());
}
/// <summary>
/// Stop underlying SparkContext
/// </summary>
public void Stop()
{
sparkSessionProxy.Stop();
}
/// <summary>
/// Returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame
/// </summary>
/// <returns></returns>
public DataFrameReader Read()
{
logger.LogInfo("Using DataFrameReader to read input data from external data source");
return new DataFrameReader(sparkSessionProxy.Read(), sparkContext);
}
/// <summary>
/// Creates a <see cref="DataFrame"/> from a RDD containing array of object using the given schema.
/// </summary>
/// <param name="rdd">RDD containing array of object. The array acts as a row and items within the array act as columns which the schema is specified in <paramref name="schema"/>. </param>
/// <param name="schema">The schema of DataFrame.</param>
/// <returns></returns>
public DataFrame CreateDataFrame(RDD<object[]> rdd, StructType schema)
{
// Note: the RDD is pickled and converted to RDD<byte[]>, which happens in CSharpWorker.
// The sparkSessionProxy.CreateDataFrame() call below invokes byteArrayRDDToAnyArrayRDD() in SQLUtils.scala, which accepts only RDD[Array[Byte]].
// byteArrayRDDToAnyArrayRDD() calls SerDeUtil.pythonToJava(), which uses mapPartitions internally.
// It will be executed once CSharpWorker finishes pickling to RDD[Array[Byte]].
var rddRow = rdd.Map(r => r);
rddRow.serializedMode = SerializedMode.Row;
return new DataFrame(sparkSessionProxy.CreateDataFrame(rddRow.RddProxy, schema.StructTypeProxy), sparkContext);
}
/// <summary>
/// Returns the specified table as a <see cref="DataFrame"/>
/// </summary>
/// <param name="tableName"></param>
/// <returns></returns>
public DataFrame Table(string tableName)
{
return new DataFrame(sparkSessionProxy.Table(tableName), sparkContext);
}
/// <summary>
/// Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
/// </summary>
/// <param name="sqlQuery"></param>
/// <returns></returns>
public DataFrame Sql(string sqlQuery)
{
logger.LogInfo("SQL query to execute on the dataframe is {0}", sqlQuery);
return new DataFrame(sparkSessionProxy.Sql(sqlQuery), sparkContext);
}
}
}
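For orientation, a minimal end-to-end sketch of the SparkSession API introduced above; the master URL, app name, config key, the Json() reader call and the file/table names are illustrative placeholders, not part of this change:

    // Build or reuse a session, read some data, run a query, then stop the session.
    var builder = SparkSession.Builder();
    builder.Master("local[2]");                          // run locally with 2 threads
    builder.AppName("MobiusSparkSessionExample");        // shown in the Spark web UI
    builder.Config("spark.sql.shuffle.partitions", "4"); // illustrative config option
    var session = builder.GetOrCreate();

    var reader = session.Read();                         // DataFrameReader for non-streaming input
    var people = reader.Json("people.json");             // Json() and the file follow the existing samples; assumption here
    people.RegisterTempTable("people");                  // make the DataFrame queryable by name
    var adults = session.Sql("SELECT * FROM people WHERE age > 20");
    adults.Show();
    session.Stop();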

Просмотреть файл

@ -18,22 +18,45 @@ namespace Microsoft.Spark.CSharp.Sql
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(SqlContext));
private readonly ISqlContextProxy sqlContextProxy;
private readonly SparkContext sparkContext;
protected readonly SparkContext sparkContext;
internal ISqlContextProxy SqlContextProxy { get { return sqlContextProxy; } }
private static SqlContext instance;
private SparkSession sparkSession;
private bool isRootContext;
/// <summary>
/// Underlying SparkSession
/// </summary>
public SparkSession SparkSession
{
get { return sparkSession; }
}
internal SqlContext(SparkSession sparkSession, bool isRootContext)
{
this.sparkSession = sparkSession;
this.sparkContext = sparkSession.SparkContext;
this.sqlContextProxy = sparkSession.SparkSessionProxy.SqlContextProxy;
this.isRootContext = isRootContext;
if (instance == null) instance = this;
}
internal SqlContext(SparkSession sparkSession) : this(sparkSession, true)
{ }
/// <summary>
/// Creates a SqlContext
/// </summary>
/// <param name="sparkContext"></param>
public SqlContext(SparkContext sparkContext)
public SqlContext(SparkContext sparkContext) : this(new SparkSession(sparkContext))
{
sqlContextProxy = sparkSession.SparkSessionProxy.SqlContextProxy;
this.sparkContext = sparkContext;
sqlContextProxy = sparkContext.SparkContextProxy.CreateSqlContext();
if (instance == null) instance = this;
}
//TODO - remove this constructor after fixing unit tests that reference this
internal SqlContext(SparkContext sparkContext, ISqlContextProxy sqlContextProxy)
{
this.sparkContext = sparkContext;
@ -50,7 +73,7 @@ namespace Microsoft.Spark.CSharp.Sql
{
if (instance == null)
{
return new SqlContext(sparkContext);
instance = new SqlContext(sparkContext);
}
return instance;
}
@ -62,8 +85,7 @@ namespace Microsoft.Spark.CSharp.Sql
/// <returns></returns>
public SqlContext NewSession()
{
var newSessionProxy = sqlContextProxy.NewSession();
return new SqlContext(this.sparkContext, newSessionProxy);
return new SqlContext(sparkSession.NewSession());
}
/// <summary>
@ -75,7 +97,7 @@ namespace Microsoft.Spark.CSharp.Sql
/// <returns></returns>
public string GetConf(string key, string defaultValue)
{
return sqlContextProxy.GetConf(key, defaultValue);
return SparkSession.SparkSessionProxy.SqlContextProxy.GetConf(key, defaultValue);
}
/// <summary>
@ -85,7 +107,7 @@ namespace Microsoft.Spark.CSharp.Sql
/// <param name="value"></param>
public void SetConf(string key, string value)
{
sqlContextProxy.SetConf(key, value);
SparkSession.SparkSessionProxy.SqlContextProxy.SetConf(key, value);
}
/// <summary>
@ -155,7 +177,7 @@ namespace Microsoft.Spark.CSharp.Sql
/// <returns></returns>
public DataFrame Table(string tableName)
{
return new DataFrame(sqlContextProxy.Table(tableName), sparkContext);
return SparkSession.Table(tableName);
}
/// <summary>
@ -230,7 +252,7 @@ namespace Microsoft.Spark.CSharp.Sql
public DataFrame Sql(string sqlQuery)
{
logger.LogInfo("SQL query to execute on the dataframe is {0}", sqlQuery);
return new DataFrame(sqlContextProxy.Sql(sqlQuery), sparkContext);
return SparkSession.Sql(sqlQuery);
}
/// <summary>

Просмотреть файл

@ -1992,6 +1992,11 @@
<param name="key">Key to use</param>
<param name="defaultValue">Default value to use</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Core.SparkConf.GetAll">
<summary>
Get all parameters as a list of pairs
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Core.SparkContext">
<summary>
Main entry point for Spark functionality. A SparkContext represents the
@ -2072,6 +2077,17 @@
<param name="sparkContextProxy"></param>
<param name="conf"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Core.SparkContext.GetOrCreate(Microsoft.Spark.CSharp.Core.SparkConf)">
<summary>
This function may be used to get or instantiate a SparkContext and register it as a
singleton object. Because we can only have one active SparkContext per JVM,
this is useful when applications may wish to share a SparkContext.
Note: This function cannot be used to create multiple SparkContext instances
even if multiple contexts are allowed.
</summary>
<param name="conf"></param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Core.SparkContext.TextFile(System.String,System.Int32)">
<summary>
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
@ -4282,6 +4298,12 @@
Right now it just prints out the messages to Console
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Services.DefaultLoggerService.IsDebugEnabled">
<summary>
Gets a value indicating whether logging is enabled for the Debug level.
Always returns true for the DefaultLoggerService object.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Services.DefaultLoggerService.GetLoggerInstance(System.Type)">
<summary>
Get an instance of ILoggerService by a given type of logger
@ -4365,6 +4387,11 @@
Defines a logger what be used in service
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Services.ILoggerService.IsDebugEnabled">
<summary>
Gets a value indicating whether logging is enabled for the Debug level.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Services.ILoggerService.GetLoggerInstance(System.Type)">
<summary>
Get an instance of ILoggerService by a given type of logger
@ -4459,6 +4486,11 @@
</summary>
<param name="type">The type of the logger</param>
</member>
<member name="P:Microsoft.Spark.CSharp.Services.Log4NetLoggerService.IsDebugEnabled">
<summary>
Gets a value indicating whether logging is enabled for the Debug level.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Services.Log4NetLoggerService.LogDebug(System.String)">
<summary>
Logs a message at debug level.
@ -4561,6 +4593,312 @@
</summary>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Builder">
<summary>
The entry point to programming Spark with the Dataset and DataFrame API.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Master(System.String)">
<summary>
Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
</summary>
<param name="master">Master URL</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.AppName(System.String)">
<summary>
Sets a name for the application, which will be shown in the Spark web UI.
If no application name is set, a randomly generated name will be used.
</summary>
<param name="appName">Name of the app</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Config(System.String,System.String)">
<summary>
Sets a config option. Options set using this method are automatically propagated to
both SparkConf and SparkSession's own configuration.
</summary>
<param name="key">Key for the configuration</param>
<param name="value">value of the configuration</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Config(System.String,System.Boolean)">
<summary>
Sets a config option. Options set using this method are automatically propagated to
both SparkConf and SparkSession's own configuration.
</summary>
<param name="key">Key for the configuration</param>
<param name="value">value of the configuration</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Config(System.String,System.Double)">
<summary>
Sets a config option. Options set using this method are automatically propagated to
both SparkConf and SparkSession's own configuration.
</summary>
<param name="key">Key for the configuration</param>
<param name="value">value of the configuration</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Config(System.String,System.Int64)">
<summary>
Sets a config option. Options set using this method are automatically propagated to
both SparkConf and SparkSession's own configuration.
</summary>
<param name="key">Key for the configuration</param>
<param name="value">value of the configuration</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.Config(Microsoft.Spark.CSharp.Core.SparkConf)">
<summary>
Sets a list of config options based on the given SparkConf
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.EnableHiveSupport">
<summary>
Enables Hive support, including connectivity to a persistent Hive metastore, support for
Hive serdes, and Hive user-defined functions.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Builder.GetOrCreate">
<summary>
Gets an existing [[SparkSession]] or, if there is no existing one, creates a new
one based on the options set in this builder.
</summary>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Catalog.Catalog">
<summary>
Catalog interface for Spark.
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CurrentDatabase">
<summary>
Returns the current default database in this session.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.ListDatabases">
<summary>
Returns a list of databases available across all sessions.
</summary>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.ListTables(System.String)">
<summary>
Returns a list of tables in the current or given database.
This includes all temporary tables.
</summary>
<param name="dbName">Optional database name. If not provided, current database is used</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.ListColumns(System.String,System.String)">
<summary>
Returns a list of columns for the given table in the current database or
the given temporary table.
</summary>
<param name="tableName">Name of the table</param>
<param name="dbName">Name of the database. If database is not provided, current database is used</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.ListFunctions(System.String)">
<summary>
Returns a list of functions registered in the specified database.
This includes all temporary functions
</summary>
<param name="dbName">Name of the database. If database is not provided, current database is used</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.SetCurrentDatabase(System.String)">
<summary>
Sets the current default database in this session.
</summary>
<param name="dbName">Name of database</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.DropTempView(System.String)">
<summary>
Drops the temporary view with the given view name in the catalog.
If the view has been cached before, then it will also be uncached.
</summary>
<param name="tempViewName">Name of the table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.IsCached(System.String)">
<summary>
Returns true if the table is currently cached in-memory.
</summary>
<param name="tableName">Name of the table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CacheTable(System.String)">
<summary>
Caches the specified table in-memory.
</summary>
<param name="tableName">Name of the table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.UnCacheTable(System.String)">
<summary>
Removes the specified table from the in-memory cache.
</summary>
<param name="tableName">Name of the table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.RefreshTable(System.String)">
<summary>
Invalidate and refresh all the cached metadata of the given table. For performance reasons,
Spark SQL or the external data source library it uses might cache certain metadata about a
table, such as the location of blocks. When those change outside of Spark SQL, users should
call this function to invalidate the cache.
If this table is cached as an InMemoryRelation, drop the original cached version and make the
new version cached lazily.
</summary>
<param name="tableName">Name of the table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.ClearCache">
<summary>
Removes all cached tables from the in-memory cache.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CreateExternalTable(System.String,System.String)">
<summary>
Creates an external table from the given path and returns the corresponding DataFrame.
It will use the default data source configured by spark.sql.sources.default.
</summary>
<param name="tableName">Name of the table</param>
<param name="path">Path to table</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CreateExternalTable(System.String,System.String,System.String)">
<summary>
Creates an external table from the given path on a data source and returns DataFrame
</summary>
<param name="tableName">Name of the table</param>
<param name="path">Path to table</param>
<param name="source">Data source</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CreateExternalTable(System.String,System.String,System.Collections.Generic.Dictionary{System.String,System.String})">
<summary>
Creates an external table from the given path based on a data source and a set of options.
Then, returns the corresponding DataFrame.
</summary>
<param name="tableName">Name of the table</param>
<param name="source">Data source</param>
<param name="options">Options to create table</param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Catalog.Catalog.CreateExternalTable(System.String,System.String,Microsoft.Spark.CSharp.Sql.StructType,System.Collections.Generic.Dictionary{System.String,System.String})">
<summary>
Creates an external table from the given path based on a data source, a schema and
a set of options. Then, returns the corresponding DataFrame.
</summary>
<param name="tableName">Name of the table</param>
<param name="source">Data source</param>
<param name="schema">Schema of the table</param>
<param name="options">Options to create table</param>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Catalog.Database">
<summary>
A database in Spark
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Database.Name">
<summary>
Name of the database
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Database.Description">
<summary>
Description of the database
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Database.LocationUri">
<summary>
Location of the database
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Catalog.Table">
<summary>
A table in Spark
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Table.Name">
<summary>
Name of the table
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Table.Database">
<summary>
Name of the database Table belongs to
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Table.Description">
<summary>
Description of the table
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Table.TableType">
<summary>
Type of the table (table, view)
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Table.IsTemporary">
<summary>
Whether the table is a temporary table
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Catalog.Column">
<summary>
A column in Spark
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.Name">
<summary>
Name of the column
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.DataType">
<summary>
Datatype of the column
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.Description">
<summary>
Description of the column
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.IsNullable">
<summary>
Whether the column value can be null
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.IsPartition">
<summary>
Whether the column is a partition column.
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Column.IsBucket">
<summary>
Whether the column is a bucket column.
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Catalog.Function">
<summary>
A user-defined function in Spark
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Function.Name">
<summary>
Name of the function
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Function.Database">
<summary>
Name of the database
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Function.Description">
<summary>
Description of the function
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Function.ClassName">
<summary>
Fully qualified class name of the function
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Catalog.Function.IsTemporary">
<summary>
Whether the function is a temporary function or not.
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Column">
<summary>
A column that will be computed based on the data in a DataFrame.
@ -5769,9 +6107,77 @@
Format("parquet").Save(path)
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Dataset">
<summary>
Dataset is a strongly typed collection of domain-specific objects that can be transformed
in parallel using functional or relational operations. Each Dataset also has an untyped view
called a DataFrame, which is a Dataset of Row.
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.Dataset.Item(System.String)">
<summary>
Selects column based on the column name
</summary>
<param name="columnName">Name of the column</param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.ToDF">
<summary>
Converts this strongly typed collection of data to a generic DataFrame. In contrast to the
strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
objects that allow fields to be accessed by ordinal or name.
</summary>
<returns>DataFrame created from Dataset</returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.PrintSchema">
<summary>
Prints the schema to the console in a nice tree format.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.Explain(System.Boolean)">
<summary>
Prints the plans (logical and physical) to the console for debugging purposes.
</summary>
<param name="extended"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.Explain">
<summary>
Prints the physical plan to the console for debugging purposes.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.DTypes">
<summary>
Returns all column names and their data types as an array.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.Columns">
<summary>
Returns all column names as an array.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.Show(System.Int32,System.Boolean)">
<summary>
Displays the top 20 rows of the Dataset in tabular form. Strings longer than 20 characters
will be truncated, and all cells will be right-aligned.
</summary>
<param name="numberOfRows">Number of rows - default is 20</param>
<param name="truncate">Indicates if rows with more than 20 characters to be truncated</param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.Dataset.ShowSchema">
<summary>
Prints schema
</summary>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.Dataset`1">
<summary>
Dataset of specific types
</summary>
<typeparam name="T">Type parameter</typeparam>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.HiveContext">
<summary>
A variant of Spark SQL that integrates with data stored in Hive.
HiveContext is deprecated. Use SparkSession.Builder().EnableHiveSupport() instead.
HiveContext is a variant of Spark SQL that integrates with data stored in Hive.
Configuration for Hive is read from hive-site.xml on the classpath.
It supports running both SQL and HiveQL commands.
</summary>
@ -5782,6 +6188,13 @@
</summary>
<param name="sparkContext"></param>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.HiveContext.Sql(System.String)">
<summary>
Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
</summary>
<param name="sqlQuery"></param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.HiveContext.RefreshTable(System.String)">
<summary>
Invalidate and refresh all the cached metadata of the given table.
@ -6578,12 +6991,76 @@
<param name="mode">The given SaveMode</param>
<returns>The string that represents the given SaveMode</returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.SparkSession">
<summary>
The entry point to programming Spark with the Dataset and DataFrame API.
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.SparkSession.Catalog">
<summary>
Interface through which the user may create, drop, alter or query underlying
databases, tables, functions etc.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.Builder">
<summary>
Builder for SparkSession
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.NewSession">
<summary>
Starts a new session with isolated SQL configurations; temporary tables and registered
functions are isolated, but the underlying [[SparkContext]] and cached data are shared.
Note: Other than the [[SparkContext]], all shared state is initialized lazily.
This method will force the initialization of the shared state to ensure that parent
and child sessions are set up with the same shared state. If the underlying catalog
implementation is Hive, this will initialize the metastore, which may take some time.
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.Stop">
<summary>
Stop underlying SparkContext
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.Read">
<summary>
Returns a DataFrameReader that can be used to read non-streaming data in as a DataFrame
</summary>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.CreateDataFrame(Microsoft.Spark.CSharp.Core.RDD{System.Object[]},Microsoft.Spark.CSharp.Sql.StructType)">
<summary>
Creates a <see cref="T:Microsoft.Spark.CSharp.Sql.DataFrame"/> from a RDD containing array of object using the given schema.
</summary>
<param name="rdd">RDD containing array of object. The array acts as a row and items within the array act as columns which the schema is specified in <paramref name="schema"/>. </param>
<param name="schema">The schema of DataFrame.</param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.Table(System.String)">
<summary>
Returns the specified table as a <see cref="T:Microsoft.Spark.CSharp.Sql.DataFrame"/>
</summary>
<param name="tableName"></param>
<returns></returns>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SparkSession.Sql(System.String)">
<summary>
Executes a SQL query using Spark, returning the result as a DataFrame. The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'
</summary>
<param name="sqlQuery"></param>
<returns></returns>
</member>
<member name="T:Microsoft.Spark.CSharp.Sql.SqlContext">
<summary>
The entry point for working with structured data (rows and columns) in Spark.
Allows the creation of [[DataFrame]] objects as well as the execution of SQL queries.
</summary>
</member>
<member name="P:Microsoft.Spark.CSharp.Sql.SqlContext.SparkSession">
<summary>
Underlying SparkSession
</summary>
</member>
<member name="M:Microsoft.Spark.CSharp.Sql.SqlContext.#ctor(Microsoft.Spark.CSharp.Core.SparkContext)">
<summary>
Creates a SqlContext

File differences are hidden because one or more lines are too long

Просмотреть файл

@ -65,10 +65,12 @@
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="BuilderTest.cs" />
<Compile Include="ByteBufChunkListTest.cs" />
<Compile Include="ByteBufChunkTest.cs" />
<Compile Include="ByteBufPoolTest.cs" />
<Compile Include="ByteBufTest.cs" />
<Compile Include="CatalogTest.cs" />
<Compile Include="ColumnTest.cs" />
<Compile Include="AccumulatorTest.cs" />
<Compile Include="BroadcastTest.cs" />
@ -76,6 +78,7 @@
<Compile Include="DataFrameNaFunctionsTest.cs" />
<Compile Include="DataFrameReaderTest.cs" />
<Compile Include="DataFrameWriterTest.cs" />
<Compile Include="DatasetTest.cs" />
<Compile Include="EventHubsUtilsTest.cs" />
<Compile Include="HadoopConfigurationTest.cs" />
<Compile Include="JsonSerDeTest.cs" />
@ -83,6 +86,7 @@
<Compile Include="Mocks\MockDataFrameReaderProxy.cs" />
<Compile Include="Mocks\MockRDDCollector.cs" />
<Compile Include="Mocks\MockRow.cs" />
<Compile Include="Mocks\MockSparkSessionProxy.cs" />
<Compile Include="PayloadHelperTest.cs" />
<Compile Include="PriorityQueueTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
@ -91,6 +95,7 @@
<Compile Include="SocketWrapperTest.cs" />
<Compile Include="SerDeTest.cs" />
<Compile Include="HiveContextTest.cs" />
<Compile Include="SparkSessionTest.cs" />
<Compile Include="StatusTrackerTest.cs" />
<Compile Include="TestWithMoqDemo.cs" />
<Compile Include="Mocks\MockStructTypeProxy.cs" />

Просмотреть файл

@ -0,0 +1,50 @@
using System;
using Microsoft.Spark.CSharp.Sql;
using NUnit.Framework;
namespace AdapterTest
{
[TestFixture]
public class BuilderTest
{
[Test]
public void TestMaster()
{
var builder = new Builder();
builder.Master("test");
Assert.AreEqual("test", builder.options["spark.master"]);
}
[Test]
public void TestAppName()
{
var builder = new Builder();
builder.AppName("test");
Assert.AreEqual("test", builder.options["spark.app.name"]);
}
[Test]
public void TestBoolConfig()
{
var builder = new Builder();
builder.Config("boolvalue", true);
Assert.True(builder.options["boolvalue"].Equals("true", StringComparison.InvariantCultureIgnoreCase));
}
[Test]
public void TestLongConfig()
{
var builder = new Builder();
builder.Config("longvalue", 3L);
Assert.True(builder.options["longvalue"].Equals("3", StringComparison.InvariantCultureIgnoreCase));
}
[Test]
public void TestDoubleConfig()
{
var builder = new Builder();
builder.Config("doublevalue", 3.5D);
Assert.True(builder.options["doublevalue"].Equals("3.5", StringComparison.InvariantCultureIgnoreCase));
}
}
}
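A short sketch of how the options exercised by these tests are typically supplied; the config key and values are illustrative only:

    var builder = new Builder();                       // also available via SparkSession.Builder()
    builder.Master("local");                           // stored as option "spark.master"
    builder.AppName("BuilderExample");                 // stored as option "spark.app.name"
    builder.Config("spark.executor.instances", 2L);    // bool/long/double overloads stringify the value
    var session = builder.GetOrCreate();               // reuses an existing session if one is active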

Просмотреть файл

@ -91,6 +91,9 @@ namespace AdapterTest
[Test]
public void TestInvalidByteBuf()
{
// Test ByteBuf with error status.
var errorByteBuf = ByteBuf.NewErrorStatusByteBuf(10054);
Assert.AreEqual(10054, errorByteBuf.Status);
// Test invalid parameter to new ByteBuf.
Assert.Throws<ArgumentOutOfRangeException>(() => new ByteBuf(null, -1, 1024));
Assert.Throws<ArgumentOutOfRangeException>(() => new ByteBuf(null, 0, -1));

Просмотреть файл

@ -0,0 +1,212 @@
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Microsoft.Spark.CSharp.Sql.Catalog;
using Moq;
using NUnit.Framework;
using NUnit.Framework.Internal;
using Column = Microsoft.Spark.CSharp.Sql.Catalog.Column;
namespace AdapterTest
{
[TestFixture]
public class CatalogTest
{
[Test]
public void TestCurrentCatalog()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
mockCatalogProxy.Setup(m => m.CurrentDatabase).Returns("currentdb");
var catalog = new Catalog(mockCatalogProxy.Object);
Assert.AreEqual("currentdb", catalog.CurrentDatabase);
}
[Test]
public void TestGetDatabasesList()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var mockDatasetProxy = new Mock<IDatasetProxy>();
var mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
mockCatalogProxy.Setup(m => m.ListDatabases()).Returns(new Dataset<Database>(mockDatasetProxy.Object));
var catalog = new Catalog(mockCatalogProxy.Object);
var databases = catalog.ListDatabases();
Assert.AreSame(mockDataFrameProxy.Object, databases.DataFrameProxy);
}
[Test]
public void TestGetTablesList()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var mockDatasetProxy = new Mock<IDatasetProxy>();
var mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
mockCatalogProxy.Setup(m => m.ListTables(It.IsAny<string>())).Returns(new Dataset<Table>(mockDatasetProxy.Object));
var catalog = new Catalog(mockCatalogProxy.Object);
var tables = catalog.ListTables();
Assert.AreSame(mockDataFrameProxy.Object, tables.DataFrameProxy);
}
[Test]
public void TestGetColumnsList()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var mockDatasetProxy = new Mock<IDatasetProxy>();
var mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
mockCatalogProxy.Setup(m => m.ListColumns(It.IsAny<string>(), It.IsAny<string>())).Returns(new Dataset<Column>(mockDatasetProxy.Object));
var catalog = new Catalog(mockCatalogProxy.Object);
var columns = catalog.ListColumns("dbname");
Assert.AreSame(mockDataFrameProxy.Object, columns.DataFrameProxy);
}
[Test]
public void TestGetFunctionsList()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var mockDatasetProxy = new Mock<IDatasetProxy>();
var mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
mockCatalogProxy.Setup(m => m.ListFunctions(It.IsAny<string>())).Returns(new Dataset<Function>(mockDatasetProxy.Object));
var catalog = new Catalog(mockCatalogProxy.Object);
var columns = catalog.ListFunctions("dbname");
Assert.AreSame(mockDataFrameProxy.Object, columns.DataFrameProxy);
}
[Test]
public void TestSetCurrentDatabase()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.SetCurrentDatabase("dbname");
mockCatalogProxy.Verify(m => m.SetCurrentDatabase("dbname"), Times.Once);
}
[Test]
public void TestDropTempTable()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.DropTempView("tablename");
mockCatalogProxy.Verify(m => m.DropTempTable("tablename"), Times.Once);
}
[Test]
public void TestIsCached()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
mockCatalogProxy.Setup(m => m.IsCached(It.IsAny<string>())).Returns(false);
var isCached = catalog.IsCached("tablename");
mockCatalogProxy.Verify(m => m.IsCached(It.IsAny<string>()), Times.Once);
Assert.False(isCached);
}
[Test]
public void TestCacheTable()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.CacheTable("tablename");
mockCatalogProxy.Verify(m => m.CacheTable("tablename"), Times.Once);
}
[Test]
public void TestUnCacheTable()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.UnCacheTable("tablename");
mockCatalogProxy.Verify(m => m.UnCacheTable("tablename"), Times.Once);
}
[Test]
public void TestRefreshTable()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.RefreshTable("tablename");
mockCatalogProxy.Verify(m => m.RefreshTable("tablename"), Times.Once);
}
[Test]
public void TestClearCache()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
var catalog = new Catalog(mockCatalogProxy.Object);
catalog.ClearCache();
mockCatalogProxy.Verify(m => m.ClearCache(), Times.Once);
}
[Test]
public void TestCreateExternalTable()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
DataFrame dataFrame = null;
mockCatalogProxy.Setup(m => m.CreateExternalTable(It.IsAny<string>(), It.IsAny<string>())).Returns(dataFrame);
var catalog = new Catalog(mockCatalogProxy.Object);
var df = catalog.CreateExternalTable("tableName", "path");
mockCatalogProxy.Verify(m => m.CreateExternalTable("tableName", "path"), Times.Once);
}
[Test]
public void TestCreateExternalTable2()
{
var mockCatalogProxy = new Mock<ICatalogProxy>();
DataFrame dataFrame = null;
mockCatalogProxy.Setup(m => m.CreateExternalTable(It.IsAny<string>(), It.IsAny<string>())).Returns(dataFrame);
var catalog = new Catalog(mockCatalogProxy.Object);
var df = catalog.CreateExternalTable("tableName", "path", "source");
mockCatalogProxy.Verify(m => m.CreateExternalTable("tableName", "path", "source"), Times.Once);
}
[Test]
public void TestDatabaseProperties()
{
var database = new Database {Description = "desc", Name = "name", LocationUri = "uri"};
Assert.AreEqual("desc", database.Description);
Assert.AreEqual("name", database.Name);
Assert.AreEqual("uri", database.LocationUri);
}
[Test]
public void TestTableProperties()
{
var table = new Table { Description = "desc", Name = "name", Database = "db", TableType = "type", IsTemporary = false};
Assert.AreEqual("desc", table.Description);
Assert.AreEqual("name", table.Name);
Assert.AreEqual("db", table.Database);
Assert.AreEqual("type", table.TableType);
Assert.False(table.IsTemporary);
}
[Test]
public void TestColumnProperties()
{
var column = new Column { Description = "desc", Name = "name", DataType = "dtype", IsNullable = true, IsPartition = false, IsBucket = true};
Assert.AreEqual("desc", column.Description);
Assert.AreEqual("name", column.Name);
Assert.AreEqual("dtype", column.DataType);
Assert.False(column.IsPartition);
Assert.True(column.IsNullable);
Assert.True(column.IsBucket);
}
[Test]
public void TestFunctionProperties()
{
var function = new Function { Description = "desc", Name = "name", Database = "db", ClassName = "classname", IsTemporary = false };
Assert.AreEqual("desc", function.Description);
Assert.AreEqual("name", function.Name);
Assert.AreEqual("db", function.Database);
Assert.AreEqual("classname", function.ClassName);
Assert.False(function.IsTemporary);
}
}
}
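For reference, a hedged sketch of the Catalog surface these tests exercise, reached through SparkSession.Catalog; the table name is a placeholder and assumes such a table exists:

    var session = SparkSession.Builder().GetOrCreate();
    var catalog = session.Catalog;
    Console.WriteLine(catalog.CurrentDatabase);          // e.g. "default"
    var databases = catalog.ListDatabases().Collect();   // Dataset<Database> collected to rows
    catalog.CacheTable("people");                        // cache the table in memory
    Console.WriteLine(catalog.IsCached("people"));       // true once cached
    catalog.UnCacheTable("people");
    catalog.ClearCache();                                // drop all cached tables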

Просмотреть файл

@ -0,0 +1,150 @@
using System;
using System.Collections.Generic;
using System.Linq;
using AdapterTest.Mocks;
using Microsoft.Spark.CSharp.Interop;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Moq;
using NUnit.Framework;
namespace AdapterTest
{
[TestFixture]
public class DatasetTest
{
private static Mock<IDatasetProxy> mockDatasetProxy;
[OneTimeSetUp]
public static void ClassInitialize()
{
mockDatasetProxy = new Mock<IDatasetProxy>();
}
[SetUp]
public void TestInitialize()
{
mockDatasetProxy.Reset();
}
[TearDown]
public void TestCleanUp()
{
// Revert to the static mock class to avoid blocking other test methods that use it
SparkCLREnvironment.SparkCLRProxy = new MockSparkCLRProxy();
}
[Test]
public void TestShow()
{
Mock<IDataFrameProxy> mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDataFrameProxy.Setup(m => m.GetShowString(It.IsAny<int>(), It.IsAny<bool>())).Returns("Show");
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
var dataset = new Dataset(mockDatasetProxy.Object);
dataset.Show();
mockDataFrameProxy.Verify(m => m.GetShowString(20, true), Times.Once);
}
[Test]
public void TestExplain()
{
Mock<IDataFrameProxy> mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDataFrameProxy.Setup(m => m.GetQueryExecution()).Returns("Execution Plan");
mockDataFrameProxy.Setup(m => m.GetExecutedPlan()).Returns("Execution Plan");
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
var dataset = new Dataset(mockDatasetProxy.Object);
dataset.Explain();
mockDataFrameProxy.Verify(m => m.GetQueryExecution(), Times.Once);
dataset.Explain(true);
mockDataFrameProxy.Verify(m => m.GetExecutedPlan(), Times.Once);
}
[Test]
public void TestSchema()
{
TestSchema(true);
TestSchema(false);
}
public void TestSchema(bool usePrintSchema)
{
var requestsSchema = new StructType(new List<StructField>
{
new StructField("test", new StringType(), false),
});
var jsonValue = requestsSchema.JsonValue.ToString();
Mock<IStructTypeProxy> mockStructTypeProxy = new Mock<IStructTypeProxy>();
mockStructTypeProxy.Setup(m => m.ToJson()).Returns(jsonValue);
Mock<IDataFrameProxy> mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDataFrameProxy.Setup(m => m.GetSchema()).Returns(mockStructTypeProxy.Object);
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
var dataset = new Dataset(mockDatasetProxy.Object);
if (usePrintSchema)
dataset.PrintSchema();
else
dataset.ShowSchema();
mockDataFrameProxy.Verify(m => m.GetSchema(), Times.Once);
mockStructTypeProxy.Verify(m => m.ToJson(), Times.Once());
}
[Test]
public void TestColumns()
{
var requestsSchema = new StructType(new List<StructField>
{
new StructField("test", new StringType(), false),
});
var x = requestsSchema.JsonValue.ToString();
Mock<IStructTypeProxy> mockStructTypeProxy = new Mock<IStructTypeProxy>();
mockStructTypeProxy.Setup(m => m.ToJson()).Returns(x);
Mock<IStructFieldProxy> mockStructFieldProxy = new Mock<IStructFieldProxy>();
mockStructFieldProxy.Setup(m => m.GetStructFieldName()).Returns("testcol");
mockStructTypeProxy.Setup(m => m.GetStructTypeFields())
.Returns(new List<IStructFieldProxy>() {mockStructFieldProxy.Object});
Mock<IDataFrameProxy> mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDataFrameProxy.Setup(m => m.GetSchema()).Returns(mockStructTypeProxy.Object);
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
var dataset = new Dataset(mockDatasetProxy.Object);
var columns = dataset.Columns();
Assert.AreEqual(1, columns.Count());
Assert.AreEqual("testcol", columns.First());
}
[Test]
public void TestDTypes()
{
var requestsSchema = new StructType(new List<StructField>
{
new StructField("test", new StringType(), false),
});
var x = requestsSchema.JsonValue.ToString();
Mock<IStructTypeProxy> mockStructTypeProxy = new Mock<IStructTypeProxy>();
mockStructTypeProxy.Setup(m => m.ToJson()).Returns(x);
Mock<IStructFieldProxy> mockStructFieldProxy = new Mock<IStructFieldProxy>();
mockStructFieldProxy.Setup(m => m.GetStructFieldName()).Returns("testcol");
Mock<IStructDataTypeProxy> mockStructDataTypeProxy = new Mock<IStructDataTypeProxy>();
mockStructDataTypeProxy.Setup(m => m.GetDataTypeSimpleString()).Returns("ss");
mockStructFieldProxy.Setup(m => m.GetStructFieldDataType()).Returns(mockStructDataTypeProxy.Object);
mockStructTypeProxy.Setup(m => m.GetStructTypeFields())
.Returns(new List<IStructFieldProxy>() { mockStructFieldProxy.Object });
Mock<IDataFrameProxy> mockDataFrameProxy = new Mock<IDataFrameProxy>();
mockDataFrameProxy.Setup(m => m.GetSchema()).Returns(mockStructTypeProxy.Object);
mockDatasetProxy.Setup(m => m.ToDF()).Returns(mockDataFrameProxy.Object);
var dataset = new Dataset(mockDatasetProxy.Object);
var dTypes = dataset.DTypes();
Assert.AreEqual(1, dTypes.Count());
var first = dTypes.First();
Assert.AreEqual("testcol", first.Item1);
Assert.AreEqual("ss", first.Item2);
}
}
}
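A brief usage sketch of the Dataset members mocked above, assuming Dataset&lt;T&gt; exposes the same members as the untyped Dataset documented earlier (PrintSchema, Show, DTypes, ToDF); the session setup is illustrative:

    var session = SparkSession.Builder().GetOrCreate();
    var databases = session.Catalog.ListDatabases();     // strongly typed Dataset<Database>
    databases.PrintSchema();                             // prints the schema as a tree
    databases.Show();                                    // top 20 rows in tabular form
    foreach (var pair in databases.DTypes())             // (column name, simple type string) pairs
        Console.WriteLine("{0}: {1}", pair.Item1, pair.Item2);
    DataFrame untyped = databases.ToDF();                // untyped view for DataFrame operations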

Просмотреть файл

@ -48,12 +48,20 @@ namespace AdapterTest
var hiveContext = new HiveContext(new SparkContext("", ""));
Assert.IsNotNull((hiveContext.SqlContextProxy as MockSqlContextProxy).mockSqlContextReference);
}
[Test]
public void TestHiveContextSql()
{
mockSqlContextProxy.Setup(m => m.Sql(It.IsAny<string>()));
var hiveContext = new HiveContext(new SparkContext("", ""), mockSqlContextProxy.Object);
hiveContext.Sql("SELECT * FROM ABC");
mockSqlContextProxy.Verify(m => m.Sql("SELECT * FROM ABC"));
}
[Test]
public void TestHiveContextRefreshTable()
{
// arrange
var mockSparkContextProxy = new Mock<ISparkContextProxy>();
mockSqlContextProxy.Setup(m => m.RefreshTable(It.IsAny<string>()));
var hiveContext = new HiveContext(new SparkContext("", ""), mockSqlContextProxy.Object);

Просмотреть файл

@ -60,5 +60,10 @@ namespace AdapterTest.Mocks
}
return defaultValue;
}
public string GetSparkConfAsString()
{
throw new NotImplementedException();
}
}
}

Просмотреть файл

@ -293,19 +293,18 @@ namespace AdapterTest.Mocks
return new MockSparkConfProxy();
}
public ISqlContextProxy CreateSqlContext()
{
return new MockSqlContextProxy(this);
}
public ISqlContextProxy CreateHiveContext()
{
return new MockSqlContextProxy(this);
}
public IRDDProxy Parallelize(IEnumerable<byte[]> values, int numSlices)
{
return new MockRddProxy(null);
}
public ISparkSessionProxy CreateSparkSession()
{
return new MockSparkSessionProxy();
}
public ISqlContextProxy CreateHiveContext()
{
return new MockSqlContextProxy(this);
}
}
}

Просмотреть файл

@ -0,0 +1,53 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Proxy;
namespace AdapterTest.Mocks
{
class MockSparkSessionProxy : ISparkSessionProxy
{
public ISqlContextProxy SqlContextProxy { get { return new MockSqlContextProxy(new MockSparkContextProxy(new MockSparkConfProxy()));} }
public IUdfRegistration Udf { get; }
public ICatalogProxy GetCatalog()
{
throw new NotImplementedException();
}
public IDataFrameReaderProxy Read()
{
return new MockDataFrameReaderProxy(SqlContextProxy);
}
internal ISparkSessionProxy InjectedSparkSessionProxy { get; set; }
public ISparkSessionProxy NewSession()
{
return InjectedSparkSessionProxy;
}
public IDataFrameProxy CreateDataFrame(IRDDProxy rddProxy, IStructTypeProxy structTypeProxy)
{
throw new NotImplementedException();
}
public IDataFrameProxy Table(string tableName)
{
return new MockDataFrameProxy(new object[] { tableName }, null);
}
public IDataFrameProxy Sql(string query)
{
return new MockDataFrameProxy(new object[] {query}, null);
}
public void Stop()
{
throw new NotImplementedException();
}
}
}

Просмотреть файл

@ -69,11 +69,6 @@ namespace AdapterTest.Mocks
throw new NotImplementedException();
}
public ISqlContextProxy NewSession()
{
throw new NotImplementedException();
}
public string GetConf(string key, string defaultValue)
{
throw new NotImplementedException();

Просмотреть файл

@ -30,5 +30,5 @@ using System.Runtime.InteropServices;
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

Просмотреть файл

@ -0,0 +1,30 @@
using System;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Moq;
using NUnit.Framework;
namespace AdapterTest
{
[TestFixture]
public class SparkSessionTest
{
[Test]
public void TestRead()
{
var mockSparkSessionProxy = new Mock<ISparkSessionProxy>();
var sparkSession = new SparkSession(mockSparkSessionProxy.Object);
var reader = sparkSession.Read();
mockSparkSessionProxy.Verify(m => m.Read(), Times.Once);
}
[Test]
public void TestStop()
{
var mockSparkSessionProxy = new Mock<ISparkSessionProxy>();
var sparkSession = new SparkSession(mockSparkSessionProxy.Object);
sparkSession.Stop();
mockSparkSessionProxy.Verify(m => m.Stop(), Times.Once);
}
}
}

Просмотреть файл

@ -61,15 +61,16 @@ namespace AdapterTest
public void TestSqlContextNewSession()
{
// arrange
var sessionProxy = new SqlContextIpcProxy(new JvmObjectReference("1"));
mockSqlContextProxy.Setup(m => m.NewSession()).Returns(sessionProxy);
var sqlContext = new SqlContext(new SparkContext("", ""), mockSqlContextProxy.Object);
var sparkSessionProxy = new Mock<ISparkSessionProxy>();
var newSparkSessionProxy = new Mock<ISparkSessionProxy>();
// act
var actualNewSession = sqlContext.NewSession();
sparkSessionProxy.Setup(m => m.NewSession()).Returns(newSparkSessionProxy.Object);
var sqlContext = new SqlContext(new SparkSession(sparkSessionProxy.Object));
var ns = sqlContext.NewSession();
// assert
Assert.AreEqual(sessionProxy, actualNewSession.SqlContextProxy);
sparkSessionProxy.Verify(m => m.NewSession());
}
[Test]
@ -79,9 +80,24 @@ namespace AdapterTest
const string key = "key";
const string value = "value";
mockSqlContextProxy.Setup(m => m.GetConf(key, "")).Returns(value);
var sqlContext = new SqlContext(new SparkContext("", ""), mockSqlContextProxy.Object);
var mockSparkContextProxy = new Mock<ISparkContextProxy>();
// act
var mockSparkSessionProxy = new Mock<ISparkSessionProxy>();
var mockCatalogProxy = new Mock<ICatalogProxy>();
mockCatalogProxy.Setup(m => m.RefreshTable(It.IsAny<string>()));
mockSparkSessionProxy.Setup(m => m.GetCatalog()).Returns(mockCatalogProxy.Object);
mockSparkContextProxy.Setup(m => m.CreateSparkSession()).Returns(mockSparkSessionProxy.Object);
mockSparkSessionProxy.Setup(m => m.SqlContextProxy).Returns(mockSqlContextProxy.Object);
var mockSparkConfProxy = new Mock<ISparkConfProxy>();
mockSparkConfProxy.Setup(m => m.GetSparkConfAsString())
.Returns("spark.master=master;spark.app.name=appname;config1=value1;config2=value2;");
var conf = new SparkConf(mockSparkConfProxy.Object);
var sqlContext = new SqlContext(new SparkContext(mockSparkContextProxy.Object, conf));
sqlContext.SparkSession.SparkSessionProxy = mockSparkSessionProxy.Object;
//act
var actualValue = sqlContext.GetConf(key, "");
// assert
@ -95,7 +111,22 @@ namespace AdapterTest
const string key = "key";
const string value = "value";
mockSqlContextProxy.Setup(m => m.SetConf(key, value));
var sqlContext = new SqlContext(new SparkContext("", ""), mockSqlContextProxy.Object);
var mockSparkContextProxy = new Mock<ISparkContextProxy>();
var mockSparkSessionProxy = new Mock<ISparkSessionProxy>();
var mockCatalogProxy = new Mock<ICatalogProxy>();
mockCatalogProxy.Setup(m => m.RefreshTable(It.IsAny<string>()));
mockSparkSessionProxy.Setup(m => m.GetCatalog()).Returns(mockCatalogProxy.Object);
mockSparkContextProxy.Setup(m => m.CreateSparkSession()).Returns(mockSparkSessionProxy.Object);
mockSparkSessionProxy.Setup(m => m.SqlContextProxy).Returns(mockSqlContextProxy.Object);
var mockSparkConfProxy = new Mock<ISparkConfProxy>();
mockSparkConfProxy.Setup(m => m.GetSparkConfAsString())
.Returns("spark.master=master;spark.app.name=appname;config1=value1;config2=value2;");
var conf = new SparkConf(mockSparkConfProxy.Object);
var sqlContext = new SqlContext(new SparkContext(mockSparkContextProxy.Object, conf));
sqlContext.SparkSession.SparkSessionProxy = mockSparkSessionProxy.Object;
// act
sqlContext.SetConf(key, value);
@ -175,16 +206,11 @@ namespace AdapterTest
[Test]
public void TestSqlContextTable()
{
// arrange
var sqlContext = new SqlContext(new SparkContext("", ""), mockSqlContextProxy.Object);
var dataFrameProxy = new DataFrameIpcProxy(new JvmObjectReference("1"), mockSqlContextProxy.Object);
mockSqlContextProxy.Setup(m => m.Table(It.IsAny<string>())).Returns(dataFrameProxy);
// act
var actualTableDataFrame = sqlContext.Table("table");
// assert
Assert.AreEqual(dataFrameProxy, actualTableDataFrame.DataFrameProxy);
var sqlContext = new SqlContext(new SparkContext("", ""));
string tableName = "TestTableName";
var dataFrame = sqlContext.Table(tableName);
var paramValuesToTableMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference;
Assert.AreEqual(tableName, paramValuesToTableMethod[0]);
}
[Test]
@ -292,8 +318,8 @@ namespace AdapterTest
{
var sqlContext = new SqlContext(new SparkContext("", ""));
var dataFrame = sqlContext.Sql("Query of SQL text");
var paramValuesToJsonFileMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference;
Assert.AreEqual("Query of SQL text", paramValuesToJsonFileMethod[0]);
var paramValuesToSqlMethod = (dataFrame.DataFrameProxy as MockDataFrameProxy).mockDataFrameReference;
Assert.AreEqual("Query of SQL text", paramValuesToSqlMethod[0]);
}
[Test]

Просмотреть файл

@ -1,6 +1,11 @@
@setlocal
@ECHO off
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
SET CMDHOME=%~dp0
@REM Remove trailing backslash \
set CMDHOME=%CMDHOME:~0,-1%

Просмотреть файл

@ -1,3 +1,9 @@
@ECHO OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
FOR /D /R . %%G IN (bin) DO @IF EXIST "%%G" (@echo RDMR /S /Q "%%G" & rd /s /q "%%G")
FOR /D /R . %%G IN (obj) DO @IF EXIST "%%G" (@echo RDMR /S /Q "%%G" & rd /s /q "%%G")

Просмотреть файл

@ -12,7 +12,7 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
/// <summary>
/// Perf benchmark that uses Freebase deletions data
/// This data is licensed under CC-BY license (http://creativecommons.org/licenses/by/2.5)
/// Data is available for download at https://developers.google.com/freebase/data)
/// Data is available for download as "Freebase Deleted Triples" at https://developers.google.com/freebase
/// Data format - CSV, size - 8 GB uncompressed
/// Columns in the dataset are
/// 1. creation_timestamp (Unix epoch time in milliseconds)
@ -55,7 +55,7 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
var lines = PerfBenchmark.SparkContext.TextFile(filePath);
var parsedRows = lines.Map(s =>
{
var columns = s.Split(new[] {','});
var columns = s.Split(new[] { ',' });
//data has some bad records - use bool flag to indicate corrupt rows
if (columns.Length > 4)
@ -75,7 +75,7 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
else
return kvp2;
});
stopwatch.Stop();
PerfBenchmark.ExecutionTimeList.Add(stopwatch.Elapsed);
@ -101,22 +101,22 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
stopwatch.Restart();
var rows = PerfBenchmark.SqlContext.TextFile(args[2]);
var filtered = rows.Filter("C1 = C3");
var aggregated = filtered.GroupBy("C1").Agg(new Dictionary<string, string> { { "C1", "count" } });
var filtered = rows.Filter("_c1 = _c3");
var aggregated = filtered.GroupBy("_c1").Agg(new Dictionary<string, string> { { "_c1", "count" } });
aggregated.RegisterTempTable("freebasedeletions");
var max = PerfBenchmark.SqlContext.Sql("select max(`count(C1)`) from freebasedeletions");
var max = PerfBenchmark.SqlContext.Sql("select max(`count(_c1)`) from freebasedeletions");
var maxArray = max.Collect();
var maxValue = maxArray.First();
var maxDeletions = PerfBenchmark.SqlContext.Sql("select * from freebasedeletions where `count(C1)` = " + maxValue.Get(0));
var maxDeletions = PerfBenchmark.SqlContext.Sql("select * from freebasedeletions where `count(_c1)` = " + maxValue.Get(0));
maxDeletions.Show();
//TODO - add perf suite for subquery
stopwatch.Stop();
PerfBenchmark.ExecutionTimeList.Add(stopwatch.Elapsed);
Console.WriteLine("User with max deletions & count of deletions is listed above. Time elapsed {0}", stopwatch.Elapsed);
}
}
}

Просмотреть файл

@ -11,6 +11,7 @@
<AssemblyName>SparkCLRPerf</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<CppDll Condition="Exists('..\..\..\cpp\x64')">HasCpp</CppDll>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
@ -48,6 +49,16 @@
</None>
<None Include="data\deletionbenchmarktestdata.csv" />
</ItemGroup>
<ItemGroup Condition=" '$(CppDll)' == 'HasCpp' ">
<None Include="$(SolutionDir)..\cpp\x64\$(ConfigurationName)\Riosock.dll">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<Link>Cpp\Riosock.dll</Link>
</None>
<None Include="$(SolutionDir)..\cpp\x64\$(ConfigurationName)\Riosock.pdb">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<Link>Cpp\Riosock.pdb</Link>
</None>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Adapter\Microsoft.Spark.CSharp\Adapter.csproj">
<Project>{ce999a96-f42b-4e80-b208-709d7f49a77c}</Project>
@ -60,6 +71,10 @@
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<Target Name="CopyCSharpWorker"
DependsOnTargets="CoreBuild">
<Copy SkipUnchangedFiles="true" SourceFiles="$(ProjectDir)..\..\..\csharp\Worker\Microsoft.Spark.CSharp\bin\$(ConfigurationName)\CSharpWorker.*" DestinationFiles="$(TargetDir)" />
</Target>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">

Просмотреть файл

@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
@ -24,6 +25,17 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
public static void Main(string[] args)
{
if (args.Length != 3)
{
var exe = System.Reflection.Assembly.GetEntryAssembly().Location;
Console.WriteLine(@"Usage : {0} spark-local-dir run-count data-path", exe);
Console.WriteLine(@"Example : {0} D:\Temp\perfTest 10 hdfs:///perfdata/freebasedeletions/*", exe);
Console.WriteLine(@"Example : {0} D:\Temp\perfTest 1 hdfs:///perf/data/deletions/deletions.csv-00000-of-00020", exe);
Console.WriteLine(@"Example : {0} D:\Temp\perfTest 1 file:///d:/mobius/deletions/*", exe);
Console.WriteLine(@"Example : {0} D:\Temp\perfTest 1 d:\mobius\deletions", exe);
return;
}
Console.WriteLine("Arguments are {0}", string.Join(",", args));
InitializeSparkContext(args);
@ -51,7 +63,7 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
{
var perfSuites = Assembly.GetEntryAssembly().GetTypes()
.SelectMany(type => type.GetMethods(BindingFlags.NonPublic | BindingFlags.Static))
.Where(method => method.GetCustomAttributes(typeof (PerfSuiteAttribute), false).Length > 0)
.Where(method => method.GetCustomAttributes(typeof(PerfSuiteAttribute), false).Length > 0)
.OrderByDescending(method => method.Name);
foreach (var perfSuite in perfSuites)
@ -75,42 +87,46 @@ namespace Microsoft.Spark.CSharp.PerfBenchmark
internal static void ReportResult()
{
Console.WriteLine("** Printing results of the perf run (C#) **");
var allMedianCosts = new SortedDictionary<string, long>();
foreach (var perfResultItem in PerfResults)
{
var perfResult = perfResultItem.Value;
var runTimeInSeconds = perfResult.Select(x => (long) x.TotalSeconds);
var runTimeInSeconds = perfResult.Select(x => (long)x.TotalSeconds);
//multiple enumeration happening - ignoring that for now
var max = runTimeInSeconds.Max();
var min = runTimeInSeconds.Min();
var avg = (long) runTimeInSeconds.Average();
var avg = (long)runTimeInSeconds.Average();
var median = GetMedianValue(runTimeInSeconds);
Console.WriteLine(
"** Execution time for {0} in seconds. Min={1}, Max={2}, Average={3}, Median={4}, Number of runs={5}, Individual execution duration=[{6}] **",
perfResultItem.Key, min, max, avg, median, runTimeInSeconds.Count(), string.Join(", ", runTimeInSeconds));
allMedianCosts[perfResultItem.Key] = median;
}
Console.WriteLine("** *** **");
Console.WriteLine("{0} {1} C# version: Run count = {2}, all median time costs[{3}] : {4}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"),
Regex.Replace(TimeZone.CurrentTimeZone.StandardName, @"(\w)\S*\s*", "$1"),
PerfResults.First().Value.Count, allMedianCosts.Count, string.Join("; ", allMedianCosts.Select(kv => kv.Key + "=" + kv.Value)));
}
private static long GetMedianValue(IEnumerable<long> runTimeInSeconds)
{
var values = runTimeInSeconds.ToArray();
Array.Sort(values);
var itemCount = values.Length;
if (itemCount == 1)
{
return values[0];
}
if (itemCount%2 == 0)
if (itemCount % 2 == 0)
{
return (values[itemCount/2] + values[itemCount/2 - 1])/2;
return (values[itemCount / 2] + values[itemCount / 2 - 1]) / 2;
}
return values[(itemCount-1)/2];
return values[(itemCount - 1) / 2];
}
}
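For reference, the two reporting helpers touched in this file can be exercised on their own. The sketch below is a self-contained illustration (the class, Main, and sample inputs are ours, not part of the benchmark): it shows how the timezone-initials regex and the median calculation behave.
// Minimal sketch of the reporting helpers used above; class, Main and inputs are illustrative only.
using System;
using System.Linq;
using System.Text.RegularExpressions;
class ReportingHelpersSketch
{
    // Same logic as GetMedianValue above: average the two middle values for even counts.
    static long GetMedianValue(long[] runTimeInSeconds)
    {
        var values = runTimeInSeconds.OrderBy(v => v).ToArray();
        var itemCount = values.Length;
        if (itemCount == 1) return values[0];
        if (itemCount % 2 == 0) return (values[itemCount / 2] + values[itemCount / 2 - 1]) / 2;
        return values[(itemCount - 1) / 2];
    }
    static void Main()
    {
        // The regex keeps the first letter of each word: "Pacific Standard Time" -> "PST".
        Console.WriteLine(Regex.Replace("Pacific Standard Time", @"(\w)\S*\s*", "$1"));
        // Even-length input averages the two middle values: (9 + 12) / 2 = 10 with integer division.
        Console.WriteLine(GetMedianValue(new long[] { 12, 7, 9, 30 }));
    }
}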

View File

@ -34,5 +34,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -35,5 +35,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -35,5 +35,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -58,8 +58,9 @@
<!--********************************************************************************************************-->
<!--
<add key="CSharpWorkerPath" value="C:\path\to\mobius\driver\application\CSharpWorker.exe"/>
<add key="CSharpWorkerPath" value="C:\path\to\mobius\driver\application\CSharpWorker.exe"/>
-->
<!-- *** Settings for Mobius in Linux *** -->

View File

@ -0,0 +1,28 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
namespace Microsoft.Spark.CSharp.Samples
{
class CatalogSamples
{
[Sample]
internal static void CatalogSample()
{
var catalog = SparkSessionSamples.GetSparkSession().Catalog;
var currentDatabase = catalog.CurrentDatabase;
var databasesList = SparkSessionSamples.GetSparkSession().Catalog.ListDatabases().Collect();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
var defaultDatabase = databasesList.First(row => row.Get("name").Equals("default")); // First() throws if the 'default' database is missing
}
}
}
}
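The Catalog sample above exercises only a few members of the new API. For orientation, the same surface can be used directly from a Mobius application; the sketch below sticks to the members that appear in this commit (CurrentDatabase, ListDatabases, DropTempView) and assumes a Mobius driver with Hive support, as in the samples.
// Minimal sketch of the SparkSession Catalog usage shown in the sample above (assumes a running Mobius driver).
using System;
using Microsoft.Spark.CSharp.Sql;
class CatalogSketch
{
    static void Main()
    {
        var session = SparkSession.Builder().EnableHiveSupport().GetOrCreate();
        // Inspect the current database and the databases visible to this session.
        Console.WriteLine("Current database: {0}", session.Catalog.CurrentDatabase);
        foreach (var db in session.Catalog.ListDatabases().Collect())
        {
            Console.WriteLine("Database: {0}", db.Get("name"));
        }
        // Temp views registered from DataFrames can be dropped through the same Catalog.
        session.Catalog.DropTempView("people");
    }
}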

View File

@ -16,11 +16,11 @@ namespace Microsoft.Spark.CSharp.Samples
{
class DataFrameSamples
{
private const string PeopleJson = @"people.json";
private const string OrderJson = @"order.json";
private const string RequestsLog = @"requestslog.txt";
private const string MetricsLog = @"metricslog.txt";
private const string CSVTestLog = @"csvtestlog.txt";
internal const string PeopleJson = @"people.json";
internal const string OrderJson = @"order.json";
internal const string RequestsLog = @"requestslog.txt";
internal const string MetricsLog = @"metricslog.txt";
internal const string CSVTestLog = @"csvtestlog.txt";
private static SqlContext sqlContext;

View File

@ -78,6 +78,7 @@ namespace Microsoft.Spark.CSharp.Samples
{
conf.Set("spark.local.dir", Configuration.SparkLocalDirectoryOverride);
}
return new SparkContext(conf);
}

View File

@ -30,5 +30,5 @@ using System.Runtime.InteropServices;
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -47,6 +47,7 @@
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
<Compile Include="CatalogSamples.cs" />
<Compile Include="CommandlineArgumentProcessor.cs" />
<Compile Include="Configuration.cs" />
<Compile Include="DataFrameSamples.cs" />
@ -61,6 +62,7 @@
<Compile Include="SamplesRunner.cs" />
<Compile Include="SparkContextSamples.cs" />
<Compile Include="RDDSamples.cs" />
<Compile Include="SparkSessionSamples.cs" />
<Compile Include="SqlContextSamples.cs" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,189 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
using NUnit.Framework;
namespace Microsoft.Spark.CSharp.Samples
{
class SparkSessionSamples
{
private static SparkSession sparkSession;
internal static SparkSession GetSparkSession()
{
return sparkSession ?? (sparkSession = SparkSession.Builder().EnableHiveSupport().GetOrCreate());
}
[Sample]
internal static void SSNewSessionSample()
{
RunDataFrameSample(true);
}
[Sample]
internal static void SSDataFrameSample()
{
RunDataFrameSample(false);
}
private static void RunDataFrameSample(bool createNewSession)
{
SparkSession ss = GetSparkSession();
if (createNewSession)
{
ss = sparkSession.NewSession();
}
var peopleDataFrame = ss.Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
var count = peopleDataFrame.Count();
Console.WriteLine("Count of items in DataFrame {0}", count);
var sortedDataFrame = peopleDataFrame.Sort(new string[] { "name", "age" }, new bool[] { true, false });
sortedDataFrame.Show();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
var sortedDF = sortedDataFrame.Collect().ToArray();
Assert.AreEqual("789", sortedDF[0].GetAs<string>("id"));
Assert.AreEqual("123", sortedDF[1].GetAs<string>("id"));
Assert.AreEqual("531", sortedDF[2].GetAs<string>("id"));
Assert.AreEqual("456", sortedDF[3].GetAs<string>("id"));
}
}
[Sample]
internal static void SSShowSchemaSample()
{
var peopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
peopleDataFrame.Explain(true);
peopleDataFrame.ShowSchema();
}
[Sample]
internal static void SSTableSample()
{
var originalPeopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
originalPeopleDataFrame.RegisterTempTable("people");
var peopleDataFrame = GetSparkSession().Table("people");
var projectedFilteredDataFrame = peopleDataFrame.Select("name", "address.state")
.Where("name = 'Bill' or state = 'California'");
projectedFilteredDataFrame.ShowSchema();
projectedFilteredDataFrame.Show();
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
CollectionAssert.AreEqual(new[] { "name", "state" }, projectedFilteredDataFrame.Schema.Fields.Select(f => f.Name).ToArray());
Assert.IsTrue(projectedFilteredDataFrame.Collect().All(row => row.Get("name") == "Bill" || row.Get("state") == "California"));
}
}
[Sample]
internal static void SSSqlSample()
{
var originalPeopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
originalPeopleDataFrame.RegisterTempTable("people");
var nameFilteredDataFrame = GetSparkSession().Sql("SELECT name, address.city, address.state FROM people where name='Bill'");
var countDataFrame = GetSparkSession().Sql("SELECT count(name) FROM people where name='Bill'");
var maxAgeDataFrame = GetSparkSession().Sql("SELECT max(age) FROM people where name='Bill'");
long maxAgeDataFrameRowsCount = maxAgeDataFrame.Count();
long nameFilteredDataFrameRowsCount = nameFilteredDataFrame.Count();
long countDataFrameRowsCount = countDataFrame.Count();
Console.WriteLine("nameFilteredDataFrameRowsCount={0}, maxAgeDataFrameRowsCount={1}, countDataFrameRowsCount={2}", nameFilteredDataFrameRowsCount, maxAgeDataFrameRowsCount, countDataFrameRowsCount);
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(1, maxAgeDataFrameRowsCount);
Assert.AreEqual(2, nameFilteredDataFrameRowsCount);
Assert.AreEqual(1, countDataFrameRowsCount);
}
}
[Sample]
internal static void SSDropTableSample()
{
var originalPeopleDataFrame = GetSparkSession().Read().Json(SparkCLRSamples.Configuration.GetInputDataPath(DataFrameSamples.PeopleJson));
originalPeopleDataFrame.RegisterTempTable("people");
var nameFilteredDataFrame = GetSparkSession().Sql("SELECT name, address.city, address.state FROM people where name='Bill'");
long nameFilteredDataFrameRowsCount = nameFilteredDataFrame.Count();
GetSparkSession().Catalog.DropTempView("people");
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
bool tableMissing = false;
try
{
// parsing should fail here because the temp view was dropped
var nameFilteredDataFrame2 = GetSparkSession().Sql("SELECT name, address.city, address.state FROM people where name='Bill'");
}
catch (Exception)
{
tableMissing = true;
}
Assert.True(tableMissing);
}
}
[Sample]
internal static void SSCreateDataFrameSample()
{
var schemaPeople = new StructType(new List<StructField>
{
new StructField("id", new StringType()),
new StructField("name", new StringType()),
new StructField("age", new IntegerType()),
new StructField("address", new StructType(new List<StructField>
{
new StructField("city", new StringType()),
new StructField("state", new StringType())
})),
new StructField("phone numbers", new ArrayType(new StringType()))
});
var rddPeople = SparkCLRSamples.SparkContext.Parallelize(
new List<object[]>
{
new object[] { "123", "Bill", 43, new object[]{ "Columbus", "Ohio" }, new string[]{ "Tel1", "Tel2" } },
new object[] { "456", "Steve", 34, new object[]{ "Seattle", "Washington" }, new string[]{ "Tel3", "Tel4" } }
});
var dataFramePeople = GetSparkSession().CreateDataFrame(rddPeople, schemaPeople);
Console.WriteLine("------ Schema of People Data Frame:\r\n");
dataFramePeople.ShowSchema();
Console.WriteLine();
var collected = dataFramePeople.Collect().ToArray();
foreach (var people in collected)
{
string id = people.Get("id");
string name = people.Get("name");
int age = people.Get("age");
Row address = people.Get("address");
string city = address.Get("city");
string state = address.Get("state");
object[] phoneNumbers = people.Get("phone numbers");
Console.WriteLine("id:{0}, name:{1}, age:{2}, address:(city:{3},state:{4}), phoneNumbers:[{5},{6}]\r\n", id, name, age, city, state, phoneNumbers[0], phoneNumbers[1]);
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.AreEqual(2, dataFramePeople.Rdd.Count());
Assert.AreEqual(schemaPeople.Json, dataFramePeople.Schema.Json);
}
}
}
}
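Taken together, these samples reduce to a short SparkSession workflow. The sketch below condenses it; the JSON path and view name are placeholders, and only APIs exercised by the samples above are used.
// Condensed sketch of the SparkSession workflow exercised above (path and view name are placeholders).
using System;
using Microsoft.Spark.CSharp.Sql;
class SparkSessionSketch
{
    static void Main()
    {
        var session = SparkSession.Builder().EnableHiveSupport().GetOrCreate();
        // Load a JSON dataset into a DataFrame and expose it to SQL as a temp table.
        var people = session.Read().Json("hdfs:///data/people.json"); // placeholder path
        people.RegisterTempTable("people");
        // Query through SQL and through the DataFrame API.
        var bills = session.Sql("SELECT name, address.city, address.state FROM people WHERE name = 'Bill'");
        Console.WriteLine("Rows matching 'Bill': {0}", bills.Count());
        people.Sort(new string[] { "name", "age" }, new bool[] { true, false }).Show();
        // Clean up the temp view when done.
        session.Catalog.DropTempView("people");
    }
}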

View File

@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>Microsoft.SparkCLR</id>
<version>2.0.000-PREVIEW-1</version>
<version>2.0.000-PREVIEW-2</version>
<authors>Microsoft Corporation</authors>
<owners>Microsoft Corporation</owners>
<licenseUrl>https://github.com/Microsoft/Mobius/blob/master/LICENSE</licenseUrl>

View File

@ -1,6 +1,11 @@
@setlocal
@ECHO off
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
SET CMDHOME=%~dp0
@REM Remove trailing backslash \
set CMDHOME=%CMDHOME:~0,-1%

View File

@ -35,5 +35,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.6.0.0")]
[assembly: AssemblyFileVersion("1.6.0.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -30,5 +30,5 @@ using System.Runtime.InteropServices;
// Build Number
// Revision
//
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -12,6 +12,7 @@ using System.Linq;
using System.Reflection;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using System.Text;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Interop.Ipc;
using Microsoft.Spark.CSharp.Network;
@ -31,10 +32,8 @@ namespace Microsoft.Spark.CSharp
public class Worker
{
private static readonly DateTime UnixTimeEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private static ILoggerService logger = null;
private static SparkCLRAssemblyHandler assemblyHandler = null;
private static ILoggerService logger;
private static SparkCLRAssemblyHandler assemblyHandler;
public static void Main(string[] args)
{
@ -49,7 +48,7 @@ namespace Microsoft.Spark.CSharp
if (args.Length != 2)
{
Console.Error.WriteLine("Wrong number of args: {0}, will exit", args.Count());
Console.Error.WriteLine("Wrong number of args: {0}, will exit", args.Length);
Environment.Exit(-1);
}
@ -126,7 +125,7 @@ namespace Microsoft.Spark.CSharp
public static bool ProcessStream(Stream inputStream, Stream outputStream, int splitIndex)
{
logger.LogInfo(string.Format("Start of stream processing, splitIndex: {0}", splitIndex));
logger.LogInfo("Start of stream processing, splitIndex: {0}", splitIndex);
bool readComplete = true; // Whether all input data from the socket is read though completely
try
@ -170,7 +169,7 @@ namespace Microsoft.Spark.CSharp
else
{
// This may happen when the input data is not read completely, e.g., when take() operation is performed
logger.LogWarn(string.Format("**** unexpected read: {0}, not all data is read", end));
logger.LogWarn("**** unexpected read: {0}, not all data is read", end);
// write a different value to tell JVM to not reuse this worker
SerDe.Write(outputStream, (int)SpecialLengths.END_OF_DATA_SECTION);
readComplete = false;
@ -179,8 +178,8 @@ namespace Microsoft.Spark.CSharp
outputStream.Flush();
// log bytes read and write
logger.LogDebug(string.Format("total read bytes: {0}", SerDe.totalReadNum));
logger.LogDebug(string.Format("total write bytes: {0}", SerDe.totalWriteNum));
logger.LogDebug("total read bytes: {0}", SerDe.totalReadNum);
logger.LogDebug("total write bytes: {0}", SerDe.totalWriteNum);
logger.LogDebug("Stream processing completed successfully");
}
@ -202,10 +201,10 @@ namespace Microsoft.Spark.CSharp
logger.LogError("Writing exception to stream failed with exception:");
logger.LogException(ex);
}
throw e;
throw;
}
logger.LogInfo(string.Format("Stop of stream processing, splitIndex: {0}, readComplete: {1}", splitIndex, readComplete));
logger.LogInfo("Stop of stream processing, splitIndex: {0}, readComplete: {1}", splitIndex, readComplete);
return readComplete;
}
@ -310,7 +309,6 @@ namespace Microsoft.Spark.CSharp
int stageId = -1;
string deserializerMode = null;
string serializerMode = null;
CSharpWorkerFunc workerFunc = null;
for (int funcIndex = 0; funcIndex < chainedFuncCount; funcIndex++)
{
int lengthOfCommandByteArray = SerDe.ReadInt(inputStream);
@ -319,17 +317,11 @@ namespace Microsoft.Spark.CSharp
if (lengthOfCommandByteArray > 0)
{
CSharpWorkerFunc workerFunc;
ReadCommand(inputStream, formatter, out stageId, out deserializerMode, out serializerMode,
out workerFunc);
if (func == null)
{
func = workerFunc;
}
else
{
func = CSharpWorkerFunc.Chain(func, workerFunc);
}
func = func == null ? workerFunc : CSharpWorkerFunc.Chain(func, workerFunc);
}
else
{
@ -387,11 +379,14 @@ namespace Microsoft.Spark.CSharp
workerFunc = (CSharpWorkerFunc)formatter.Deserialize(stream);
logger.LogDebug(
if (!logger.IsDebugEnabled) return;
var sb = new StringBuilder(Environment.NewLine);
sb.AppendLine(
"------------------------ Printing stack trace of workerFunc for ** debugging ** ------------------------------");
logger.LogDebug(workerFunc.StackTrace);
logger.LogDebug(
sb.AppendLine(workerFunc.StackTrace);
sb.AppendLine(
"--------------------------------------------------------------------------------------------------------------");
logger.LogDebug(sb.ToString());
}
private static void ExecuteCommand(Stream inputStream, Stream outputStream, int splitIndex, DateTime bootTime,
@ -442,9 +437,8 @@ namespace Microsoft.Spark.CSharp
commandProcessWatch.Stop();
// log statistics
logger.LogInfo(string.Format("func process time: {0}", funcProcessWatch.ElapsedMilliseconds));
logger.LogInfo(string.Format("stage {0}, command process time: {1}", stageId,
commandProcessWatch.ElapsedMilliseconds));
logger.LogInfo("func process time: {0}", funcProcessWatch.ElapsedMilliseconds);
logger.LogInfo("stage {0}, command process time: {1}", stageId, commandProcessWatch.ElapsedMilliseconds);
}
private static void WriteOutput(Stream networkStream, string serializerMode, dynamic message, IFormatter formatter)
@ -509,7 +503,7 @@ namespace Microsoft.Spark.CSharp
int rddId = SerDe.ReadInt(networkStream);
int stageId = SerDe.ReadInt(networkStream);
int partitionId = SerDe.ReadInt(networkStream);
logger.LogInfo(string.Format("rddInfo: rddId {0}, stageId {1}, partitionId {2}", rddId, stageId, partitionId));
logger.LogInfo("rddInfo: rddId {0}, stageId {1}, partitionId {2}", rddId, stageId, partitionId);
return stageId;
}
@ -517,8 +511,8 @@ namespace Microsoft.Spark.CSharp
{
DateTime finishTime = DateTime.UtcNow;
const string format = "MM/dd/yyyy hh:mm:ss.fff tt";
logger.LogDebug(string.Format("bootTime: {0}, initTime: {1}, finish_time: {2}",
bootTime.ToString(format), initTime.ToString(format), finishTime.ToString(format)));
logger.LogDebug("bootTime: {0}, initTime: {1}, finish_time: {2}",
bootTime.ToString(format), initTime.ToString(format), finishTime.ToString(format));
SerDe.Write(networkStream, (int)SpecialLengths.TIMING_DATA);
SerDe.Write(networkStream, ToUnixTime(bootTime));
SerDe.Write(networkStream, ToUnixTime(initTime));
@ -538,7 +532,7 @@ namespace Microsoft.Spark.CSharp
item.Value.GetType()
.GetField("value", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(item.Value);
logger.LogDebug(string.Format("({0}, {1})", item.Key, value));
logger.LogDebug("({0}, {1})", item.Key, value);
formatter.Serialize(ms, new KeyValuePair<int, dynamic>(item.Key, value));
byte[] buffer = ms.ToArray();
SerDe.Write(networkStream, buffer.Length);
@ -548,13 +542,28 @@ namespace Microsoft.Spark.CSharp
public static void PrintFiles()
{
logger.LogDebug("Files available in executor");
var driverFolder = Path.GetDirectoryName(Assembly.GetEntryAssembly().Location);
var files = Directory.EnumerateFiles(driverFolder);
if (!logger.IsDebugEnabled) return;
var folder = Path.GetDirectoryName(Assembly.GetEntryAssembly().Location);
var files = Directory.EnumerateFiles(folder).Select(Path.GetFileName).ToArray();
var longest = files.Max(f => f.Length);
var count = 0;
var outfiles = new StringBuilder(Environment.NewLine);
foreach (var file in files)
{
logger.LogDebug(file);
switch (count++ % 2)
{
case 0:
outfiles.Append(" " + file.PadRight(longest + 2));
break;
default:
outfiles.AppendLine(file);
break;
}
}
logger.LogDebug("Files available in executor");
logger.LogDebug("Location: {0}{1}{2}", folder, Environment.NewLine, outfiles.ToString());
}
private static long ToUnixTime(DateTime dt)
@ -622,7 +631,7 @@ namespace Microsoft.Spark.CSharp
case SerializedMode.Pair:
{
byte[] pairKey = buffer;
byte[] pairValue = null;
byte[] pairValue;
watch.Start();
int valueLength = SerDe.ReadInt(inputStream);
@ -650,7 +659,6 @@ namespace Microsoft.Spark.CSharp
break;
}
case SerializedMode.Byte:
default:
{
if (buffer != null)
@ -669,7 +677,7 @@ namespace Microsoft.Spark.CSharp
watch.Start();
}
logger.LogInfo(string.Format("total receive time: {0}", watch.ElapsedMilliseconds));
logger.LogInfo("total receive time: {0}", watch.ElapsedMilliseconds);
}
internal class SparkCLRAssemblyHandler
@ -687,7 +695,7 @@ namespace Microsoft.Spark.CSharp
}
else
{
Console.Error.WriteLine("Already loaded assebmly " + assembly.FullName);
Console.Error.WriteLine("Already loaded assembly " + assembly.FullName);
}
}
}
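Several of the Worker changes above follow one pattern: hand the format string and its arguments straight to the logger instead of pre-formatting with string.Format, and guard expensive debug-only output behind IsDebugEnabled. The sketch below illustrates that pattern with a simplified stand-in logger; it is not the actual ILoggerService contract.
// Simplified illustration of the deferred-formatting logging pattern used in the Worker changes above.
// ISimpleLogger is a hypothetical stand-in, not the real ILoggerService interface.
using System;
interface ISimpleLogger
{
    bool IsDebugEnabled { get; }
    void LogDebug(string format, params object[] args);
    void LogInfo(string format, params object[] args);
}
class ConsoleLogger : ISimpleLogger
{
    public bool IsDebugEnabled { get; set; }
    public void LogDebug(string format, params object[] args)
    {
        // Formatting happens only when debug logging is actually enabled.
        if (IsDebugEnabled) Console.WriteLine("[DEBUG] " + format, args);
    }
    public void LogInfo(string format, params object[] args)
    {
        Console.WriteLine("[INFO] " + format, args);
    }
}
class WorkerLoggingSketch
{
    static void Main()
    {
        ISimpleLogger logger = new ConsoleLogger { IsDebugEnabled = false };
        // Cheap: the format string and arguments are handed to the logger as-is.
        logger.LogInfo("total receive time: {0}", 42);
        // Expensive debug payloads are built only when the level is enabled.
        if (!logger.IsDebugEnabled) return;
        var payload = string.Join(Environment.NewLine, new[] { "file1", "file2" });
        logger.LogDebug("Files available in executor{0}{1}", Environment.NewLine, payload);
    }
}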

View File

@ -32,5 +32,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.6.1.0")]
[assembly: AssemblyFileVersion("1.6.1.0")]
[assembly: AssemblyVersion("2.0")]
[assembly: AssemblyFileVersion("2.0")]

View File

@ -1,5 +1,10 @@
#!/bin/bash
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
export FWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
export CppDll=NoCpp
export XBUILDOPT=/verbosity:minimal

View File

@ -1,5 +1,10 @@
#!/bin/bash
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
for g in `find . -type d -name bin`
do
rm -r -f "$g"

View File

@ -1,5 +1,10 @@
#!/bin/bash
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
export FWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
if [ "$NUNITCONSOLE" = "" ];

View File

@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
#
# This script takes in "version" and "targetDir" (optional) parameters, update Spark-Clr jar
# version reference in all scripts under "targetDir".
@ -40,7 +45,7 @@ function Update-SparkClrSubmit($targetDir, $version)
#
Get-ChildItem $targetDir -filter "sparkclr-submit.cmd" -recurs | % {
Write-Output "[SetSparkClrJarVersion.Update-SparkClrSubmit] updating $($_.FullName)"
((Get-Content $_.FullName) -replace "\(set SPARKCLR_JAR=.*\)", "(set SPARKCLR_JAR=spark-clr_2.10-$version.jar)") | Set-Content $_.FullName -force
((Get-Content $_.FullName) -replace "\(set SPARKCLR_JAR=.*\)", "(set SPARKCLR_JAR=spark-clr_2.11-$version.jar)") | Set-Content $_.FullName -force
}
Write-Output "[SetSparkClrJarVersion.Update-SparkClrSubmit] Done setting sparkclr-submit.cmd under $targetDir to version=$version"
@ -54,7 +59,7 @@ function Update-SparkClrSubmit($targetDir, $version)
#
Get-ChildItem $targetDir -filter "sparkclr-submit.sh" -recurs | % {
Write-Output "[SetSparkClrJarVersion.Update-SparkClrSubmit] updating $($_.FullName)"
((Get-Content $_.FullName) -replace "export SPARKCLR_JAR=.*", "export SPARKCLR_JAR=spark-clr_2.10-$version.jar") | Set-Content $_.FullName -force
((Get-Content $_.FullName) -replace "export SPARKCLR_JAR=.*", "export SPARKCLR_JAR=spark-clr_2.11-$version.jar") | Set-Content $_.FullName -force
}
Write-Output "[SetSparkClrJarVersion.Update-SparkClrSubmit] Done setting sparkclr-submit.sh under $targetDir to version=$version"

View File

@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
#
# This script takes in and "nuspecDir" and "version" parameters, update Mobius Nuget package
# version

View File

@ -1,3 +1,8 @@
#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.
#
#
# This script takes in "version" and "targetDir" (optional) parameters, update Mobius Nuget package
# version reference in all *.csproj and packages.config under "dir".
@ -17,15 +22,15 @@ function Update-Csproj($targetDir, $version)
Write-Output "[SetSparkClrPackageVersion.Update-Csproj] Start setting *.csproj under $targetDir to version=$version"
#
# Update Mobius package version to this release. Example in *.csproj:
# Update Mobius package version to this release. Example in *.csproj and *.fsproj:
# <HintPath>..\packages\Microsoft.SparkCLR.1.5.2-SNAPSHOT\lib\net45\CSharpWorker.exe</HintPath>
#
Get-ChildItem $targetDir -filter "*.csproj" -recurs | % {
Write-Output "[SetSparkClrPackageVersion.Update-Csproj] updating $($_.FullName)"
Get-ChildItem $targetDir -filter "*.*sproj" -recurs | % {
Write-Output "[SetSparkClrPackageVersion.Update-*sproj] updating $($_.FullName)"
((Get-Content $_.FullName) -replace "\\Microsoft\.SparkCLR.*\\lib", "\Microsoft.SparkCLR.$version\lib") | Set-Content -Encoding UTF8 -Path $_.FullName -force
}
Write-Output "[SetSparkClrPackageVersion.Update-Csproj] Done setting *.csproj under $targetDir to version=$version"
Write-Output "[SetSparkClrPackageVersion.Update-Csproj] Done setting *.csproj and *.fsproj under $targetDir to version=$version"
}
function Update-PackageConfig($targetDir, $version)

View File

@ -1,4 +1,10 @@
@echo OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
setlocal enabledelayedexpansion
IF "%1"=="" (goto :usage)

View File

@ -34,7 +34,7 @@
<ItemGroup>
<Reference Include="CSharpWorker, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -42,7 +42,7 @@
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter, Version=1.6.1.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -67,7 +67,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe.config">
<None Include="..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe.config">
<Link>CSharpWorker.exe.config</Link>
</None>
<None Include="..\..\App.config">

View File

@ -4,5 +4,5 @@
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.200" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="2.0.0-PREVIEW-1" targetFramework="net45" />
</packages>

View File

@ -37,7 +37,7 @@
<ItemGroup>
<Reference Include="CSharpWorker, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -45,7 +45,7 @@
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter, Version=1.6.1.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -66,7 +66,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe.config">
<None Include="..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe.config">
<Link>CSharpWorker.exe.config</Link>
</None>
<None Include="..\..\App.config">

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="log4net" version="2.0.5" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.200" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="2.0.0-PREVIEW-1" targetFramework="net45" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />

View File

@ -1,6 +1,11 @@
@setlocal
@ECHO off
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
SET CMDHOME=%~dp0
@REM Remove trailing backslash \
set CMDHOME=%CMDHOME:~0,-1%

View File

@ -1,3 +1,9 @@
@ECHO OFF
rem
rem Copyright (c) Microsoft. All rights reserved.
rem Licensed under the MIT license. See LICENSE file in the project root for full license information.
rem
FOR /D /R . %%G IN (bin) DO @IF EXIST "%%G" (@echo RDMR /S /Q "%%G" & rd /s /q "%%G")
FOR /D /R . %%G IN (obj) DO @IF EXIST "%%G" (@echo RDMR /S /Q "%%G" & rd /s /q "%%G")

View File

@ -35,13 +35,13 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="CSharpWorker">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net">
<HintPath>..\..\packages\log4net.2.0.5\lib\net45-full\log4net.dll</HintPath>
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>

View File

@ -4,5 +4,5 @@
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.200" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="2.0.0-PREVIEW-1" targetFramework="net45" />
</packages>

View File

@ -38,11 +38,11 @@
<HintPath>..\..\packages\log4net.2.0.5\lib\net45-full\log4net.dll</HintPath>
</Reference>
<Reference Include="CSharpWorker, Version=1.6.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter, Version=1.6.1.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />

View File

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="log4net" version="2.0.5" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.200" targetFramework="net452" />
<package id="Microsoft.SparkCLR" version="2.0.0-PREVIEW-1" targetFramework="net452" />
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />

View File

@ -36,7 +36,7 @@
<ItemGroup>
<Reference Include="CSharpWorker, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -44,7 +44,7 @@
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -65,7 +65,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe.config">
<None Include="..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe.config">
<Link>CSharpWorker.exe.config</Link>
</None>
<None Include="..\..\App.config">

View File

@ -4,5 +4,5 @@
<package id="Newtonsoft.Json" version="7.0.1" targetFramework="net45" />
<package id="Razorvine.Pyrolite" version="4.10.0.0" targetFramework="net45" />
<package id="Razorvine.Serpent" version="1.12.0.0" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="1.6.200" targetFramework="net45" />
<package id="Microsoft.SparkCLR" version="2.0.0-PREVIEW-1" targetFramework="net45" />
</packages>

View File

@ -36,7 +36,7 @@
<ItemGroup>
<Reference Include="CSharpWorker, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe</HintPath>
</Reference>
<Reference Include="log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -44,7 +44,7 @@
</Reference>
<Reference Include="Microsoft.Spark.CSharp.Adapter, Version=1.5.2.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
<HintPath>..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\Microsoft.Spark.CSharp.Adapter.dll</HintPath>
</Reference>
<Reference Include="Newtonsoft.Json, Version=4.5.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
@ -65,7 +65,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\packages\Microsoft.SparkCLR.1.6.200\lib\net45\CSharpWorker.exe.config">
<None Include="..\..\packages\Microsoft.SparkCLR.2.0.0-PREVIEW-1\lib\net45\CSharpWorker.exe.config">
<Link>CSharpWorker.exe.config</Link>
</None>
<None Include="..\..\App.config">

Some files were not shown because too many files have changed in this diff.