Merge pull request #294 from skaarthik/example
improvements in logging, examples and documentation
This commit is contained in:
Commit
50435268aa
@ -81,8 +81,9 @@ Refer to the [docs folder](docs) for design overview and other info on SparkCLR

| |Windows |Linux |
|---|:------:|:----:|
|Build & run unit tests |[windows-instructions.md](notes/windows-instructions.md#building-sparkclr) |[linux-instructions.md](notes/linux-instructions.md#building-sparkclr) |
|Run samples (functional tests) in local mode |[windows-instructions.md](notes/windows-instructions.md#running-samples) |[linux-instructions.md](notes/linux-instructions.md#running-samples) |
|Run standalone examples in Client mode |[Quick-start wiki](https://github.com/Microsoft/SparkCLR/wiki/Quick-Start#client-mode) |[Quick-start wiki](https://github.com/Microsoft/SparkCLR/wiki/Quick-Start#client-mode) |
|Run standalone examples in Cluster mode |[Quick-start wiki](https://github.com/Microsoft/SparkCLR/wiki/Quick-Start#cluster-mode) |[Quick-start wiki](https://github.com/Microsoft/SparkCLR/wiki/Quick-Start#cluster-mode) |
|Run examples in local mode |[running-sparkclr-app.md](notes/running-sparkclr-app.md#running-examples-in-local-mode) |[running-sparkclr-app.md](notes/running-sparkclr-app.md#linux-instructions) |
|Run SparkCLR app in standalone cluster |[running-sparkclr-app.md](notes/running-sparkclr-app.md#standalone-cluster) |[running-sparkclr-app.md](notes/running-sparkclr-app.md#linux-instructions) |
|Run SparkCLR app in YARN cluster |[running-sparkclr-app.md](notes/running-sparkclr-app.md#yarn-cluster) |[running-sparkclr-app.md](notes/running-sparkclr-app.md#linux-instructions) |

Note: Refer to [linux-compatibility.md](notes/linux-compatibility.md) for using SparkCLR with Spark on Linux
|
@ -129,6 +129,12 @@ copy /y Samples\Microsoft.Spark.CSharp\bin\Release\* "%SPARKCLR_HOME%\samples\"
copy /y Samples\Microsoft.Spark.CSharp\data\* "%SPARKCLR_HOME%\data\"
popd

@echo Assemble SparkCLR examples
pushd "%CMDHOME%\..\examples"
call Clean.cmd
call Build.cmd
popd

@echo Assemble SparkCLR script components
xcopy /e /y "%CMDHOME%\..\scripts" "%SPARKCLR_HOME%\scripts\"
@ -78,6 +78,14 @@ echo "SparkCLR Samples data"
cp Samples/Microsoft.Spark.CSharp/data/* "$SPARKCLR_HOME/data/"
popd

echo "Assemble SparkCLR examples"
pushd "$FWDIR/../examples"
# clean any possible previous build first
./clean.sh

./build.sh
popd

echo "Assemble SparkCLR script components"
pushd "$FWDIR/../scripts"
cp *.sh "$SPARKCLR_HOME/scripts/"
@ -5,6 +5,7 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Core
|
||||
{
|
||||
|
@ -18,6 +19,9 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
[Serializable]
|
||||
public class RDD<T>
|
||||
{
|
||||
[NonSerialized]
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(RDD<T>));
|
||||
|
||||
internal IRDDProxy rddProxy;
|
||||
internal IRDDProxy previousRddProxy;
|
||||
internal SparkContext sparkContext;
|
||||
|
@ -108,6 +112,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public RDD<T> Cache()
|
||||
{
|
||||
isCached = true;
|
||||
logger.LogInfo("Persisting RDD to default storage cache");
|
||||
RddProxy.Cache();
|
||||
return this;
|
||||
}
|
||||
|
@ -127,6 +132,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public RDD<T> Persist(StorageLevelType storageLevelType)
|
||||
{
|
||||
isCached = true;
|
||||
logger.LogInfo("Persisting RDD to storage level type {0}", storageLevelType);
|
||||
RddProxy.Persist(storageLevelType);
|
||||
return this;
|
||||
}
|
||||
|
@ -140,6 +146,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
if (isCached)
|
||||
{
|
||||
isCached = false;
|
||||
logger.LogInfo("Unpersisting RDD from the cache");
|
||||
RddProxy.Unpersist();
|
||||
}
|
||||
return this;
|
||||
|
@ -156,6 +163,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public void Checkpoint()
|
||||
{
|
||||
isCheckpointed = true;
|
||||
logger.LogInfo("Checkpointing RDD to SparkContext.SetCheckpointDir");
|
||||
RddProxy.Checkpoint();
|
||||
}
|
||||
|
||||
|
@ -177,6 +185,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
/// <returns></returns>
|
||||
public RDD<U> Map<U>(Func<T, U> f, bool preservesPartitioning = false)
|
||||
{
|
||||
logger.LogInfo("Executing Map operation on RDD (preservesPartitioning={0})", preservesPartitioning);
|
||||
return MapPartitionsWithIndex(new MapHelper<T, U>(f).Execute, preservesPartitioning);
|
||||
}
|
||||
|
||||
|
@ -579,6 +588,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
/// <returns></returns>
|
||||
public T Reduce(Func<T, T, T> f)
|
||||
{
|
||||
logger.LogInfo("Executing Reduce operation on RDD");
|
||||
Func<int, IEnumerable<T>, IEnumerable<T>> func = new ReduceHelper<T>(f).Execute;
|
||||
var vals = MapPartitionsWithIndex(func, true).Collect();
|
||||
|
||||
|
|
|
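The RDD hunks above add informational logging around the persistence and execution entry points (Cache, Persist, Unpersist, Checkpoint, Map, Reduce). The following minimal sketch shows how those calls line up from driver code; it is illustrative only, not part of this commit, and it assumes a SparkContext named sc already exists.

// Sketch only: sc is an assumed, pre-existing SparkContext.
var numbers = sc.Parallelize(new[] { 1, 2, 3, 4, 5 }, 2); // "Parallelizing ... items to form RDD in the cluster with ... partitions"
var squares = numbers.Map(x => x * x);                    // "Executing Map operation on RDD ..."
squares.Cache();                                          // "Persisting RDD to default storage cache"
var total = squares.Reduce((x, y) => x + y);              // "Executing Reduce operation on RDD"
squares.Unpersist();                                      // "Unpersisting RDD from the cache"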
@ -63,6 +63,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public SparkConf SetMaster(string master)
|
||||
{
|
||||
sparkConfProxy.SetMaster(master);
|
||||
logger.LogInfo("Spark master set to {0}", master);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -73,6 +74,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public SparkConf SetAppName(string appName)
|
||||
{
|
||||
sparkConfProxy.SetAppName(appName);
|
||||
logger.LogInfo("Spark app name set to {0}", appName);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -84,6 +86,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public SparkConf SetSparkHome(string sparkHome)
|
||||
{
|
||||
sparkConfProxy.SetSparkHome(sparkHome);
|
||||
logger.LogInfo("Spark home set to {0}", sparkHome);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -95,6 +98,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
public SparkConf Set(string key, string value)
|
||||
{
|
||||
sparkConfProxy.Set(key, value);
|
||||
logger.LogInfo("Spark configuration key-value set to {0}={1}", key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
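Each SparkConf setter above now logs the value it applies and returns the conf, so calls chain. A small illustrative sketch follows; the master URL, app name, and configuration key/value are placeholders, not values taken from this commit.

var conf = new SparkConf()
    .SetMaster("local[2]")                      // "Spark master set to local[2]"
    .SetAppName("LoggingDemo")                  // "Spark app name set to LoggingDemo"
    .Set("spark.local.dir", @"C:\temp\spark");  // "Spark configuration key-value set to spark.local.dir=C:\temp\spark"
var sparkContext = new SparkContext(conf);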
@ -10,11 +10,13 @@ using System.Text;
|
|||
|
||||
using Microsoft.Spark.CSharp.Interop;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Core
|
||||
{
|
||||
public class SparkContext
|
||||
{
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(SparkContext));
|
||||
internal ISparkContextProxy SparkContextProxy { get; private set; }
|
||||
internal SparkConf SparkConf { get; private set; }
|
||||
|
||||
|
@ -65,18 +67,15 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
|
||||
public SparkContext(string master, string appName, string sparkHome)
|
||||
: this(master, appName, sparkHome, null)
|
||||
{
|
||||
}
|
||||
{}
|
||||
|
||||
public SparkContext(string master, string appName)
|
||||
: this(master, appName, null, null)
|
||||
{
|
||||
}
|
||||
{}
|
||||
|
||||
public SparkContext(SparkConf conf)
|
||||
: this(null, null, null, conf)
|
||||
{
|
||||
}
|
||||
{}
|
||||
|
||||
/// <summary>
|
||||
/// when created from checkpoint
|
||||
|
@ -114,6 +113,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
|
||||
public RDD<string> TextFile(string filePath, int minPartitions = 0)
|
||||
{
|
||||
logger.LogInfo("Reading text file {0} as RDD<string> with {1} partitions", filePath, minPartitions);
|
||||
return new RDD<string>(SparkContextProxy.TextFile(filePath, minPartitions), this, SerializedMode.String);
|
||||
}
|
||||
|
||||
|
@ -142,6 +142,7 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
if (numSlices < 1)
|
||||
numSlices = 1;
|
||||
|
||||
logger.LogInfo("Parallelizing {0} items to form RDD in the cluster with {1} partitions", collectionOfByteRepresentationOfObjects.Count, numSlices);
|
||||
return new RDD<T>(SparkContextProxy.Parallelize(collectionOfByteRepresentationOfObjects, numSlices), this);
|
||||
}
|
||||
|
||||
|
@ -401,9 +402,16 @@ namespace Microsoft.Spark.CSharp.Core
|
|||
/// </summary>
|
||||
public void Stop()
|
||||
{
|
||||
logger.LogInfo("Stopping SparkContext");
|
||||
logger.LogInfo("Note that there might be error in Spark logs on the failure to delete userFiles directory " +
|
||||
"under Spark temp directory (spark.local.dir config value in local mode)");
|
||||
logger.LogInfo("This error may be ignored for now. See https://issues.apache.org/jira/browse/SPARK-8333 for details");
|
||||
|
||||
if (accumulatorServer != null)
|
||||
accumulatorServer.Shutdown();
|
||||
|
||||
SparkContextProxy.Stop();
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -29,26 +29,51 @@ namespace Microsoft.Spark.CSharp.Services
|
|||
Log("Debug", message);
|
||||
}
|
||||
|
||||
public void LogDebug(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
Log("Debug", string.Format(messageFormat, messageParameters));
|
||||
}
|
||||
|
||||
public void LogInfo(string message)
|
||||
{
|
||||
Log("Info", message);
|
||||
}
|
||||
|
||||
public void LogInfo(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
Log("Info", string.Format(messageFormat, messageParameters));
|
||||
}
|
||||
|
||||
public void LogWarn(string message)
|
||||
{
|
||||
Log("Warn", message);
|
||||
}
|
||||
|
||||
public void LogWarn(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
Log("Warn", string.Format(messageFormat, messageParameters));
|
||||
}
|
||||
|
||||
public void LogFatal(string message)
|
||||
{
|
||||
Log("Fatal", message);
|
||||
}
|
||||
|
||||
public void LogFatal(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
Log("Fatal", string.Format(messageFormat, messageParameters));
|
||||
}
|
||||
|
||||
public void LogError(string message)
|
||||
{
|
||||
Log("Error", message);
|
||||
}
|
||||
|
||||
public void LogError(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
Log("Error", string.Format(messageFormat, messageParameters));
|
||||
}
|
||||
|
||||
public void LogException(Exception e)
|
||||
{
|
||||
Log("Exception", string.Format("{0}{1}{2}", e.Message, Environment.NewLine, e.StackTrace));
|
||||
|
|
|
@ -10,10 +10,15 @@ namespace Microsoft.Spark.CSharp.Services
|
|||
{
|
||||
ILoggerService GetLoggerInstance(Type type);
|
||||
void LogDebug(string message);
|
||||
void LogDebug(string messageFormat, params object[] messageParameters);
|
||||
void LogInfo(string message);
|
||||
void LogInfo(string messageFormat, params object[] messageParameters);
|
||||
void LogWarn(string message);
|
||||
void LogWarn(string messageFormat, params object[] messageParameters);
|
||||
void LogFatal(string message);
|
||||
void LogFatal(string messageFormat, params object[] messageParameters);
|
||||
void LogError(string message);
|
||||
void LogError(string messageFormat, params object[] messageParameters);
|
||||
void LogException(Exception e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,26 +33,51 @@ namespace Microsoft.Spark.CSharp.Services
|
|||
logger.Debug(message);
|
||||
}
|
||||
|
||||
public void LogDebug(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
logger.DebugFormat(messageFormat, messageParameters);
|
||||
}
|
||||
|
||||
public void LogInfo(string message)
|
||||
{
|
||||
logger.Info(message);
|
||||
}
|
||||
|
||||
public void LogInfo(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
logger.InfoFormat(messageFormat, messageParameters);
|
||||
}
|
||||
|
||||
public void LogWarn(string message)
|
||||
{
|
||||
logger.Warn(message);
|
||||
}
|
||||
|
||||
public void LogWarn(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
logger.WarnFormat(messageFormat, messageParameters);
|
||||
}
|
||||
|
||||
public void LogFatal(string message)
|
||||
{
|
||||
logger.Fatal(message);
|
||||
}
|
||||
|
||||
public void LogFatal(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
logger.FatalFormat(messageFormat, messageParameters);
|
||||
}
|
||||
|
||||
public void LogError(string message)
|
||||
{
|
||||
logger.Error(message);
|
||||
}
|
||||
|
||||
public void LogError(string messageFormat, params object[] messageParameters)
|
||||
{
|
||||
logger.ErrorFormat(messageFormat, messageParameters);
|
||||
}
|
||||
|
||||
public void LogException(Exception e)
|
||||
{
|
||||
|
||||
|
|
|
@ -12,9 +12,12 @@ namespace Microsoft.Spark.CSharp.Services
|
|||
public class LoggerServiceFactory
|
||||
{
|
||||
private static ILoggerService loggerService = DefaultLoggerService.Instance;
|
||||
|
||||
public static void SetLoggerService(ILoggerService loggerServiceOverride)
|
||||
{
|
||||
loggerService = loggerServiceOverride;
|
||||
var logger = GetLogger(typeof(LoggerServiceFactory));
|
||||
logger.LogInfo("Logger service configured to use {0}", logger.GetType().Name);
|
||||
}
|
||||
|
||||
public static ILoggerService GetLogger(Type type)
|
||||
|
|
|
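SetLoggerService now reports which logger implementation is in effect. The examples later in this pull request opt into log4net the same way; a minimal sketch is shown below (MyDriver is a placeholder type name).

LoggerServiceFactory.SetLoggerService(Log4NetLoggerService.Instance); // optional; DefaultLoggerService is used if not set
var logger = LoggerServiceFactory.GetLogger(typeof(MyDriver));
logger.LogInfo("Driver starting with {0} argument(s)", args.Length);  // uses the new format-string overload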
@ -7,6 +7,7 @@ using System.Globalization;
|
|||
using System.Linq;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Sql
|
||||
{
|
||||
|
@ -18,6 +19,9 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
[Serializable]
|
||||
public class DataFrame
|
||||
{
|
||||
[NonSerialized]
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(DataFrame));
|
||||
|
||||
[NonSerialized]
|
||||
private readonly IDataFrameProxy dataFrameProxy;
|
||||
[NonSerialized]
|
||||
|
@ -119,6 +123,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// <returns>row count</returns>
|
||||
public long Count()
|
||||
{
|
||||
logger.LogInfo("Calculating the number of rows in the dataframe");
|
||||
return dataFrameProxy.Count();
|
||||
}
|
||||
|
||||
|
@ -129,6 +134,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// <param name="truncate">Indicates if strings more than 20 characters long will be truncated</param>
|
||||
public void Show(int numberOfRows = 20, bool truncate = true)
|
||||
{
|
||||
logger.LogInfo("Writing {0} rows in the DataFrame to Console output", numberOfRows);
|
||||
Console.WriteLine(dataFrameProxy.GetShowString(numberOfRows, truncate));
|
||||
}
|
||||
|
||||
|
@ -138,6 +144,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
public void ShowSchema()
|
||||
{
|
||||
var nameTypeList = Schema.Fields.Select(structField => structField.SimpleString);
|
||||
logger.LogInfo("Writing Schema to Console output");
|
||||
Console.WriteLine(string.Join(", ", nameTypeList));
|
||||
}
|
||||
|
||||
|
@ -954,6 +961,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
// write(self)
|
||||
public DataFrameWriter Write()
|
||||
{
|
||||
logger.LogInfo("Using DataFrameWriter to write output data to external data storage");
|
||||
return new DataFrameWriter(dataFrameProxy.Write());
|
||||
}
|
||||
|
||||
|
|
|
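The DataFrame changes above log Count, Show, ShowSchema, and Write calls. An illustrative fragment, where df, logger, and the output path are placeholders:

df.ShowSchema();                                                // "Writing Schema to Console output"
logger.LogInfo("Row count is " + df.Count());                   // Count() logs "Calculating the number of rows in the dataframe"
df.Write().Format("parquet").Save("hdfs:///data/out.parquet");  // Write() logs "Using DataFrameWriter to write output data to external data storage"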
@ -5,6 +5,7 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Sql
|
||||
{
|
||||
|
@ -14,6 +15,8 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public class DataFrameReader
|
||||
{
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(DataFrameReader));
|
||||
|
||||
private readonly IDataFrameReaderProxy dataFrameReaderProxy;
|
||||
private readonly SparkContext sparkContext;
|
||||
|
||||
|
@ -27,6 +30,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrameReader Format(string source)
|
||||
{
|
||||
logger.LogInfo("Input data source format for the reader is '{0}'", source);
|
||||
dataFrameReaderProxy.Format(source);
|
||||
return this;
|
||||
}
|
||||
|
@ -48,6 +52,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
public DataFrameReader Option(string key, string value)
|
||||
{
|
||||
dataFrameReaderProxy.Options(new Dictionary<string, string>(){{key, value}});
|
||||
logger.LogInfo("Input key-vaue option for the data source is {0}={1}", key, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -75,6 +80,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrame Load()
|
||||
{
|
||||
logger.LogInfo("Loading DataFrame using the reader");
|
||||
return new DataFrame(dataFrameReaderProxy.Load(), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -84,6 +90,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrame Jdbc(string url, string table, Dictionary<String, String> properties)
|
||||
{
|
||||
logger.LogInfo("Constructing DataFrame using JDBC source. Url={0}, tableName={1}", url, table);
|
||||
return new DataFrame(dataFrameReaderProxy.Jdbc(url, table, properties), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -106,6 +113,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
public DataFrame Jdbc(string url, string table, string columnName, string lowerBound, string upperBound,
|
||||
int numPartitions, Dictionary<String, String> connectionProperties)
|
||||
{
|
||||
logger.LogInfo("Constructing DataFrame using JDBC source. Url={0}, tableName={1}, columnName={2}", url, table, columnName);
|
||||
return new DataFrame(dataFrameReaderProxy.Jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, connectionProperties), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -125,6 +133,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// Normally at least a "user" and "password" property should be included.</param>
|
||||
public DataFrame Jdbc(string url, string table, string[] predicates, Dictionary<String, String> connectionProperties)
|
||||
{
|
||||
logger.LogInfo("Constructing DataFrame using JDBC source. Url={0}, table={1}", url, table);
|
||||
return new DataFrame(dataFrameReaderProxy.Jdbc(url, table, predicates, connectionProperties), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -137,6 +146,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// <param name="path">input path</param>
|
||||
public DataFrame Json(string path)
|
||||
{
|
||||
logger.LogInfo("Constructing DataFrame using JSON source {0}", path);
|
||||
return Format("json").Load(path);
|
||||
}
|
||||
|
||||
|
@ -146,6 +156,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrame Parquet(params string[] path)
|
||||
{
|
||||
logger.LogInfo("Constructing DataFrame using Parquet source {0}", string.Join(";", path));
|
||||
return new DataFrame(dataFrameReaderProxy.Parquet(path), sparkContext);
|
||||
}
|
||||
}
|
||||
|
|
|
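With the logging added to DataFrameReader, each step of a fluent read is traced. A hedged sketch, where sqlContext and the input paths are placeholders:

var df = sqlContext.Read()                          // "Using DataFrameReader to read input data from external data source"
    .Format("json")                                 // "Input data source format for the reader is 'json'"
    .Load("hdfs:///data/input.json");
var people = sqlContext.Read().Json("hdfs:///data/people.json"); // convenience method; "Constructing DataFrame using JSON source ..."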
@ -3,6 +3,7 @@
|
|||
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Sql
|
||||
{
|
||||
|
@ -14,6 +15,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public class DataFrameWriter
|
||||
{
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(DataFrameWriter));
|
||||
internal IDataFrameWriterProxy DataFrameWriterProxy
|
||||
{
|
||||
get { return dataFrameWriterProxy; }
|
||||
|
@ -56,6 +58,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrameWriter Format(string source)
|
||||
{
|
||||
logger.LogInfo("Output data storage format for the writer is '{0}'", source);
|
||||
dataFrameWriterProxy.Format(source);
|
||||
return this;
|
||||
}
|
||||
|
@ -66,6 +69,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
public DataFrameWriter Option(string key, string value)
|
||||
{
|
||||
var options = new Dictionary<string, string>() { { key, value } };
|
||||
logger.LogInfo("Output key-vaue option for the external data stroage is {0}={1}", key, value);
|
||||
return Options(options);
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
using Microsoft.Spark.CSharp.Proxy;
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Sql
|
||||
{
|
||||
|
@ -14,6 +15,8 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public class SqlContext
|
||||
{
|
||||
private readonly ILoggerService logger = LoggerServiceFactory.GetLogger(typeof(SqlContext));
|
||||
|
||||
private readonly ISqlContextProxy sqlContextProxy;
|
||||
private readonly SparkContext sparkContext;
|
||||
internal ISqlContextProxy SqlContextProxy { get { return sqlContextProxy; } }
|
||||
|
@ -28,6 +31,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
public DataFrameReader Read()
|
||||
{
|
||||
logger.LogInfo("Using DataFrameReader to read input data from external data source");
|
||||
return new DataFrameReader(sqlContextProxy.Read(), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -40,6 +44,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// <returns></returns>
|
||||
public DataFrame ReadDataFrame(string path, StructType schema, Dictionary<string, string> options)
|
||||
{
|
||||
logger.LogInfo("Reading DataFrame from file {0}", path);
|
||||
return new DataFrame(sqlContextProxy.ReadDataFrame(path, schema, options), sparkContext);
|
||||
}
|
||||
|
||||
|
@ -62,6 +67,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// <returns></returns>
|
||||
public DataFrame Sql(string sqlQuery)
|
||||
{
|
||||
logger.LogInfo("SQL query to execute on the dataframe is {0}", sqlQuery);
|
||||
return new DataFrame(sqlContextProxy.Sql(sqlQuery), sparkContext);
|
||||
}
|
||||
|
||||
|
|
|
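SqlContext.Sql now logs the query text before delegating to the proxy, and the resulting DataFrame logs its own operations. A short illustrative sketch; the table name and query are placeholders and assume the table was registered elsewhere.

var result = sqlContext.Sql("SELECT name, age FROM people WHERE age > 20"); // "SQL query to execute on the dataframe is ..."
result.Show();                                                              // "Writing 20 rows in the DataFrame to Console output"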
@ -10,6 +10,21 @@
|
|||
to be used in SparkCLR runtime
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Configuration.IConfigurationService">
|
||||
<summary>
|
||||
Helps getting config settings to be used in SparkCLR runtime
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Configuration.IConfigurationService.GetCSharpWorkerExePath">
|
||||
<summary>
|
||||
The full path of the CSharp external backend worker process executable.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Configuration.IConfigurationService.BackendPortNumber">
|
||||
<summary>
|
||||
The port number used for communicating with the CSharp external backend worker process.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Configuration.ConfigurationService.SparkCLRConfiguration">
|
||||
<summary>
|
||||
Default configuration for SparkCLR jobs.
|
||||
|
@ -41,21 +56,6 @@
|
|||
The full path of the CSharp external backend worker process.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Configuration.IConfigurationService">
|
||||
<summary>
|
||||
Helps getting config settings to be used in SparkCLR runtime
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Configuration.IConfigurationService.BackendPortNumber">
|
||||
<summary>
|
||||
The port number used for communicating with the CSharp external backend worker process.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Configuration.IConfigurationService.GetCSharpWorkerExePath">
|
||||
<summary>
|
||||
The full path of the CSharp external backend worker process executable.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.Accumulator">
|
||||
<summary>
|
||||
A shared variable that can be accumulated, i.e., has a commutative and associative "add"
|
||||
|
@ -131,23 +131,16 @@
|
|||
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.Broadcast`1.Value">
|
||||
<summary>
|
||||
Return the broadcasted value
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.Broadcast`1.Unpersist(System.Boolean)">
|
||||
<summary>
|
||||
Delete cached copies of this broadcast on the executors.
|
||||
</summary>
|
||||
<param name="blocking"></param>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.Option`1">
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.Broadcast`1.Value">
|
||||
<summary>
|
||||
Container for an optional value of type T. If the value of type T is present, the Option.IsDefined is TRUE and GetValue() return the value.
|
||||
If the value is absent, the Option.IsDefined is FALSE, exception will be thrown when calling GetValue().
|
||||
Return the broadcasted value
|
||||
</summary>
|
||||
<typeparam name="T"></typeparam>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.Option`1">
|
||||
<summary>
|
||||
|
@ -161,6 +154,11 @@
|
|||
Used for collect operation on RDD
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.IRDDCollector">
|
||||
<summary>
|
||||
Interface for collect operation on RDD
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.DoubleRDDFunctions.Sum(Microsoft.Spark.CSharp.Core.RDD{System.Double})">
|
||||
<summary>
|
||||
Add up the elements in this RDD.
|
||||
|
@ -273,11 +271,6 @@
|
|||
<param name="self"></param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.IRDDCollector">
|
||||
<summary>
|
||||
Interface for collect operation on RDD
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.OrderedRDDFunctions.SortByKey``2(Microsoft.Spark.CSharp.Core.RDD{System.Collections.Generic.KeyValuePair{``0,``1}},System.Boolean,System.Nullable{System.Int32})">
|
||||
<summary>
|
||||
Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
|
||||
|
@ -354,7 +347,7 @@
|
|||
<summary>
|
||||
Return an RDD with the keys of each tuple.
|
||||
|
||||
>>> m = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<int, int>"/>(1, 2), new <see cref="!:KeyValuePair<int, int>"/>(3, 4) }, 1).Keys().Collect()
|
||||
>>> m = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<int, int>"/>(1, 2), new <see cref="!:KeyValuePair<int, int>"/>(3, 4) }, 1).Keys().Collect()
|
||||
[1, 3]
|
||||
</summary>
|
||||
<typeparam name="K"></typeparam>
|
||||
|
@ -366,7 +359,7 @@
|
|||
<summary>
|
||||
Return an RDD with the values of each tuple.
|
||||
|
||||
>>> m = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<int, int>"/>(1, 2), new <see cref="!:KeyValuePair<int, int>"/>(3, 4) }, 1).Values().Collect()
|
||||
>>> m = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<int, int>"/>(1, 2), new <see cref="!:KeyValuePair<int, int>"/>(3, 4) }, 1).Values().Collect()
|
||||
[2, 4]
|
||||
|
||||
</summary>
|
||||
|
@ -391,7 +384,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.ReduceByKey((x, y) => x + y).Collect()
|
||||
.ReduceByKey((x, y) => x + y).Collect()
|
||||
|
||||
[('a', 2), ('b', 1)]
|
||||
|
||||
|
@ -417,7 +410,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.ReduceByKeyLocally((x, y) => x + y).Collect()
|
||||
.ReduceByKeyLocally((x, y) => x + y).Collect()
|
||||
|
||||
[('a', 2), ('b', 1)]
|
||||
|
||||
|
@ -438,7 +431,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.CountByKey((x, y) => x + y).Collect()
|
||||
.CountByKey((x, y) => x + y).Collect()
|
||||
|
||||
[('a', 2), ('b', 1)]
|
||||
|
||||
|
@ -562,7 +555,7 @@
|
|||
<summary>
|
||||
Return a copy of the RDD partitioned using the specified partitioner.
|
||||
|
||||
sc.Parallelize(new[] { 1, 2, 3, 4, 2, 4, 1 }, 1).Map(x => new <see cref="!:KeyValuePair<int, int>"/>(x, x)).PartitionBy(3).Glom().Collect()
|
||||
sc.Parallelize(new[] { 1, 2, 3, 4, 2, 4, 1 }, 1).Map(x => new <see cref="!:KeyValuePair<int, int>"/>(x, x)).PartitionBy(3).Glom().Collect()
|
||||
</summary>
|
||||
<param name="self"></param>
|
||||
<param name="numPartitions"></param>
|
||||
|
@ -594,7 +587,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
|
||||
[('a', '11'), ('b', '1')]
|
||||
</summary>
|
||||
|
@ -625,7 +618,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
|
||||
[('a', 2), ('b', 1)]
|
||||
</summary>
|
||||
|
@ -653,7 +646,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
.CombineByKey(() => string.Empty, (x, y) => x + y.ToString(), (x, y) => x + y).Collect()
|
||||
|
||||
[('a', 2), ('b', 1)]
|
||||
</summary>
|
||||
|
@ -681,7 +674,7 @@
|
|||
new <see cref="!:KeyValuePair<string, int>"/>("b", 1),
|
||||
new <see cref="!:KeyValuePair<string, int>"/>("a", 1)
|
||||
}, 2)
|
||||
.GroupByKey().MapValues(l => string.Join(" ", l)).Collect()
|
||||
.GroupByKey().MapValues(l => string.Join(" ", l)).Collect()
|
||||
|
||||
[('a', [1, 1]), ('b', [1])]
|
||||
|
||||
|
@ -703,7 +696,7 @@
|
|||
new <see cref="!:KeyValuePair<string, string[]>"/>("a", new[]{"apple", "banana", "lemon"}),
|
||||
new <see cref="!:KeyValuePair<string, string[]>"/>("b", new[]{"grapes"})
|
||||
}, 2)
|
||||
.MapValues(x => x.Length).Collect()
|
||||
.MapValues(x => x.Length).Collect()
|
||||
|
||||
[('a', 3), ('b', 1)]
|
||||
|
||||
|
@ -726,7 +719,7 @@
|
|||
new <see cref="!:KeyValuePair<string, string[]>"/>("a", new[]{"x", "y", "z"}),
|
||||
new <see cref="!:KeyValuePair<string, string[]>"/>("b", new[]{"p", "r"})
|
||||
}, 2)
|
||||
.FlatMapValues(x => x).Collect()
|
||||
.FlatMapValues(x => x).Collect()
|
||||
|
||||
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
|
||||
|
||||
|
@ -782,7 +775,7 @@
|
|||
var y = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<string, int>"/>("a", 1), new <see cref="!:KeyValuePair<string, int>"/>("b", 4) }, 2);
|
||||
var z = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<string, int>"/>("a", 2) }, 1);
|
||||
var w = sc.Parallelize(new[] { new <see cref="!:KeyValuePair<string, int>"/>("b", 42) }, 1);
|
||||
var m = x.GroupWith(y, z, w).MapValues(l => string.Join(" ", l.Item1) + " : " + string.Join(" ", l.Item2) + " : " + string.Join(" ", l.Item3) + " : " + string.Join(" ", l.Item4)).Collect();
|
||||
var m = x.GroupWith(y, z, w).MapValues(l => string.Join(" ", l.Item1) + " : " + string.Join(" ", l.Item2) + " : " + string.Join(" ", l.Item3) + " : " + string.Join(" ", l.Item4)).Collect();
|
||||
</summary>
|
||||
<typeparam name="K"></typeparam>
|
||||
<typeparam name="V"></typeparam>
|
||||
|
@ -821,9 +814,9 @@
|
|||
is done efficiently if the RDD has a known partitioner by only
|
||||
searching the partition that the key maps to.
|
||||
|
||||
>>> l = range(1000)
|
||||
>>> rdd = sc.Parallelize(Enumerable.Range(0, 1000).Zip(Enumerable.Range(0, 1000), (x, y) => new <see cref="!:KeyValuePair<int, int>"/>(x, y)), 10)
|
||||
>>> rdd.lookup(42)
|
||||
>>> l = range(1000)
|
||||
>>> rdd = sc.Parallelize(Enumerable.Range(0, 1000).Zip(Enumerable.Range(0, 1000), (x, y) => new <see cref="!:KeyValuePair<int, int>"/>(x, y)), 10)
|
||||
>>> rdd.lookup(42)
|
||||
[42]
|
||||
|
||||
</summary>
|
||||
|
@ -914,14 +907,6 @@
|
|||
</summary>
|
||||
</member>
|
||||
<!-- Badly formed XML comment ignored for member "T:Microsoft.Spark.CSharp.Core.PipelinedRDD`1" -->
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.PipelinedRDD`1.MapPartitionsWithIndexHelper`2">
|
||||
<summary>
|
||||
This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
|
||||
private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers
|
||||
for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation
|
||||
on the serializability of compiler generated types
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.RDD`1">
|
||||
<summary>
|
||||
Represents a Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
|
||||
|
@ -931,21 +916,6 @@
|
|||
</summary>
|
||||
<typeparam name="T">Type of the RDD</typeparam>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.IsCached">
|
||||
<summary>
|
||||
Return whether this RDD has been cached or not
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.IsCheckpointed">
|
||||
<summary>
|
||||
Return whether this RDD has been checkpointed or not
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.Name">
|
||||
<summary>
|
||||
Return the name of this RDD.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.RDD`1.Cache">
|
||||
<summary>
|
||||
Persist this RDD with the default storage level <see cref="F:Microsoft.Spark.CSharp.Core.StorageLevelType.MEMORY_ONLY_SER"/>.
|
||||
|
@ -987,7 +957,7 @@
|
|||
<summary>
|
||||
Return a new RDD by applying a function to each element of this RDD.
|
||||
|
||||
sc.Parallelize(new string[]{"b", "a", "c"}, 1).Map(x => new <see cref="!:KeyValuePair<string, int>"/>(x, 1)).Collect()
|
||||
sc.Parallelize(new string[]{"b", "a", "c"}, 1).Map(x => new <see cref="!:KeyValuePair<string, int>"/>(x, 1)).Collect()
|
||||
[('a', 1), ('b', 1), ('c', 1)]
|
||||
|
||||
</summary>
|
||||
|
@ -1028,7 +998,7 @@
|
|||
Return a new RDD by applying a function to each partition of this RDD,
|
||||
while tracking the index of the original partition.
|
||||
|
||||
<see cref="!:sc.Parallelize(new int[]<1, 2, 3, 4>, 4).MapPartitionsWithIndex<double>"/>((pid, iter) => (double)pid).Sum()
|
||||
<see cref="!:sc.Parallelize(new int[]<1, 2, 3, 4>, 4).MapPartitionsWithIndex<double>"/>((pid, iter) => (double)pid).Sum()
|
||||
6
|
||||
</summary>
|
||||
<typeparam name="U"></typeparam>
|
||||
|
@ -1496,7 +1466,7 @@
|
|||
n is the number of partitions. So there may exist gaps, but this
|
||||
method won't trigger a spark job, which is different from <see cref="M:Microsoft.Spark.CSharp.Core.RDD`1.ZipWithIndex"/>
|
||||
|
||||
>>> sc.Parallelize(new string[] { "a", "b", "c", "d" }, 1).ZipWithIndex().Collect()
|
||||
>>> sc.Parallelize(new string[] { "a", "b", "c", "d" }, 1).ZipWithIndex().Collect()
|
||||
[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
|
||||
|
||||
</summary>
|
||||
|
@ -1550,6 +1520,29 @@
|
|||
<param name="seed">the seed for the Random number generator</param>
|
||||
<returns>A random sub-sample of the RDD without replacement.</returns>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.IsCached">
|
||||
<summary>
|
||||
Return whether this RDD has been cached or not
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.IsCheckpointed">
|
||||
<summary>
|
||||
Return whether this RDD has been checkpointed or not
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.RDD`1.Name">
|
||||
<summary>
|
||||
Return the name of this RDD.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.PipelinedRDD`1.MapPartitionsWithIndexHelper`2">
|
||||
<summary>
|
||||
This class is defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
|
||||
private anonymous type that is not serializable. Since the delegate has to be serialized and sent to the Spark workers
|
||||
for execution, it is necessary to have the type marked [Serializable]. This class is to work around the limitation
|
||||
on the serializability of compiler generated types
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Core.StringRDDFunctions">
|
||||
<summary>
|
||||
Some useful utility functions for <c>RDD{string}</c>
|
||||
|
@ -1747,36 +1740,6 @@
|
|||
<param name="key">Key to use</param>
|
||||
<param name="defaultValue">Default value to use</param>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.Version">
|
||||
<summary>
|
||||
The version of Spark on which this application is running.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.StartTime">
|
||||
<summary>
|
||||
Return the epoch time when the Spark Context was started.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.DefaultParallelism">
|
||||
<summary>
|
||||
Default level of parallelism to use when not given by user (e.g. for reduce tasks)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.DefaultMinPartitions">
|
||||
<summary>
|
||||
Default min number of partitions for Hadoop RDDs when not given by user
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.SparkUser">
|
||||
<summary>
|
||||
Get SPARK_USER for user who is running SparkContext.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.StatusTracker">
|
||||
<summary>
|
||||
Return :class:`StatusTracker` object
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.SparkContext.#ctor(Microsoft.Spark.CSharp.Proxy.ISparkContextProxy,Microsoft.Spark.CSharp.Core.SparkConf)">
|
||||
<summary>
|
||||
when created from checkpoint
|
||||
|
@ -2090,6 +2053,36 @@
|
|||
Cancel all jobs that have been scheduled or are running.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.Version">
|
||||
<summary>
|
||||
The version of Spark on which this application is running.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.StartTime">
|
||||
<summary>
|
||||
Return the epoch time when the Spark Context was started.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.DefaultParallelism">
|
||||
<summary>
|
||||
Default level of parallelism to use when not given by user (e.g. for reduce tasks)
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.DefaultMinPartitions">
|
||||
<summary>
|
||||
Default min number of partitions for Hadoop RDDs when not given by user
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.SparkUser">
|
||||
<summary>
|
||||
Get SPARK_USER for user who is running SparkContext.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Core.SparkContext.StatusTracker">
|
||||
<summary>
|
||||
Return :class:`StatusTracker` object
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Core.StatCounter.Merge(System.Double)">
|
||||
<summary>
|
||||
Add a value into this StatCounter, updating the internal statistics.
|
||||
|
@ -2195,6 +2188,11 @@
|
|||
Extend method to sort items in a JSON array by keys.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Interop.SparkCLREnvironment">
|
||||
<summary>
|
||||
Contains everything needed to setup an environment for using C# with Spark
|
||||
</summary>
|
||||
</member>
|
||||
<!-- Badly formed XML comment ignored for member "T:Microsoft.Spark.CSharp.Interop.Ipc.IJvmBridge" -->
|
||||
<!-- Badly formed XML comment ignored for member "T:Microsoft.Spark.CSharp.Interop.Ipc.JvmBridge" -->
|
||||
<member name="T:Microsoft.Spark.CSharp.Interop.Ipc.JvmObjectReference">
|
||||
|
@ -2213,11 +2211,6 @@
|
|||
</summary>
|
||||
</member>
|
||||
<!-- Badly formed XML comment ignored for member "T:Microsoft.Spark.CSharp.Interop.Ipc.SerDe" -->
|
||||
<member name="T:Microsoft.Spark.CSharp.Interop.SparkCLREnvironment">
|
||||
<summary>
|
||||
Contains everything needed to setup an environment for using C# with Spark
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Proxy.Ipc.DataFrameIpcProxy.Intersect(Microsoft.Spark.CSharp.Proxy.IDataFrameProxy)">
|
||||
<summary>
|
||||
Call https://github.com/apache/spark/blob/branch-1.4/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala, intersect(other: DataFrame): DataFrame
|
||||
|
@ -3462,7 +3455,7 @@
|
|||
<member name="M:Microsoft.Spark.CSharp.Sql.SqlContext.RegisterFunction``1(System.String,System.Func{``0})">
|
||||
<summary>
|
||||
Register UDF with no input argument, e.g:
|
||||
<see cref="!:SqlContext.RegisterFunction<bool>"/>("MyFilter", () => true);
|
||||
<see cref="!:SqlContext.RegisterFunction<bool>"/>("MyFilter", () => true);
|
||||
sqlContext.Sql("SELECT * FROM MyTable where MyFilter()");
|
||||
</summary>
|
||||
<typeparam name="RT"></typeparam>
|
||||
|
@ -3472,7 +3465,7 @@
|
|||
<member name="M:Microsoft.Spark.CSharp.Sql.SqlContext.RegisterFunction``2(System.String,System.Func{``1,``0})">
|
||||
<summary>
|
||||
Register UDF with 1 input argument, e.g:
|
||||
<see cref="!:SqlContext.RegisterFunction<bool, string>"/>("MyFilter", (arg1) => arg1 != null);
|
||||
<see cref="!:SqlContext.RegisterFunction<bool, string>"/>("MyFilter", (arg1) => arg1 != null);
|
||||
sqlContext.Sql("SELECT * FROM MyTable where MyFilter(columnName1)");
|
||||
</summary>
|
||||
<typeparam name="RT"></typeparam>
|
||||
|
@ -3525,11 +3518,6 @@
|
|||
</summary>
|
||||
<typeparam name="T"></typeparam>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Streaming.DStream`1.SlideDuration">
|
||||
<summary>
|
||||
Return the slideDuration in seconds of this DStream
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.DStream`1.Count">
|
||||
<summary>
|
||||
Return a new DStream in which each RDD has a single element
|
||||
|
@ -3787,6 +3775,11 @@
|
|||
<param name="numPartitions">number of partitions of each RDD in the new DStream.</param>
|
||||
<returns></returns>
|
||||
</member>
|
||||
<member name="P:Microsoft.Spark.CSharp.Streaming.DStream`1.SlideDuration">
|
||||
<summary>
|
||||
Return the slideDuration in seconds of this DStream
|
||||
</summary>
|
||||
</member>
|
||||
<member name="T:Microsoft.Spark.CSharp.Streaming.MapPartitionsWithIndexHelper`2">
|
||||
<summary>
|
||||
Following classes are defined explicitly instead of using anonymous method as delegate to prevent C# compiler from generating
|
||||
|
@ -4143,7 +4136,7 @@
|
|||
Wait for the execution to stop.
|
||||
</summary>
|
||||
</member>
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.AwaitTermination(System.Int32)">
|
||||
<member name="M:Microsoft.Spark.CSharp.Streaming.StreamingContext.AwaitTerminationOrTimeout(System.Int32)">
|
||||
<summary>
|
||||
Wait for the execution to stop.
|
||||
</summary>
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,71 @@
|
|||
@setlocal
|
||||
@ECHO off
|
||||
|
||||
SET CMDHOME=%~dp0
|
||||
@REM Remove trailing backslash \
|
||||
set CMDHOME=%CMDHOME:~0,-1%
|
||||
|
||||
@REM Set some .NET directory locations required if running from PowerShell prompt.
|
||||
if "%FrameworkDir%" == "" set FrameworkDir=%WINDIR%\Microsoft.NET\Framework
|
||||
if "%FrameworkVersion%" == "" set FrameworkVersion=v4.0.30319
|
||||
|
||||
SET MSBUILDEXEDIR=%FrameworkDir%\%FrameworkVersion%
|
||||
SET MSBUILDEXE=%MSBUILDEXEDIR%\MSBuild.exe
|
||||
SET MSBUILDOPT=/verbosity:minimal
|
||||
|
||||
if "%builduri%" == "" set builduri=Build.cmd
|
||||
|
||||
cd "%CMDHOME%"
|
||||
@cd
|
||||
|
||||
set PROJ_NAME=Examples
|
||||
set PROJ=%CMDHOME%\%PROJ_NAME%.sln
|
||||
|
||||
@echo ===== Building %PROJ% =====
|
||||
|
||||
@echo Restore NuGet packages ===================
|
||||
SET STEP=NuGet-Restore
|
||||
|
||||
nuget restore "%PROJ%"
|
||||
|
||||
@if ERRORLEVEL 1 GOTO :ErrorStop
|
||||
|
||||
@echo Build Debug ==============================
|
||||
SET STEP=Debug
|
||||
|
||||
SET CONFIGURATION=%STEP%
|
||||
|
||||
SET STEP=%CONFIGURATION%
|
||||
|
||||
"%MSBUILDEXE%" /p:Configuration=%CONFIGURATION% %MSBUILDOPT% "%PROJ%"
|
||||
@if ERRORLEVEL 1 GOTO :ErrorStop
|
||||
@echo BUILD ok for %CONFIGURATION% %PROJ%
|
||||
|
||||
@echo Build Release ============================
|
||||
SET STEP=Release
|
||||
|
||||
SET CONFIGURATION=%STEP%
|
||||
|
||||
"%MSBUILDEXE%" /p:Configuration=%CONFIGURATION% %MSBUILDOPT% "%PROJ%"
|
||||
@if ERRORLEVEL 1 GOTO :ErrorStop
|
||||
@echo BUILD ok for %CONFIGURATION% %PROJ%
|
||||
|
||||
if EXIST %PROJ_NAME%.nuspec (
|
||||
@echo ===== Build NuGet package for %PROJ% =====
|
||||
SET STEP=NuGet-Pack
|
||||
|
||||
powershell -f %CMDHOME%\..\build\localmode\nugetpack.ps1
|
||||
@if ERRORLEVEL 1 GOTO :ErrorStop
|
||||
@echo NuGet package ok for %PROJ%
|
||||
)
|
||||
|
||||
@echo ===== Build succeeded for %PROJ% =====
|
||||
|
||||
@GOTO :EOF
|
||||
|
||||
:ErrorStop
|
||||
set RC=%ERRORLEVEL%
|
||||
if "%STEP%" == "" set STEP=%CONFIGURATION%
|
||||
@echo ===== Build FAILED for %PROJ% -- %STEP% with error %RC% - CANNOT CONTINUE =====
|
||||
exit /B %RC%
|
||||
:EOF
|
|
@ -0,0 +1,3 @@
FOR /F "tokens=*" %%G IN ('DIR /B /AD /S bin') DO RMDIR /S /Q "%%G"
FOR /F "tokens=*" %%G IN ('DIR /B /AD /S obj') DO RMDIR /S /Q "%%G"
@REM FOR /F "tokens=*" %%G IN ('DIR /B /AD /S TestResults') DO RMDIR /S /Q "%%G"
@ -20,16 +20,22 @@ namespace Microsoft.Spark.CSharp.Examples
|
|||
LoggerServiceFactory.SetLoggerService(Log4NetLoggerService.Instance); //this is optional - DefaultLoggerService will be used if not set
|
||||
var logger = LoggerServiceFactory.GetLogger(typeof(JdbcDataFrameExample));
|
||||
|
||||
//For SQL Server use the connection string formats below
|
||||
//"jdbc:sqlserver://localhost:1433;databaseName=Temp;integratedSecurity=true;" or
|
||||
//"jdbc:sqlserver://localhost;databaseName=Temp;user=MyUserName;password=myPassword;"
|
||||
var connectionString = args[0];
|
||||
var tableName = args[1];
|
||||
|
||||
var sparkConf = new SparkConf();
|
||||
var sparkContext = new SparkContext(sparkConf);
|
||||
var sqlContext = new SqlContext(sparkContext);
|
||||
var df = sqlContext.Read()
|
||||
.Jdbc("jdbc:sqlserver://localhost:1433;databaseName=Temp;;integratedSecurity=true;", "xyz",
|
||||
new Dictionary<string, string>());
|
||||
var df = sqlContext
|
||||
.Read()
|
||||
.Jdbc(connectionString, tableName, new Dictionary<string, string>());
|
||||
df.ShowSchema();
|
||||
var rowCount = df.Count();
|
||||
logger.LogInfo("Row count is " + rowCount);
|
||||
|
||||
sparkContext.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,13 +18,17 @@ namespace Microsoft.Spark.CSharp.Examples
|
|||
LoggerServiceFactory.SetLoggerService(Log4NetLoggerService.Instance); //this is optional - DefaultLoggerService will be used if not set
|
||||
var logger = LoggerServiceFactory.GetLogger(typeof(SparkXmlExample));
|
||||
|
||||
var inputXmlFilePath = args[0];
|
||||
var outputXmlFilePath = args[1];
|
||||
|
||||
var sparkConf = new SparkConf();
|
||||
sparkConf.SetAppName("myapp");
|
||||
var sparkContext = new SparkContext(sparkConf);
|
||||
var sqlContext = new SqlContext(sparkContext);
|
||||
var df = sqlContext.Read()
|
||||
.Format("com.databricks.spark.xml")
|
||||
.Option("rowTag", "book")
|
||||
.Load(@"D:\temp\spark-xml\books.xml");
|
||||
.Format("com.databricks.spark.xml")
|
||||
.Option("rowTag", "book")
|
||||
.Load(inputXmlFilePath); //"D:\temp\books.xml", "file:/D:/temp/books.xml" or "hdfs://temp/books.xml"
|
||||
df.ShowSchema();
|
||||
var rowCount = df.Count();
|
||||
logger.LogInfo("Row count is " + rowCount);
|
||||
|
@ -32,10 +36,12 @@ namespace Microsoft.Spark.CSharp.Examples
|
|||
var selectedData = df.Select("author", "@id");
|
||||
|
||||
selectedData.Write()
|
||||
.Format("com.databricks.spark.xml")
|
||||
.Option("rootTag", "books")
|
||||
.Option("rowTag", "book")
|
||||
.Save(@"D:\temp\spark-xml\newbooks.xml");
|
||||
.Format("com.databricks.spark.xml")
|
||||
.Option("rootTag", "books")
|
||||
.Option("rowTag", "book")
|
||||
.Save(outputXmlFilePath); //"D:\temp\booksUpdated.xml", "file:/D:/temp/booksUpdated.xml" or "hdfs://temp/booksUpdated.xml"
|
||||
|
||||
sparkContext.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
#!/bin/bash
|
||||
|
||||
export FWDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
|
||||
export XBUILDOPT=/verbosity:minimal
|
||||
|
||||
if [ $builduri = "" ];
|
||||
then
|
||||
export builduri=build.sh
|
||||
fi
|
||||
|
||||
export PROJ_NAME=Examples
|
||||
export PROJ="$FWDIR/$PROJ_NAME.sln"
|
||||
|
||||
echo "===== Building $PROJ ====="
|
||||
|
||||
function error_exit() {
|
||||
if [ "$STEP" = "" ];
|
||||
then
|
||||
export STEP=$CONFIGURATION
|
||||
fi
|
||||
echo "===== Build FAILED for $PROJ -- $STEP with error $RC - CANNOT CONTINUE ====="
|
||||
exit 1
|
||||
}
|
||||
|
||||
echo "Restore NuGet packages ==================="
|
||||
export STEP=NuGet-Restore
|
||||
|
||||
nuget restore
|
||||
|
||||
export RC=$? && [ $RC -ne 0 ] && error_exit
|
||||
|
||||
echo "Build Debug =============================="
|
||||
export STEP=Debug
|
||||
|
||||
export CONFIGURATION=$STEP
|
||||
|
||||
export STEP=$CONFIGURATION
|
||||
|
||||
xbuild /p:Configuration=$CONFIGURATION $XBUILDOPT $PROJ
|
||||
export RC=$? && [ $RC -ne 0 ] && error_exit
|
||||
echo "BUILD ok for $CONFIGURATION $PROJ"
|
||||
|
||||
echo "Build Release ============================"
|
||||
export STEP=Release
|
||||
|
||||
export CONFIGURATION=$STEP
|
||||
|
||||
xbuild /p:Configuration=$CONFIGURATION $XBUILDOPT $PROJ
|
||||
export RC=$? && [ $RC -ne 0 ] && error_exit
|
||||
echo "BUILD ok for $CONFIGURATION $PROJ"
|
||||
|
||||
echo "===== Build succeeded for $PROJ ====="
|
|
@ -0,0 +1,16 @@
#!/bin/bash

for g in `find . -type d -name bin`
do
  rm -r -f "$g"
done

for g in `find . -type d -name obj`
do
  rm -r -f "$g"
done

# for g in `find . -type d -name TestResults`
# do
#   rm -r -f "$g"
# done
@ -3,94 +3,78 @@
|
|||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Reflection;
|
||||
using Microsoft.Spark.CSharp.Core;
|
||||
|
||||
[assembly: log4net.Config.XmlConfigurator(Watch = true)]
|
||||
using Microsoft.Spark.CSharp.Services;
|
||||
|
||||
namespace Microsoft.Spark.CSharp.Examples
|
||||
{
|
||||
/// <summary>
|
||||
/// SparkCLR Pi example
|
||||
/// Calculate Pi
|
||||
/// Reference: https://github.com/apache/spark/blob/branch-1.5/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
|
||||
/// </summary>
|
||||
public static class PiExample
|
||||
{
|
||||
internal static log4net.ILog Logger { get { return log4net.LogManager.GetLogger(typeof(PiExample)); } }
|
||||
internal static SparkContext SparkContext;
|
||||
|
||||
private static void Main(string[] args)
|
||||
private static ILoggerService Logger;
|
||||
public static void Main(string[] args)
|
||||
{
|
||||
var success = true;
|
||||
LoggerServiceFactory.SetLoggerService(Log4NetLoggerService.Instance); //this is optional - DefaultLoggerService will be used if not set
|
||||
Logger = LoggerServiceFactory.GetLogger(typeof(PiExample));
|
||||
|
||||
SparkContext = CreateSparkContext();
|
||||
var sparkContext = new SparkContext(new SparkConf());
|
||||
|
||||
var stopWatch = Stopwatch.StartNew();
|
||||
var clockStart = stopWatch.Elapsed;
|
||||
try
|
||||
{
|
||||
Logger.Info("----- Running Pi example -----");
|
||||
const int slices = 3;
|
||||
var numberOfItems = (int)Math.Min(100000L * slices, int.MaxValue);
|
||||
var values = new List<int>(numberOfItems);
|
||||
for (var i = 0; i <= numberOfItems; i++)
|
||||
{
|
||||
values.Add(i);
|
||||
}
|
||||
|
||||
Pi();
|
||||
var rdd = sparkContext.Parallelize(values, slices);
|
||||
|
||||
var duration = stopWatch.Elapsed - clockStart;
|
||||
Logger.InfoFormat("----- Successfully finished running Pi example (duration={0}) -----", duration);
|
||||
CalculatePiUsingAnonymousMethod(numberOfItems, rdd);
|
||||
|
||||
CalculatePiUsingSerializedClassApproach(numberOfItems, rdd);
|
||||
|
||||
Logger.LogInfo("Completed calculating the value of Pi");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
success = false;
|
||||
var duration = stopWatch.Elapsed - clockStart;
|
||||
Logger.InfoFormat("----- Error running Pi example (duration={0}) -----{1}{2}", duration, Environment.NewLine, ex);
|
||||
Logger.LogError("Error calculating Pi");
|
||||
Logger.LogException(ex);
|
||||
}
|
||||
|
||||
Logger.Info("Completed running examples. Calling SparkContext.Stop() to tear down ...");
|
||||
// following comment is necessary due to known issue in Spark. See https://issues.apache.org/jira/browse/SPARK-8333
|
||||
Logger.Info("If this program (SparkCLRExamples.exe) does not terminate in 10 seconds, please manually terminate java process launched by this program!!!");
|
||||
sparkContext.Stop();
|
||||
|
||||
SparkContext.Stop();
|
||||
|
||||
if (!success)
|
||||
{
|
||||
Environment.Exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Calculate Pi
|
||||
/// Reference: https://github.com/apache/spark/blob/branch-1.5/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
|
||||
/// </summary>
|
||||
private static void Pi()
|
||||
private static void CalculatePiUsingSerializedClassApproach(int n, RDD<int> rdd)
|
||||
{
|
||||
const int slices = 3;
|
||||
var n = (int)Math.Min(100000L * slices, int.MaxValue);
|
||||
var values = new List<int>(n);
|
||||
for (var i = 0; i <= n; i++)
|
||||
{
|
||||
values.Add(i);
|
||||
}
|
||||
var count = rdd
|
||||
.Map(new PiHelper().Execute)
|
||||
.Reduce((x, y) => x + y);
|
||||
|
||||
//
|
||||
// Anonymous method approach
|
||||
//
|
||||
var count = SparkContext.Parallelize(values, slices)
|
||||
Logger.LogInfo(string.Format("(serialized class approach) Pi is roughly {0}.", 4.0 * count / n));
|
||||
}
|
||||
|
||||
private static void CalculatePiUsingAnonymousMethod(int n, RDD<int> rdd)
|
||||
{
|
||||
var count = rdd
|
||||
.Map(i =>
|
||||
{
|
||||
var random = new Random();
|
||||
var x = random.NextDouble() * 2 - 1;
|
||||
var y = random.NextDouble() * 2 - 1;
|
||||
{
|
||||
var random = new Random();
|
||||
var x = random.NextDouble() * 2 - 1;
|
||||
var y = random.NextDouble() * 2 - 1;
|
||||
|
||||
return (x * x + y * y) < 1 ? 1 : 0;
|
||||
}
|
||||
).Reduce((x, y) => x + y);
|
||||
Logger.InfoFormat("(anonymous method approach) Pi is roughly {0}.", 4.0 * (int)count / n);
|
||||
return (x * x + y * y) < 1 ? 1 : 0;
|
||||
})
|
||||
.Reduce((x, y) => x + y);
|
||||
|
||||
//
|
||||
// Serialized class approach, an alternative to the anonymous method approach above
|
||||
//
|
||||
var countComputedUsingAnotherApproach = SparkContext.Parallelize(values, slices).Map(new PiHelper().Execute).Reduce((x, y) => x + y);
|
||||
var approximatePiValue = 4.0 * countComputedUsingAnotherApproach / n;
|
||||
Logger.InfoFormat("(serialized class approach) Pi is roughly {0}.", approximatePiValue);
|
||||
Logger.LogInfo(string.Format("(anonymous method approach) Pi is roughly {0}.", 4.0 * count / n));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -108,26 +92,5 @@ namespace Microsoft.Spark.CSharp.Examples
|
|||
return (x * x + y * y) < 1 ? 1 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates and returns a context
|
||||
/// </summary>
|
||||
/// <returns>SparkContext</returns>
|
||||
private static SparkContext CreateSparkContext()
|
||||
{
|
||||
var conf = new SparkConf();
|
||||
|
||||
// set up local directory
|
||||
var tempDir = Environment.GetEnvironmentVariable("spark.local.dir");
|
||||
if (string.IsNullOrEmpty(tempDir))
|
||||
{
|
||||
tempDir = Path.GetTempPath();
|
||||
}
|
||||
|
||||
conf.Set("spark.local.dir", tempDir);
|
||||
Logger.DebugFormat("spark.local.dir is set to {0}", tempDir);
|
||||
|
||||
return new SparkContext(conf);
|
||||
}
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,102 @@
## Pre-Requisites
The following software needs to be installed and the appropriate environment variables must be set to run SparkCLR applications.

|Software |Version | Environment variable |Notes |
|---|----|-----------------------------------------------------|------|
|JDK |7u85 or 8u60 ([OpenJDK](http://www.azul.com/downloads/zulu/zulu-windows/) or [Oracle JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html)) |JAVA_HOME | After setting JAVA_HOME, run `set PATH=%PATH%;%JAVA_HOME%\bin` to add java to PATH |
|Spark |[1.5.2 or 1.6.0](http://spark.apache.org/downloads.html) | SPARK_HOME |Spark can be downloaded from the Spark download website. Alternatively, if you used [`RunSamples.cmd`](../csharp/Samples/Microsoft.Spark.CSharp/samplesusage.md) to run SparkCLR samples, you can find a `tools\spark*` directory (under the [`build`](../build) directory) that can be used as SPARK_HOME |
|winutils.exe | see [Running Hadoop on Windows](https://wiki.apache.org/hadoop/WindowsProblems) for details |HADOOP_HOME |Spark on Windows needs this utility in the `%HADOOP_HOME%\bin` directory. It can be copied over from any Hadoop distribution. Alternatively, if you used [`RunSamples.cmd`](../csharp/Samples/Microsoft.Spark.CSharp/samplesusage.md) to run SparkCLR samples, you can find a `tools\winutils` directory (under the [`build`](../build) directory) that can be used as HADOOP_HOME |
|SparkCLR |[v1.5.200](https://github.com/Microsoft/SparkCLR/releases) or v1.6.000-SNAPSHOT | SPARKCLR_HOME |If you downloaded a [SparkCLR release](https://github.com/Microsoft/SparkCLR/releases), SPARKCLR_HOME should be set to the directory named `runtime` (for example, `D:\downloads\spark-clr_2.10-1.5.200\runtime`). Alternatively, if you used [`RunSamples.cmd`](../csharp/Samples/Microsoft.Spark.CSharp/samplesusage.md) to run SparkCLR samples, you can find a `runtime` directory (under the [`build`](../build) directory) that can be used as SPARKCLR_HOME. **Note** - setting SPARKCLR_HOME is _optional_ and it is set by sparkclr-submit.cmd if not set. |
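
As an illustration, these environment variables might be set as shown below before using the SparkCLR scripts; every path in this sketch is a placeholder and should be replaced with the actual install location on your machine.
```
REM Example only - replace each path with the actual install location on your machine
set JAVA_HOME=C:\Program Files\Java\jdk1.8.0_60
set PATH=%PATH%;%JAVA_HOME%\bin
set SPARK_HOME=C:\spark-1.6.0-bin-hadoop2.6
set HADOOP_HOME=C:\hadoop
set SPARKCLR_HOME=C:\spark-clr_2.10-1.5.200\runtime
```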

## Windows Instructions
### Local Mode
To use SparkCLR with Spark available locally on a machine, navigate to the `%SPARKCLR_HOME%\scripts` directory and run the following command

`sparkclr-submit.cmd <spark arguments> --exe <SparkCLR driver name> <path to driver> <driver arguments>`

**Notes**
* `<spark arguments>` - standard arguments supported by Apache Spark, except `--class`. See [spark-submit arguments](http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit) for details
* `<SparkCLR driver name>` - name of the C# application that implements the SparkCLR driver
* `<path to driver>` - directory that contains the driver executable and all its dependencies
* `<driver arguments>` - command-line arguments to the driver executable

**Sample Commands**
* `sparkclr-submit.cmd` `--total-executor-cores 2` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`
* `sparkclr-submit.cmd` `--conf spark.local.dir=C:\sparktemp` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`
* `sparkclr-submit.cmd` `--jars c:\dependency\some.jar` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`

### Debug Mode
Debug mode is used to step through the C# code in Visual Studio during a debugging session. In this mode, driver-side operations can be debugged.

1. Navigate to the `%SPARKCLR_HOME%\scripts` directory and run `sparkclr-submit.cmd debug`
2. Look for a message in the console output that looks like "Port number used by CSharpBackend is <portnumber>". Note down the port number and use it in the next step
3. Add the following XML snippet to App.Config in the Visual Studio project for the SparkCLR application that you want to debug, and start debugging
```
<appSettings>
    <add key="CSharpWorkerPath" value="/path/to/CSharpWorker.exe"/>
    <add key="CSharpBackendPortNumber" value="port_number_from_previous_step"/>
</appSettings>
```

**Notes**
* `CSharpWorkerPath` - the folder containing CSharpWorker.exe should also contain Microsoft.Spark.CSharp.Adapter.dll, the executable that implements the SparkCLR driver application, and any dependent binaries. Typically, the path to CSharpWorker.exe in the build output directory of the SparkCLR application is used for this configuration value
* If a jar file is required by Spark (for example, spark-xml_2.10-0.3.1.jar to process XML files), then the local path to the jar file must be set using the command `set SPARKCLR_DEBUGMODE_EXT_JARS=C:\ext\spark-xml\spark-xml_2.10-0.3.1.jar` before launching CSharpBackend in step #1
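
Putting these notes together, a debug session that needs an external jar might be started as sketched below; the jar path is only an example, and the port number comes from the CSharpBackend console output.
```
REM Example only - the jar path below is a placeholder
set SPARKCLR_DEBUGMODE_EXT_JARS=C:\ext\spark-xml\spark-xml_2.10-0.3.1.jar
cd /d %SPARKCLR_HOME%\scripts
sparkclr-submit.cmd debug
REM Note the "Port number used by CSharpBackend is <portnumber>" message in the console output,
REM set CSharpBackendPortNumber in App.Config to that value, then start debugging in Visual Studio
```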

### Standalone Cluster
#### Client Mode
The SparkCLR `runtime` folder and the build output of the SparkCLR driver application must be copied over to the machine from which you submit SparkCLR apps to a Spark Standalone cluster. Once copying is done, the instructions are the same as for [local mode](#local-mode), except that the master URL (`--master <spark://host:port>`) must be specified in addition.

**Sample Commands**
* `sparkclr-submit.cmd` `--master spark://93.184.216.34:7077` `--total-executor-cores 2` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`
* `sparkclr-submit.cmd` `--master spark://93.184.216.34:7077` `--conf spark.local.dir=C:\sparktemp` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`
* `sparkclr-submit.cmd` `--master spark://93.184.216.34:7077` `--jars c:\dependency\some.jar` `--exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`

#### Cluster Mode
To submit a SparkCLR app in cluster mode, both spark-clr*.jar and the app binaries need to be made available in HDFS. Let's say `Pi.zip` includes all files under `Pi\bin\[debug|release]`:
````
hdfs dfs -copyFromLocal \path\to\pi.zip hdfs://path/to/pi
hdfs dfs -copyFromLocal \path\to\runtime\lib\spark-clr*.jar hdfs://path/to/spark-clr-jar

cd \path\to\runtime
scripts\sparkclr-submit.cmd ^
  --total-executor-cores 2 ^
  --deploy-mode cluster ^
  --master <spark://host:port> ^
  --remote-sparkclr-jar hdfs://path/to/spark-clr-jar/spark-clr_2.10-1.5.200.jar ^
  --exe Pi.exe ^
  hdfs://path/to/pi/pi.zip ^
  spark.local.dir <full-path to temp directory on any spark worker>
````
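
The `Pi.zip` archive referenced above can be produced in any way you like; one minimal sketch on Windows, assuming PowerShell 5 or later is available, is:
```
REM Example only - package the Pi example's build output into Pi.zip using PowerShell
powershell -Command "Compress-Archive -Path C:\Git\SparkCLR\examples\Pi\bin\Debug\* -DestinationPath C:\Git\SparkCLR\examples\Pi\Pi.zip"
```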

### YARN Cluster
#### Client Mode
To be added

#### Cluster Mode
To be added

## Linux Instructions
The instructions above cover running SparkCLR applications on Windows. With the following tweaks, the same instructions can be used to run SparkCLR applications on Linux.
* Instead of `RunSamples.cmd`, use `run-samples.sh`
* Instead of `sparkclr-submit.cmd`, use `sparkclr-submit.sh`

## Running Examples in Local Mode
The following sample commands show how to run SparkCLR examples in local mode. Using the instructions above, these sample commands can be tweaked to run in other modes.

### Pi Example
* Run `sparkclr-submit.cmd --exe SparkClrPi.exe C:\Git\SparkCLR\examples\Pi\bin\Debug`

Computes the _approximate_ value of Pi using two approaches and displays the value.

### JDBC Example
* Download a JDBC driver for the SQL database you want to use
* `sparkclr-submit.cmd --jars C:\SparkCLRDependencies\sqljdbc4.jar --exe SparkClrJdbc.exe C:\Git\SparkCLR\examples\JdbcDataFrame\bin\Debug <jdbc connection string> <table name>`

The schema and row count of the table whose name is provided as the command-line argument to SparkClrJdbc.exe are displayed.
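
For instance, against a hypothetical local SQL Server instance (the server, database, credentials and table name below are all placeholders), the command could look like:
```
sparkclr-submit.cmd --jars C:\SparkCLRDependencies\sqljdbc4.jar --exe SparkClrJdbc.exe C:\Git\SparkCLR\examples\JdbcDataFrame\bin\Debug "jdbc:sqlserver://localhost:1433;databaseName=SampleDb;user=sampleuser;password=samplepassword" Customers
```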

### Spark-XML Example
* Download [books.xml](https://github.com/databricks/spark-xml/blob/master/src/test/resources/books.xml); the location of this file is the first argument to SparkClrXml.exe below
* `sparkclr-submit.cmd --jars C:\SparkCLRDependencies\spark-xml_2.10-0.3.1.jar --exe SparkClrXml.exe C:\Git\SparkCLR\examples\SparkXml\bin\Debug C:\SparkCLRData\books.xml C:\SparkCLRData\booksModified.xml`

Displays the number of XML elements in the input XML file provided as the first argument to SparkClrXml.exe and writes the modified XML to the file specified in the second command-line argument.

@@ -38,10 +38,11 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (

if not defined SPARKCLR_JAR (set SPARKCLR_JAR=spark-clr_2.10-1.6.0-SNAPSHOT.jar)
echo SPARKCLR_JAR=%SPARKCLR_JAR%

set SPARKCLR_CLASSPATH=%SPARKCLR_HOME%\lib\%SPARKCLR_JAR%
if not "%SPARKCSV_JARS%" == "" (
SET SPARKCLR_CLASSPATH=%SPARKCLR_CLASSPATH%;%SPARKCSV_JARS%
REM SPARKCLR_DEBUGMODE_EXT_JARS environment variable is used to specify external dependencies to use in debug mode
if not "%SPARKCLR_DEBUGMODE_EXT_JARS%" == "" (
echo [sparkclr-submit.cmd] External jars path is configured to %SPARKCLR_DEBUGMODE_EXT_JARS%
SET SPARKCLR_CLASSPATH=%SPARKCLR_CLASSPATH%;%SPARKCLR_DEBUGMODE_EXT_JARS%
)
set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%;%SPARKCLR_CLASSPATH%

@@ -56,7 +56,8 @@ fi

export SPARKCLR_JAR=spark-clr_2.10-1.6.0-SNAPSHOT.jar
export SPARKCLR_CLASSPATH="$SPARKCLR_HOME/lib/$SPARKCLR_JAR"
[ ! "$SPARKCSV_JARS" = "" ] && export SPARKCLR_CLASSPATH="$SPARKCLR_CLASSPATH:$SPARKCSV_JARS"
# SPARKCLR_DEBUGMODE_EXT_JARS environment variable is used to specify external dependencies to use in debug mode
[ ! "$SPARKCLR_DEBUGMODE_EXT_JARS" = "" ] && export SPARKCLR_CLASSPATH="$SPARKCLR_CLASSPATH:$SPARKCLR_DEBUGMODE_EXT_JARS"
export LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR:$SPARKCLR_CLASSPATH"

if [ $1 = "debug" ];