Merge pull request #160 from tawan0109/dataframe

Add DataFrame.Write() API
Tao Wang 2015-12-14 14:41:52 +08:00
Parents bdcf8bf279 1147585859
Commit ad1ff5e841
16 changed files with 754 additions and 5 deletions

View file

@@ -7,7 +7,7 @@ call precheck.cmd
if %precheck% == "bad" (goto :eof)
@rem Windows 7/8/10 may not allow powershell scripts by default
-powershell -Command Set-ExecutionPolicy Unrestricted
+powershell -Command Set-ExecutionPolicy -Scope CurrentUser Unrestricted
@rem download build tools
pushd %~dp0

View file

@@ -82,8 +82,10 @@
<Compile Include="Interop\Ipc\SerDe.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Proxy\IDataFrameProxy.cs" />
<Compile Include="Proxy\IDataFrameWriterProxy.cs" />
<Compile Include="Proxy\IDStreamProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DataFrameWriterIpcProxy.cs" />
<Compile Include="Proxy\Ipc\DStreamIpcProxy.cs" />
<Compile Include="Proxy\Ipc\RDDIpcProxy.cs" />
<Compile Include="Proxy\Ipc\SparkCLRIpcProxy.cs" />
@@ -107,8 +109,10 @@
<Compile Include="Services\LoggerServiceFactory.cs" />
<Compile Include="Sql\Column.cs" />
<Compile Include="Sql\DataFrame.cs" />
<Compile Include="Sql\DataFrameWriter.cs" />
<Compile Include="Sql\Row.cs" />
<Compile Include="Sql\Functions.cs" />
<Compile Include="Sql\SaveMode.cs" />
<Compile Include="Sql\SqlContext.cs" />
<Compile Include="Sql\Struct.cs" />
<Compile Include="Sql\UserDefinedFunction.cs" />

View file

@@ -56,6 +56,7 @@ namespace Microsoft.Spark.CSharp.Proxy
void Unpersist(bool blocking = true);
IDataFrameProxy Repartition(int numPartitions);
IDataFrameProxy Sample(bool withReplacement, double fraction, long seed);
IDataFrameWriterProxy Write();
}
internal interface IUDFProxy

View file

@@ -0,0 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Collections.Generic;
namespace Microsoft.Spark.CSharp.Proxy
{
internal interface IDataFrameWriterProxy
{
void Mode(string saveMode);
void Format(string source);
void Options(Dictionary<string, string> options);
void PartitionBy(params string[] colNames);
void Save();
void InsertInto(string tableName);
void SaveAsTable(string tableName);
void Jdbc(string url, string table, Dictionary<string, string> properties);
}
}

View file

@@ -532,6 +532,12 @@ namespace Microsoft.Spark.CSharp.Proxy.Ipc
jvmDataFrameReference, "sample",
new object[] { withReplacement, fraction, seed }).ToString()), sqlContextProxy);
}
public IDataFrameWriterProxy Write()
{
return new DataFrameWriterIpcProxy(new JvmObjectReference(
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameReference, "write").ToString()));
}
}
internal class UDFIpcProxy : IUDFProxy

View file

@@ -0,0 +1,66 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Interop.Ipc;
namespace Microsoft.Spark.CSharp.Proxy.Ipc
{
internal class DataFrameWriterIpcProxy : IDataFrameWriterProxy
{
private readonly JvmObjectReference jvmDataFrameWriterReference;
internal DataFrameWriterIpcProxy(JvmObjectReference jvmDataFrameWriterReference)
{
this.jvmDataFrameWriterReference = jvmDataFrameWriterReference;
}
public void Mode(string saveMode)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "mode", new object[] { saveMode });
}
public void Format(string source)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "format", new object[] { source });
}
public void Options(Dictionary<string, string> options)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "options", new object[] { options });
}
public void PartitionBy(params string[] colNames)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "partitionBy", new object[] { colNames });
}
public void Save()
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(jvmDataFrameWriterReference, "save");
}
public void InsertInto(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "insertInto", new object[] { tableName });
}
public void SaveAsTable(string tableName)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "saveAsTable", new object[] { tableName });
}
public void Jdbc(string url, string table, Dictionary<string, string> properties)
{
SparkCLRIpcProxy.JvmBridge.CallNonStaticJavaMethod(
jvmDataFrameWriterReference, "jdbc", new object[] { url, table, properties });
}
}
}

View file

@@ -902,6 +902,14 @@ namespace Microsoft.Spark.CSharp.Sql
{
Rdd.Foreach(f);
}
/// <summary>
/// Interface for saving the content of the DataFrame out into external storage.
/// </summary>
public DataFrameWriter Write()
{
return new DataFrameWriter(dataFrameProxy.Write());
}
}
public class JoinType
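
A minimal usage sketch of the new entry point (the sqlContext variable and the file paths below are illustrative, not part of this change):

// Load a DataFrame, then persist it through the new Write() API.
var peopleDataFrame = sqlContext.JsonFile(@"D:\data\people.json"); // illustrative path
// Parquet(path) is shorthand for Format("parquet").Save(path).
peopleDataFrame.Write().Mode(SaveMode.Overwrite).Parquet(@"D:\data\people.parquet");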

View file

@@ -0,0 +1,170 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Collections.Generic;
using Microsoft.Spark.CSharp.Proxy;
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// Interface used to write a DataFrame to external storage systems (e.g. file systems,
/// key-value stores, etc). Use DataFrame.Write() to access this.
///
/// See also http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
/// </summary>
public class DataFrameWriter
{
internal IDataFrameWriterProxy DataFrameWriterProxy
{
get { return dataFrameWriterProxy; }
}
private readonly IDataFrameWriterProxy dataFrameWriterProxy;
internal DataFrameWriter(IDataFrameWriterProxy dataFrameWriterProxy)
{
this.dataFrameWriterProxy = dataFrameWriterProxy;
}
/// <summary>
/// Specifies the behavior when data or table already exists. Options include:
/// - `SaveMode.Overwrite`: overwrite the existing data.
/// - `SaveMode.Append`: append the data.
/// - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
/// - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
/// </summary>
public DataFrameWriter Mode(SaveMode saveMode)
{
return Mode(saveMode.GetStringValue());
}
/// <summary>
/// Specifies the behavior when data or table already exists. Options include:
/// - `SaveMode.Overwrite`: overwrite the existing data.
/// - `SaveMode.Append`: append the data.
/// - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
/// - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
/// </summary>
public DataFrameWriter Mode(string saveMode)
{
dataFrameWriterProxy.Mode(saveMode);
return this;
}
/// <summary>
/// Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
/// </summary>
public DataFrameWriter Format(string source)
{
dataFrameWriterProxy.Format(source);
return this;
}
/// <summary>
/// Adds an output option for the underlying data source.
/// </summary>
public DataFrameWriter Option(string key, string value)
{
var options = new Dictionary<string, string>() { { key, value } };
return Options(options);
}
/// <summary>
/// Adds output options for the underlying data source.
/// </summary>
public DataFrameWriter Options(Dictionary<string,string> options)
{
dataFrameWriterProxy.Options(options);
return this;
}
/// <summary>
/// Partitions the output by the given columns on the file system. If specified, the output is
/// laid out on the file system similar to Hive's partitioning scheme.
///
/// This is only applicable for Parquet at the moment.
/// </summary>
public DataFrameWriter PartitionBy(params string[] colNames)
{
dataFrameWriterProxy.PartitionBy(colNames);
return this;
}
/// <summary>
/// Saves the content of the DataFrame at the specified path.
/// </summary>
public void Save(string path)
{
Option("path", path).Save();
}
/// <summary>
/// Saves the content of the DataFrame using the format, save mode and options
/// (e.g. the "path" option) configured on this writer.
/// </summary>
public void Save()
{
dataFrameWriterProxy.Save();
}
/// <summary>
/// Inserts the content of the DataFrame to the specified table. It requires that
/// the schema of the DataFrame is the same as the schema of the table.
/// Because it inserts data to an existing table, format or options will be ignored.
/// </summary>
public void InsertInto(string tableName)
{
dataFrameWriterProxy.InsertInto(tableName);
}
/// <summary>
/// Saves the content of the DataFrame as the specified table.
/// In the case the table already exists, the behavior of this function depends on the
/// save mode, specified by the `mode` function (defaults to throwing an exception).
/// When `mode` is `Overwrite`, the schema of the DataFrame does not need to be
/// the same as that of the existing table.
/// When `mode` is `Append`, the schema of the DataFrame needs to be
/// the same as that of the existing table, and format or options will be ignored.
/// </summary>
public void SaveAsTable(string tableName)
{
dataFrameWriterProxy.SaveAsTable(tableName);
}
/// <summary>
/// Saves the content of the DataFrame to an external database table via JDBC. In the case the
/// table already exists in the external database, the behavior of this function depends on the
/// save mode, specified by the `mode` function (defaults to throwing an exception).
///
/// Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
/// your external database systems.
/// </summary>
/// <param name="url">JDBC database url of the form `jdbc:subprotocol:subname`</param>
/// <param name="table">Name of the table in the external database.</param>
/// <param name="properties">JDBC database connection arguments, a list of arbitrary string tag/value.
/// Normally at least a "user" and "password" property should be included.</param>
public void Jdbc(string url, string table, Dictionary<string, string> properties)
{
dataFrameWriterProxy.Jdbc(url, table, properties);
}
/// <summary>
/// Saves the content of the DataFrame in JSON format at the specified path.
/// This is equivalent to:
/// Format("json").Save(path)
/// </summary>
public void Json(string path)
{
Format("json").Save(path);
}
/// <summary>
/// Saves the content of the DataFrame in Parquet format at the specified path.
/// This is equivalent to:
/// Format("parquet").Save(path)
/// </summary>
public void Parquet(string path)
{
Format("parquet").Save(path);
}
}
}
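
Taken together, the writer is a fluent builder: configuration calls return this, and a terminal call (Save, InsertInto, SaveAsTable, Jdbc, Json, Parquet) performs the write. A sketch under assumed inputs (dataFrame, the "age" column, the path and the connection values are all illustrative):

// Explicit format, save mode and partition columns; Save(path) sets the
// "path" option and then calls the parameterless Save().
dataFrame.Write()
    .Format("parquet")
    .Mode(SaveMode.Append)
    .PartitionBy("age")
    .Save("/tmp/people_by_age");

// JDBC: connection properties normally include at least "user" and "password".
var properties = new Dictionary<string, string> { { "user", "sa" }, { "password", "secret" } };
dataFrame.Write().Mode(SaveMode.Ignore).Jdbc("jdbc:subprotocol:subname", "people", properties);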

View file

@@ -0,0 +1,54 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Spark.CSharp.Sql
{
/// <summary>
/// SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
/// </summary>
public enum SaveMode
{
/// <summary>
/// Append mode means that when saving a DataFrame to a data source, if data/table already exists,
/// contents of the DataFrame are expected to be appended to existing data.
/// </summary>
Append,
/// <summary>
/// Overwrite mode means that when saving a DataFrame to a data source,
/// if data/table already exists, existing data is expected to be overwritten by the contents of
/// the DataFrame.
/// </summary>
Overwrite,
/// <summary>
/// ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
/// an exception is expected to be thrown.
/// </summary>
ErrorIfExists,
/// <summary>
/// Ignore mode means that when saving a DataFrame to a data source, if data already exists,
/// the save operation is expected to not save the contents of the DataFrame and to not
/// change the existing data.
/// </summary>
Ignore
}
/// <summary>
/// Maps a SaveMode value to the literal string Spark expects. For SaveMode.ErrorIfExists,
/// the corresponding string in Spark is "error" (Spark also accepts "default").
/// </summary>
public static class SaveModeExtensions
{
public static string GetStringValue(this SaveMode mode)
{
switch (mode)
{
case SaveMode.ErrorIfExists:
return "error";
default:
return mode.ToString();
}
}
}
}
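
For illustration, the strings GetStringValue() produces (every value except ErrorIfExists maps to its enum name):

Console.WriteLine(SaveMode.Append.GetStringValue());        // "Append"
Console.WriteLine(SaveMode.Overwrite.GetStringValue());     // "Overwrite"
Console.WriteLine(SaveMode.Ignore.GetStringValue());        // "Ignore"
Console.WriteLine(SaveMode.ErrorIfExists.GetStringValue()); // "error"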

View file

@@ -61,6 +61,7 @@
<Otherwise />
</Choose>
<ItemGroup>
<Compile Include="DataFrameWriterTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestWithMoqDemo.cs" />
<Compile Include="Mocks\MockStructTypeProxy.cs" />

View file

@@ -1120,6 +1120,23 @@ namespace AdapterTest
Assert.AreEqual(expectedResultGroupedDataProxy, actualResult.GroupedDataProxy);
}
[Test]
public void TestWrite()
{
// Arrange
var expectedDataFrameWriterProxy = new Mock<IDataFrameWriterProxy>().Object;
mockDataFrameProxy.Setup(m => m.Write()).Returns(expectedDataFrameWriterProxy);
var sc = new SparkContext(null);
// Act
var originalDataFrame = new DataFrame(mockDataFrameProxy.Object, sc);
var dataFrameWriter = originalDataFrame.Write();
// Assert
mockDataFrameProxy.Verify(m => m.Write(), Times.Once);
Assert.AreEqual(expectedDataFrameWriterProxy, dataFrameWriter.DataFrameWriterProxy);
}
#region GroupedDataTest
[Test]

View file

@@ -0,0 +1,270 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System.Collections.Generic;
using System.Linq.Expressions;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Proxy;
using Microsoft.Spark.CSharp.Sql;
using Moq;
using NUnit.Framework;
namespace AdapterTest
{
/// <summary>
/// Validates interaction between DataFrameWriter and its proxies
/// </summary>
[TestFixture]
public class DataFrameWriterTest
{
private static Mock<IDataFrameWriterProxy> mockDataFrameWriterProxy;
[OneTimeSetUp]
public static void ClassInitialize()
{
mockDataFrameWriterProxy = new Mock<IDataFrameWriterProxy>();
}
[SetUp]
public void TestInitialize()
{
mockDataFrameWriterProxy.Reset();
}
[Test]
public void TestMode()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Mode(It.IsAny<string>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
dataFrameWriter.Mode(SaveMode.Append);
mockDataFrameWriterProxy.Verify(m => m.Mode(SaveMode.Append.ToString()));
mockDataFrameWriterProxy.Reset();
dataFrameWriter.Mode(SaveMode.Ignore);
mockDataFrameWriterProxy.Verify(m => m.Mode(SaveMode.Ignore.ToString()));
mockDataFrameWriterProxy.Reset();
dataFrameWriter.Mode(SaveMode.Overwrite);
mockDataFrameWriterProxy.Verify(m => m.Mode(SaveMode.Overwrite.ToString()));
mockDataFrameWriterProxy.Reset();
dataFrameWriter.Mode(SaveMode.ErrorIfExists);
mockDataFrameWriterProxy.Verify(m => m.Mode(It.IsIn("error", "default")));
mockDataFrameWriterProxy.Reset();
}
[Test]
public void TestStringMode()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Mode(It.IsAny<string>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
foreach (var mode in new string[] { "append", "ignore", "overwrite", "error", "default" })
{
dataFrameWriter.Mode(mode);
mockDataFrameWriterProxy.Verify(m => m.Mode(mode));
mockDataFrameWriterProxy.Reset();
}
}
[Test]
public void TestFormat()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Format(It.IsAny<string>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
foreach (var format in new string[] { "parquet", "json" })
{
dataFrameWriter.Format(format);
mockDataFrameWriterProxy.Verify(m => m.Format(format));
mockDataFrameWriterProxy.Reset();
}
}
[Test]
public void TestOption()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Options(It.IsAny<Dictionary<string, string>>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string key = "path";
const string value = "path_value";
// Act
dataFrameWriter.Option(key, value);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Options(
It.Is<Dictionary<string, string>>(dict => dict[key] == value && dict.Count == 1)), Times.Once);
}
[Test]
public void TestOptions()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Options(It.IsAny<Dictionary<string, string>>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string key1 = "key1";
const string value1 = "value1";
const string key2 = "key2";
const string value2 = "value2";
var opts = new Dictionary<string, string>()
{
{key1, value1},
{key2, value2}
};
// Act
dataFrameWriter.Options(opts);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Options(It.Is<Dictionary<string, string>>(
dict =>
dict[key1] == value1
&& dict[key2] == value2
&& dict.Count == 2)
),
Times.Once
);
}
[Test]
public void TestPartitionBy()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.PartitionBy(It.IsAny<string[]>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
var colNames = new string[] { "col1", "col2", "col3" };
// Act
dataFrameWriter.PartitionBy(colNames);
// Assert
mockDataFrameWriterProxy.Verify(m => m.PartitionBy(colNames));
}
[Test]
public void TestSaveWithPath()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Save());
mockDataFrameWriterProxy.Setup(m => m.Options(It.IsAny<Dictionary<string,string>>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string path = "/path/to/save";
// Act
dataFrameWriter.Save(path);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Save(), Times.Once);
mockDataFrameWriterProxy.Verify(m => m.Options(
It.Is<Dictionary<string, string>>(dict => dict["path"] == path && dict.Count == 1)), Times.Once);
}
[Test]
public void TestSave()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Save());
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
// Act
dataFrameWriter.Save();
// Assert
mockDataFrameWriterProxy.Verify(m => m.Save(), Times.Once);
}
[Test]
public void TestInsertInto()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.InsertInto(It.IsAny<string>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string table = "table";
// Act
dataFrameWriter.InsertInto(table);
// Assert
mockDataFrameWriterProxy.Verify(m => m.InsertInto(table), Times.Once);
}
[Test]
public void TestSaveAsTable()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.SaveAsTable(It.IsAny<string>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string table = "table";
// Act
dataFrameWriter.SaveAsTable(table);
// Assert
mockDataFrameWriterProxy.Verify(m => m.SaveAsTable(table), Times.Once);
}
[Test]
public void TestJson()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Format(It.IsAny<string>()));
mockDataFrameWriterProxy.Setup(m => m.Options(It.IsAny<Dictionary<string, string>>()));
mockDataFrameWriterProxy.Setup(m => m.Save());
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string path = "/path/to/save";
// Act
dataFrameWriter.Json(path);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Save(), Times.Once);
mockDataFrameWriterProxy.Verify(m => m.Format("json"), Times.Once);
mockDataFrameWriterProxy.Verify(m => m.Options(
It.Is<Dictionary<string, string>>(dict => dict["path"] == path && dict.Count == 1)), Times.Once);
}
[Test]
public void TestParquet()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Format(It.IsAny<string>()));
mockDataFrameWriterProxy.Setup(m => m.Options(It.IsAny<Dictionary<string, string>>()));
mockDataFrameWriterProxy.Setup(m => m.Save());
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string path = "/path/to/save";
// Act
dataFrameWriter.Parquet(path);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Save(), Times.Once);
mockDataFrameWriterProxy.Verify(m => m.Format("parquet"), Times.Once);
mockDataFrameWriterProxy.Verify(m => m.Options(
It.Is<Dictionary<string, string>>(dict => dict["path"] == path && dict.Count == 1)), Times.Once);
}
[Test]
public void TestJdbc()
{
// arrange
mockDataFrameWriterProxy.Setup(m => m.Jdbc(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()));
var dataFrameWriter = new DataFrameWriter(mockDataFrameWriterProxy.Object);
const string url = "jdbc:subprotocol:subname";
const string table = "table";
var properties = new Dictionary<string, string>() { { "autocommit", "false" } };
// Act
dataFrameWriter.Jdbc(url, table, properties);
// Assert
mockDataFrameWriterProxy.Verify(m => m.Jdbc(url, table, properties), Times.Once);
}
}
}

View file

@@ -240,6 +240,11 @@ namespace AdapterTest.Mocks
throw new NotImplementedException();
}
public IDataFrameWriterProxy Write()
{
throw new NotImplementedException();
}
public IDataFrameProxy Alias(string alias)
{
throw new NotImplementedException();

View file

@@ -77,7 +77,7 @@ namespace Microsoft.Spark.CSharp.Samples
Console.WriteLine(" ");
Console.WriteLine(" [--data | sparkclr.sampledata.loc] <SAMPLE_DATA_DIR> SAMPLE_DATA_DIR is the directory where data files used by samples reside. ");
Console.WriteLine(" ");
-Console.WriteLine(" [--torun | sparkclr.samples.torun] <SAMPLE_LIST> SAMPLE_LIST specifies a list of samples to run. ");
+Console.WriteLine(" [--torun | sparkclr.samples.torun] <SAMPLE_LIST> SAMPLE_LIST specifies a comma-delimited list of samples to run. ");
Console.WriteLine(" Case-insensitive command line wild card matching by default. Or, use \"/\" (forward slash) to enclose regular expression. ");
Console.WriteLine(" ");
Console.WriteLine(" [--cat | sparkclr.samples.category] <SAMPLE_CATEGORY> SAMPLE_CATEGORY can be \"all\", \"default\", \"experimental\" or any new categories. ");

View file

@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using Microsoft.Spark.CSharp.Core;
using Microsoft.Spark.CSharp.Sql;
@@ -1295,5 +1296,132 @@ namespace Microsoft.Spark.CSharp.Samples
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
peopleDataFrame.Foreach(row => { Console.WriteLine(row); });
}
[Serializable]
internal class ActionHelper
{
private Accumulator<int> accumulator;
internal ActionHelper(Accumulator<int> accumulator)
{
this.accumulator = accumulator;
}
internal void Execute(IEnumerable<Row> iter)
{
foreach (var row in iter)
{
accumulator = accumulator + 1;
}
}
}
/// <summary>
/// ForeachPartition sample with accumulator
/// </summary>
[Sample]
internal static void DFForeachPartitionSampleWithAccumulator()
{
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
var accumulator = SparkCLRSamples.SparkContext.Accumulator(0);
peopleDataFrame.ForeachPartition(new ActionHelper(accumulator).Execute);
Console.WriteLine("Total count of rows: {0}", accumulator.Value);
}
/// <summary>
/// Write to parquet sample
/// </summary>
[Sample]
internal static void DFWriteToParquetSample()
{
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
var parquetPath = Path.GetTempPath() + "DF_Parquet_Samples_" + (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds;
peopleDataFrame.Write().Parquet(parquetPath);
Console.WriteLine("Save dataframe to parquet: {0}", parquetPath);
Console.WriteLine("Files:");
foreach (var f in Directory.EnumerateFiles(parquetPath))
{
Console.WriteLine(f);
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(Directory.Exists(parquetPath));
}
Directory.Delete(parquetPath, true);
Console.WriteLine("Remove parquet directory: {0}", parquetPath);
}
/// <summary>
/// Write to parquet sample with 'append' mode
/// </summary>
[Sample]
internal static void DFWriteToParquetSampleWithAppendMode()
{
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
var parquetPath = Path.GetTempPath() + "DF_Parquet_Samples_" + (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds;
peopleDataFrame.Write().Mode(SaveMode.ErrorIfExists).Parquet(parquetPath);
Console.WriteLine("Save dataframe to parquet: {0}", parquetPath);
Console.WriteLine("Files:");
foreach (var f in Directory.EnumerateFiles(parquetPath))
{
Console.WriteLine(f);
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(Directory.Exists(parquetPath));
}
peopleDataFrame.Write().Mode(SaveMode.Append).Parquet(parquetPath);
Console.WriteLine("Append dataframe to parquet: {0}", parquetPath);
Console.WriteLine("Files:");
foreach (var f in Directory.EnumerateFiles(parquetPath))
{
Console.WriteLine(f);
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(Directory.Exists(parquetPath));
}
Directory.Delete(parquetPath, true);
Console.WriteLine("Remove parquet directory: {0}", parquetPath);
}
/// <summary>
/// Write to json sample
/// </summary>
[Sample]
internal static void DFWriteToJsonSample()
{
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
var jsonPath = Path.GetTempPath() + "DF_Json_Samples_" + (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds;
peopleDataFrame.Write().Mode(SaveMode.Overwrite).Json(jsonPath);
Console.WriteLine("Save dataframe to: {0}", jsonPath);
Console.WriteLine("Files:");
foreach (var f in Directory.EnumerateFiles(jsonPath))
{
Console.WriteLine(f);
}
if (SparkCLRSamples.Configuration.IsValidationEnabled)
{
Assert.IsTrue(Directory.Exists(jsonPath));
}
Directory.Delete(jsonPath, true);
Console.WriteLine("Remove json directory: {0}", jsonPath);
}
}
}

View file

@@ -61,9 +61,9 @@ namespace Microsoft.Spark.CSharp.Samples
if (samplesToRunRegex != null)
{
-if ((SparkCLRSamples.Configuration.SamplesToRun.IndexOf(sampleName, StringComparison.InvariantCultureIgnoreCase) < 0)
-//assumes method/sample names are unique
-&& !samplesToRunRegex.IsMatch(sampleName))
+//assumes method/sample names are unique, and different samples are delimited by ','
+var containedInSamplesToRun = SparkCLRSamples.Configuration.SamplesToRun.ToLowerInvariant().Trim().Split(',').Contains(sampleName.ToLowerInvariant());
+if (!containedInSamplesToRun && !samplesToRunRegex.IsMatch(sampleName))
{
return null;
}
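
With this change, <SAMPLE_LIST> matches either as an exact, case-insensitive comma-delimited list of sample names or, as before, against the wildcard/regex pattern. Illustrative values (sample names taken from this PR):

--torun "DFWriteToParquetSample,DFWriteToJsonSample"   (exact names, comma-delimited)
--torun "/DFWriteTo.*Sample/"                          (regular expression enclosed in forward slashes)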