Support for new Delta v0.5.0 APIs (#374)

elvaliuliuliu 2019-12-20 17:53:47 -08:00 committed by Terry Kim
Parent 9a2903628a
Commit 192997cb55
3 changed files with 85 additions and 19 deletions

View file

@@ -16,7 +16,7 @@ namespace Microsoft.Spark.Extensions.Delta.E2ETest
{
Environment.SetEnvironmentVariable(
SparkFixture.EnvironmentVariableNames.ExtraSparkSubmitArgs,
"--packages io.delta:delta-core_2.11:0.4.0 " +
"--packages io.delta:delta-core_2.11:0.5.0 " +
"--conf spark.databricks.delta.snapshotPartitions=2 " +
"--conf spark.sql.sources.parallelPartitionDiscovery.parallelism=5");
SparkFixture = new SparkFixture();

View file

@@ -10,6 +10,7 @@ using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using Xunit;
namespace Microsoft.Spark.Extensions.Delta.E2ETest
@@ -206,7 +207,16 @@ namespace Microsoft.Spark.Extensions.Delta.E2ETest
identifier,
$"{partitionColumnName} bigint"),
partitionColumnName);
// TODO: Test with StructType partition schema once StructType is supported.
testWrapper(
data.Repartition(Functions.Col(partitionColumnName)),
identifier => DeltaTable.ConvertToDelta(
_spark,
identifier,
new StructType(new[]
{
new StructField(partitionColumnName, new IntegerType())
})),
partitionColumnName);
}
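For context, a minimal usage sketch of the new ConvertToDelta overload exercised above, written outside the test harness. The parquet path and partition column name are illustrative only, and the app is assumed to be submitted with the io.delta:delta-core_2.11:0.5.0 package:

using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace DeltaExamples
{
    internal class ConvertToDeltaExample
    {
        private static void Main()
        {
            SparkSession spark = SparkSession.Builder().GetOrCreate();

            // Convert an existing parquet table, partitioned by "part", into a Delta table.
            // The new overload takes the partition schema as a StructType instead of a
            // "colName colType" DDL string.
            DeltaTable converted = DeltaTable.ConvertToDelta(
                spark,
                "parquet.`/tmp/partitioned-parquet`",
                new StructType(new[]
                {
                    new StructField("part", new IntegerType())
                }));

            converted.ToDF().Show();
        }
    }
}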
/// <summary>
@@ -276,6 +286,9 @@ namespace Microsoft.Spark.Extensions.Delta.E2ETest
Assert.IsType<DataFrame>(table.Vacuum());
Assert.IsType<DataFrame>(table.Vacuum(168));
// Generate should return void.
table.Generate("symlink_format_manifest");
// Delete should return void.
table.Delete("id > 10");
table.Delete(Functions.Expr("id > 5"));
@@ -303,7 +316,13 @@ namespace Microsoft.Spark.Extensions.Delta.E2ETest
_spark,
parquetIdentifier,
"id bigint"));
// TODO: Test with StructType partition schema once StructType is supported.
Assert.IsType<DeltaTable>(DeltaTable.ConvertToDelta(
_spark,
parquetIdentifier,
new StructType(new[]
{
new StructField("id", new IntegerType())
})));
}
/// <summary>

View file

@@ -31,6 +31,43 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
/// <summary>
/// Create a DeltaTable from the given parquet table and partition schema.
/// Takes an existing parquet table and constructs a delta transaction log in the base path
/// of that table.
///
/// Note: Any changes to the table during the conversion process may not result in a
/// consistent state at the end of the conversion. Users should stop any changes to the
/// table before the conversion is started.
///
/// An example usage would be
/// <code>
/// DeltaTable.ConvertToDelta(
/// spark,
/// "parquet.`/path`",
/// new StructType(new[]
/// {
/// new StructField("key1", new LongType()),
/// new StructField("key2", new StringType())
/// });
/// </code>
/// </summary>
/// <param name="spark">The relevant session.</param>
/// <param name="identifier">String used to identify the parquet table.</param>
/// <param name="partitionSchema">StructType representing the partition schema.</param>
/// <returns>The converted DeltaTable.</returns>
public static DeltaTable ConvertToDelta(
SparkSession spark,
string identifier,
StructType partitionSchema) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
"convertToDelta",
spark,
identifier,
DataType.FromJson(SparkEnvironment.JvmBridge, partitionSchema.Json)));
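A note on this design: the partition schema crosses the JVM bridge as its JSON representation, and DataType.FromJson rebuilds it as a JVM-side StructType, so no new serialization path is needed. The standalone sketch below (not part of this change; the printed output is shown only approximately) illustrates the JSON string that is sent across:

using System;
using Microsoft.Spark.Sql.Types;

namespace DeltaExamples
{
    internal class PartitionSchemaJsonExample
    {
        private static void Main()
        {
            // The same partition schema used in the doc example above.
            var partitionSchema = new StructType(new[]
            {
                new StructField("key1", new LongType()),
                new StructField("key2", new StringType())
            });

            // Prints roughly:
            // {"type":"struct","fields":[{"name":"key1","type":"long","nullable":true,"metadata":{}}, ...]}
            Console.WriteLine(partitionSchema.Json);
        }
    }
}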
/// <summary>
/// Create a DeltaTable from the given parquet table and partition schema.
/// Takes an existing parquet table and constructs a delta transaction log in the base path
@@ -55,11 +92,11 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
string partitionSchema) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
"convertToDelta",
spark,
identifier,
partitionSchema));
s_deltaTableClassName,
"convertToDelta",
spark,
identifier,
partitionSchema));
/// <summary>
/// Create a DeltaTable from the given parquet table. Takes an existing parquet table and
@@ -80,10 +117,10 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
"convertToDelta",
spark,
identifier));
s_deltaTableClassName,
"convertToDelta",
spark,
identifier));
/// <summary>
/// Create a DeltaTable for the data at the given <c>path</c>.
@@ -97,9 +134,9 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
public static DeltaTable ForPath(string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
"forPath",
path));
s_deltaTableClassName,
"forPath",
path));
/// <summary>
/// Create a DeltaTable for the data at the given <c>path</c> using the given SparkSession
@@ -111,10 +148,10 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
s_deltaTableClassName,
"forPath",
sparkSession,
path));
s_deltaTableClassName,
"forPath",
sparkSession,
path));
/// <summary>
/// Check if the provided <c>identifier</c> string, in this case a file path,
@@ -220,6 +257,16 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
public DataFrame History() =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke("history"));
/// <summary>
/// Generate a manifest for the given Delta Table.
/// </summary>
/// <param name="mode">Specifies the mode for the generation of the manifest.
/// The valid modes are as follows (not case sensitive):
/// - "symlink_format_manifest" : This will generate manifests in symlink format
/// for Presto and Athena read support.
/// See the online documentation for more information.</param>
public void Generate(string mode) => _jvmObject.Invoke("generate", mode);
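A brief usage sketch of Generate, assuming a Delta table already exists at the illustrative path below:

using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;

namespace DeltaExamples
{
    internal class GenerateManifestExample
    {
        private static void Main()
        {
            SparkSession spark = SparkSession.Builder().GetOrCreate();

            // Hypothetical Delta table location used only for illustration.
            DeltaTable table = DeltaTable.ForPath(spark, "/tmp/delta-table");

            // Writes symlink-format manifest files that Presto and Athena can use
            // to read the Delta table.
            table.Generate("symlink_format_manifest");
        }
    }
}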
/// <summary>
/// Delete data from the table that match the given <c>condition</c>.
/// </summary>