Merge pull request #216 from ms-guizha/master
SparkClr upgrade to spark 1.5.2 and deprecate API: DataFrame.InsertInto, Save, SaveAsTable, SaveAsParquet
This commit is contained in:
Коммит
e5fdffa14d
|
@@ -964,6 +964,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
// Python API: https://github.com/apache/spark/blob/branch-1.4/python/pyspark/sql/dataframe.py
|
||||
// saveAsParquetFile(self, path)
|
||||
[Obsolete(" Deprecated. As of 1.4.0, replaced by write().parquet()", true)]
|
||||
public void SaveAsParquetFile(String path)
|
||||
{
|
||||
Write().Parquet(path);
|
||||
|
@@ -974,12 +975,13 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
// Python API: https://github.com/apache/spark/blob/branch-1.4/python/pyspark/sql/dataframe.py
// insertInto(self, tableName, overwrite=False)
[Obsolete("Deprecated. As of 1.4.0, replaced by write().mode(SaveMode.Append|SaveMode.Overwrite).saveAsTable(tableName)", true)]
public void InsertInto(string tableName, bool overwrite = false)
{
    // Thin compatibility shim over the DataFrameWriter API:
    // Overwrite replaces the table's existing rows, Append adds to them.
    // Marked [Obsolete(..., error: true)], so any remaining callers fail at compile time.
    Write().Mode(overwrite ? SaveMode.Overwrite : SaveMode.Append).InsertInto(tableName);
}
|
||||
|
||||
|
||||
/// <summary>
|
||||
/// Creates a table from the the contents of this DataFrame based on a given data source,
|
||||
/// SaveMode specified by mode, and a set of options.
|
||||
|
@@ -994,6 +996,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
// Python API: https://github.com/apache/spark/blob/branch-1.4/python/pyspark/sql/dataframe.py
|
||||
// saveAsTable(self, tableName, source=None, mode="error", **options)
|
||||
[Obsolete("Deprecated. As of 1.4.0, replaced by write().format(source).mode(mode).options(options).saveAsTable(tableName)", true)]
|
||||
public void SaveAsTable(string tableName, string source = null, SaveMode mode = SaveMode.ErrorIfExists, params string[] options)
|
||||
{
|
||||
var dataFrameWriter = Write().Mode(mode);
|
||||
|
@@ -1014,6 +1017,7 @@ namespace Microsoft.Spark.CSharp.Sql
|
|||
/// </summary>
|
||||
// Python API: https://github.com/apache/spark/blob/branch-1.4/python/pyspark/sql/dataframe.py
|
||||
// save(self, path=None, source=None, mode="error", **options)
|
||||
[Obsolete("Deprecated. As of 1.4.0, replaced by write().format(source).mode(mode).options(options).save(path)", true)]
|
||||
public void Save(string path = null, string source = null, SaveMode mode = SaveMode.ErrorIfExists, params string[] options)
|
||||
{
|
||||
var dataFrameWriter = Write().Mode(mode);
|
||||
|
|
|
@@ -1329,7 +1329,7 @@ namespace AdapterTest
|
|||
const string path = "file_path";
|
||||
|
||||
// Act
|
||||
dataFrame.SaveAsParquetFile(path);
|
||||
dataFrame.Write().Parquet(path);
|
||||
|
||||
// assert
|
||||
mockDataFrameProxy.Verify(m => m.Write(), Times.Once());
|
||||
|
@@ -1349,18 +1349,18 @@ namespace AdapterTest
|
|||
// arrange
|
||||
mockDataFrameProxy.Setup(m => m.Write()).Returns(mockDataFrameWriterProxy.Object);
|
||||
mockDataFrameWriterProxy.Setup(m => m.Mode(It.IsAny<string>()));
|
||||
mockDataFrameWriterProxy.Setup(m => m.InsertInto(It.IsAny<string>()));
|
||||
mockDataFrameWriterProxy.Setup(m => m.SaveAsTable(It.IsAny<string>()));
|
||||
|
||||
var sc = new SparkContext(null);
|
||||
var dataFrame = new DataFrame(mockDataFrameProxy.Object, sc);
|
||||
const string table = "table_name";
|
||||
// Act
|
||||
dataFrame.InsertInto(table, true);
|
||||
dataFrame.Write().Mode(SaveMode.Overwrite).SaveAsTable(table);
|
||||
|
||||
// assert
|
||||
mockDataFrameProxy.Verify(m => m.Write(), Times.Once());
|
||||
|
||||
mockDataFrameWriterProxy.Verify(m => m.InsertInto(table), Times.Once);
|
||||
mockDataFrameWriterProxy.Verify(m => m.SaveAsTable(table), Times.Once);
|
||||
mockDataFrameWriterProxy.Verify(m => m.Mode(SaveMode.Overwrite.GetStringValue()), Times.Once);
|
||||
}
|
||||
|
||||
|
@@ -1379,8 +1379,9 @@ namespace AdapterTest
|
|||
var sc = new SparkContext(null);
|
||||
var dataFrame = new DataFrame(mockDataFrameProxy.Object, sc);
|
||||
const string table = "table_name";
|
||||
|
||||
// Act
|
||||
dataFrame.SaveAsTable(table);
|
||||
dataFrame.Write().Mode(SaveMode.ErrorIfExists).SaveAsTable(table);
|
||||
|
||||
// assert
|
||||
mockDataFrameProxy.Verify(m => m.Write(), Times.Once());
|
||||
|
@@ -1405,7 +1406,7 @@ namespace AdapterTest
|
|||
const string path = "path_value";
|
||||
|
||||
// Act
|
||||
dataFrame.Save(path);
|
||||
dataFrame.Write().Mode(SaveMode.ErrorIfExists).Save(path);
|
||||
|
||||
// assert
|
||||
mockDataFrameProxy.Verify(m => m.Write(), Times.Once());
|
||||
|
|
|
@@ -1871,7 +1871,7 @@ namespace Microsoft.Spark.CSharp.Samples
|
|||
{
|
||||
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
|
||||
var parquetPath = Path.GetTempPath() + "DF_Parquet_Samples_" + (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds;
|
||||
peopleDataFrame.Coalesce(1).SaveAsParquetFile(parquetPath);
|
||||
peopleDataFrame.Coalesce(1).Write().Parquet(parquetPath);
|
||||
|
||||
Console.WriteLine("Save dataframe to parquet: {0}", parquetPath);
|
||||
Console.WriteLine("Files:");
|
||||
|
@@ -1898,7 +1898,7 @@ namespace Microsoft.Spark.CSharp.Samples
|
|||
{
|
||||
var peopleDataFrame = GetSqlContext().JsonFile(SparkCLRSamples.Configuration.GetInputDataPath(PeopleJson));
|
||||
var path = Path.GetTempPath() + "DF_Samples_" + (long)(DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0)).TotalSeconds;
|
||||
peopleDataFrame.Save(path, "json", SaveMode.ErrorIfExists, "option1", "option_value1");
|
||||
peopleDataFrame.Write().Format("json").Mode(SaveMode.ErrorIfExists).Options(new Dictionary<string, string>() { { "option1", "option_value1" } }).Save(path);
|
||||
|
||||
Console.WriteLine("Save dataframe to: {0}", path);
|
||||
Console.WriteLine("Files:");
|
||||
|
|
Загрузка…
Ссылка в новой задаче