From c71bd4bb5d6be6697eaf720bb51a25228ab8d942 Mon Sep 17 00:00:00 2001
From: Adam Weigert
Date: Sun, 14 Apr 2024 18:45:30 -0400
Subject: [PATCH] Add Statement Execution API (#185)

* Add Statement Execution API

* Use standard datetime format string when converting SqlStatementParameter value

---------

Co-authored-by: Jason Wang
---
 .../SampleProgram.StatementExecution.cs |  48 ++
 .../SampleProgram.cs                    |   3 +-
 .../StatementExecutionApiClientTest.cs  | 185 ++++++
 .../ISQLApi.cs                          |  31 +
 .../Models/StatementExecution.cs        | 626 ++++++++++++++++++
 .../SQLApiClient.cs                     |   3 +
 .../StatementExecutionApiClient.cs      |  63 ++
 7 files changed, 958 insertions(+), 1 deletion(-)
 create mode 100644 csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.StatementExecution.cs
 create mode 100644 csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs
 create mode 100644 csharp/Microsoft.Azure.Databricks.Client/Models/StatementExecution.cs
 create mode 100644 csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs

diff --git a/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.StatementExecution.cs b/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.StatementExecution.cs
new file mode 100644
index 0000000..5b054c5
--- /dev/null
+++ b/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.StatementExecution.cs
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using Microsoft.Azure.Databricks.Client.Models;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Azure.Databricks.Client.Sample;
+
+internal static partial class SampleProgram
+{
+    private static async Task TestStatementExecutionApi(DatabricksClient client)
+    {
+        Console.WriteLine("Creating a new warehouse");
+
+        var w = new WarehouseAttributes
+        {
+            Name = "Testing Warehouse",
+            WarehouseType = WarehouseType.PRO,
+            EnableServerlessCompute = true,
+            Channel = new Channel { Name = ChannelName.CHANNEL_NAME_UNSPECIFIED },
+            SpotInstancePolicy = SpotInstancePolicy.POLICY_UNSPECIFIED,
+            ClusterSize = "2X-Small",
+            AutoStopMins = 20,
+            MaxNumClusters = 2,
+        };
+
+        var id = await client.SQL.Warehouse.Create(w);
+
+        Console.WriteLine($"Starting warehouse id {id}");
+        await client.SQL.Warehouse.Start(id);
+        await Task.Delay(10 * 1000); // crude wait for startup; see the polling sketch below
+
+        Console.WriteLine($"Querying warehouse id {id}");
+        var s = SqlStatement.Create("select * from main.information_schema.catalogs", id);
+
+        var result = await client.SQL.StatementExecution.Execute(s);
+        Console.WriteLine(result.Status.State);
+        Console.WriteLine($"Row count: {result.Manifest.TotalRowCount}");
+
+        Console.WriteLine($"Stopping warehouse id {id}");
+        await client.SQL.Warehouse.Stop(id);
+
+        Console.WriteLine($"Deleting warehouse id {id}");
+        await client.SQL.Warehouse.Delete(id);
+    }
+}
diff --git a/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.cs b/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.cs
index a8f7f28..8817263 100644
--- a/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.cs
+++ b/csharp/Microsoft.Azure.Databricks.Client.Sample/SampleProgram.cs
@@ -62,7 +62,8 @@ internal static partial class SampleProgram
             //await TestWarehouseApi(client);
             //await TestReposApi(client);
             //await TestPipelineApi(client);
-            await TestUnityCatalogApi(client);
+            //await TestUnityCatalogApi(client);
+            await TestStatementExecutionApi(client);
         }
 
         Console.WriteLine("Press enter to exit");
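A fixed ten-second delay only gives the warehouse a head start; it does not guarantee the warehouse is actually running before the query is issued. A minimal readiness-polling sketch, assuming the Warehouse API exposes a Get(id) call whose result carries a State property with a RUNNING value (these names are assumptions, not part of this PR):

    // Hypothetical readiness loop; Warehouse.Get, State, and WarehouseState.RUNNING are assumed names.
    var deadline = DateTime.UtcNow.AddMinutes(5);
    while (DateTime.UtcNow < deadline)
    {
        var info = await client.SQL.Warehouse.Get(id);
        if (info.State == WarehouseState.RUNNING)
        {
            break;
        }
        await Task.Delay(TimeSpan.FromSeconds(5));
    }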
diff --git a/csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs b/csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs
new file mode 100644
index 0000000..e025ce4
--- /dev/null
+++ b/csharp/Microsoft.Azure.Databricks.Client.Test/StatementExecutionApiClientTest.cs
@@ -0,0 +1,185 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using Microsoft.Azure.Databricks.Client.Models;
+using Moq;
+using Moq.Contrib.HttpClient;
+using System.Net;
+using System.Text.Json;
+using System.Text.Json.Nodes;
+
+namespace Microsoft.Azure.Databricks.Client.Test;
+
+[TestClass]
+public class StatementExecutionApiClientTest : ApiClientTest
+{
+    private static readonly Uri StatementExecutionApiUri = new(BaseApiUri, "2.0/sql/statements");
+
+    [TestMethod]
+    public async Task TestExecute()
+    {
+        const string expectedRequest = "{\"statement\":\"string\",\"warehouse_id\":\"string\",\"parameters\":[]}";
+        const string expectedResponse = @"
+        {
+            ""statement_id"": ""string"",
+            ""status"": {
+                ""state"": ""SUCCEEDED""
+            },
+            ""manifest"": {
+                ""format"": ""JSON_ARRAY"",
+                ""schema"": {
+                    ""column_count"": 1,
+                    ""columns"": [
+                        {
+                            ""name"": ""string"",
+                            ""position"": 0,
+                            ""type_name"": ""string"",
+                            ""type_text"": ""string""
+                        }
+                    ]
+                }
+            },
+            ""result"": {
+                ""chunk_index"": 0,
+                ""row_offset"": 0,
+                ""row_count"": 1,
+                ""data_array"": [
+                    [ ""0"" ]
+                ]
+            }
+        }
+        ";
+
+        var expected = JsonSerializer.Deserialize<StatementExecution>(expectedResponse, Options);
+
+        var handler = CreateMockHandler();
+        handler
+            .SetupRequest(HttpMethod.Post, StatementExecutionApiUri)
+            .ReturnsResponse(HttpStatusCode.OK, JsonSerializer.Serialize(expected, Options), "application/json")
+            .Verifiable();
+
+        var hc = handler.CreateClient();
+        hc.BaseAddress = BaseApiUri;
+
+        using var client = new StatementExecutionApiClient(hc);
+        var statement = JsonNode.Parse(expectedRequest).Deserialize<SqlStatement>(Options);
+
+        var actual = await client.Execute(statement);
+        Assert.IsTrue(expected.Equals(actual));
+
+        handler.VerifyRequest(
+            HttpMethod.Post,
+            StatementExecutionApiUri,
+            GetMatcher(expectedRequest),
+            Times.Once()
+        );
+    }
+
+    [TestMethod]
+    public async Task TestGet()
+    {
+        string testId = "1234-567890-cited123";
+        string apiUri = $"{StatementExecutionApiUri}/{testId}";
+
+        const string expectedResponse = @"
+        {
+            ""statement_id"": ""string"",
+            ""status"": {
+                ""state"": ""SUCCEEDED""
+            },
+            ""manifest"": {
+                ""format"": ""JSON_ARRAY"",
+                ""schema"": {
+                    ""column_count"": 1,
+                    ""columns"": [
+                        {
+                            ""name"": ""string"",
+                            ""position"": 0,
+                            ""type_name"": ""string"",
+                            ""type_text"": ""string""
+                        }
+                    ]
+                }
+            },
+            ""result"": {
+                ""chunk_index"": 0,
+                ""row_offset"": 0,
+                ""row_count"": 1,
+                ""data_array"": [
+                    [ ""0"" ]
+                ]
+            }
+        }
+        ";
+
+        var expected = JsonSerializer.Deserialize<StatementExecution>(expectedResponse, Options);
+
+        var handler = CreateMockHandler();
+        handler
+            .SetupRequest(HttpMethod.Get, apiUri)
+            .ReturnsResponse(HttpStatusCode.OK, expectedResponse, "application/json");
+
+        var hc = handler.CreateClient();
+        hc.BaseAddress = BaseApiUri;
+
+        using var client = new StatementExecutionApiClient(hc);
+        var actual = await client.Get(testId);
+        Assert.IsTrue(expected.Equals(actual));
+    }
+
+    [TestMethod]
+    public async Task TestCancel()
+    {
+        string testId = "1234-567890-cited123";
+        string apiUri = $"{StatementExecutionApiUri}/{testId}/cancel";
+        var handler = CreateMockHandler();
+        handler
+            .SetupRequest(HttpMethod.Post, apiUri)
+            .ReturnsResponse(HttpStatusCode.OK)
+            .Verifiable();
+
+        var hc = handler.CreateClient();
+        hc.BaseAddress = BaseApiUri;
+
+        using var client = new StatementExecutionApiClient(hc);
+        await client.Cancel(testId);
+        handler.VerifyRequest(
+            HttpMethod.Post,
+            apiUri,
+            Times.Once()
+        );
+    }
+
+    [TestMethod]
+    public async Task TestGetResultChunk()
+    {
+        string testId = "1234-567890-cited123";
+        int chunkIndex = 0;
+        string apiUri = $"{StatementExecutionApiUri}/{testId}/result/chunks/{chunkIndex}";
+
+        const string expectedResponse = @"
+        {
+            ""chunk_index"": 0,
+            ""row_offset"": 0,
+            ""row_count"": 1,
+            ""data_array"": [
+                [ ""0"" ]
+            ]
+        }
+        ";
+
+        var expected = JsonSerializer.Deserialize<StatementExecutionResultChunk>(expectedResponse, Options);
+
+        var handler = CreateMockHandler();
+        handler
+            .SetupRequest(HttpMethod.Get, apiUri)
+            .ReturnsResponse(HttpStatusCode.OK, expectedResponse, "application/json");
+
+        var hc = handler.CreateClient();
+        hc.BaseAddress = BaseApiUri;
+
+        using var client = new StatementExecutionApiClient(hc);
+        var actual = await client.GetResultChunk(testId, chunkIndex);
+        Assert.IsTrue(expected.Equals(actual));
+    }
+}
diff --git a/csharp/Microsoft.Azure.Databricks.Client/ISQLApi.cs b/csharp/Microsoft.Azure.Databricks.Client/ISQLApi.cs
index 447994b..c77489e 100644
--- a/csharp/Microsoft.Azure.Databricks.Client/ISQLApi.cs
+++ b/csharp/Microsoft.Azure.Databricks.Client/ISQLApi.cs
@@ -11,6 +11,7 @@ namespace Microsoft.Azure.Databricks.Client
 {
     public interface ISQLApi : IDisposable
     {
+        IStatementExecutionApi StatementExecution { get; }
         IWarehouseApi Warehouse { get; }
     }
 
@@ -58,4 +59,34 @@ namespace Microsoft.Azure.Databricks.Client
         /// <param name="id">Required. Id of the SQL warehouse.</param>
         Task Stop(string id, CancellationToken cancellationToken = default);
     }
+
+    public interface IStatementExecutionApi : IDisposable
+    {
+        /// <summary>
+        /// Execute a SQL statement.
+        /// </summary>
+        Task<StatementExecution> Execute(SqlStatement statement, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Cancel statement execution.
+        /// </summary>
+        /// <param name="id">Required. Id of statement execution.</param>
+        Task Cancel(string id, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Get the status, manifest, and first result chunk.
+        /// </summary>
+        /// <param name="id">Required. Id of statement execution.</param>
+        Task<StatementExecution> Get(string id, CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Get result chunk by index.
+        /// </summary>
+        /// <remarks>
+        /// After the statement execution has SUCCEEDED, this request can be used to fetch any chunk by index. Whereas the first chunk with chunk_index=0 is typically fetched with statementexecution/executestatement or statementexecution/getstatement, this request can be used to fetch subsequent chunks. The response structure is identical to the nested result element described in the statementexecution/getstatement request, and similarly includes the next_chunk_index and next_chunk_internal_link fields for simple iteration through the result set.
+        /// </remarks>
+        /// <param name="id">Required. Id of statement execution.</param>
+        /// <param name="chunkIndex">Required. The index of the chunk.</param>
+        Task<StatementExecutionResultChunk> GetResultChunk(string id, int chunkIndex, CancellationToken cancellationToken = default);
+    }
 }
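The remarks on GetResultChunk describe the intended iteration pattern: chunk 0 arrives with the execute/get response, and later chunks are fetched by index. A minimal sketch against the interface as declared above (statementId comes from a prior Execute call; error handling omitted):

    // Walk every chunk of a SUCCEEDED statement by index.
    var execution = await client.SQL.StatementExecution.Get(statementId);
    for (var i = 0; i < execution.Manifest.TotalChunkCount; i++)
    {
        var chunk = await client.SQL.StatementExecution.GetResultChunk(statementId, i);
        Console.WriteLine($"chunk {chunk.ChunkIndex}: {chunk.RowCount} rows starting at offset {chunk.RowOffset}");
    }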
diff --git a/csharp/Microsoft.Azure.Databricks.Client/Models/StatementExecution.cs b/csharp/Microsoft.Azure.Databricks.Client/Models/StatementExecution.cs
new file mode 100644
index 0000000..f3e1d03
--- /dev/null
+++ b/csharp/Microsoft.Azure.Databricks.Client/Models/StatementExecution.cs
@@ -0,0 +1,626 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json.Serialization;
+
+namespace Microsoft.Azure.Databricks.Client.Models;
+
+/// <summary>
+/// A SQL statement and its execution options.
+/// </summary>
+public record SqlStatement
+{
+    /// <summary>
+    /// Creates a new SQL statement.
+    /// </summary>
+    /// <param name="statement">The SQL statement to execute.</param>
+    /// <param name="warehouseId">Warehouse upon which to execute a statement.</param>
+    /// <param name="parameters">A list of parameters to pass into a SQL statement containing parameter markers.</param>
+    /// <returns>A SQL statement that can be executed.</returns>
+    public static SqlStatement Create(string statement, string warehouseId, params SqlStatementParameter[] parameters)
+    {
+        ArgumentNullException.ThrowIfNull(statement);
+        ArgumentNullException.ThrowIfNull(warehouseId);
+
+        return new SqlStatement
+        {
+            Statement = statement,
+            WarehouseId = warehouseId,
+            Parameters = new List<SqlStatementParameter>(parameters)
+        };
+    }
+
+    /// <summary>
+    /// The SQL statement to execute. The statement can optionally be parameterized, see <see cref="Parameters"/>.
+    /// </summary>
+    [JsonPropertyName("statement")]
+    public string Statement { get; set; }
+
+    /// <summary>
+    /// Warehouse upon which to execute a statement.
+    /// </summary>
+    [JsonPropertyName("warehouse_id")]
+    public string WarehouseId { get; set; }
+
+    /// <summary>
+    /// Sets default catalog for statement execution, similar to USE CATALOG in SQL.
+    /// </summary>
+    [JsonPropertyName("catalog")]
+    public string Catalog { get; set; }
+
+    /// <summary>
+    /// Sets default schema for statement execution, similar to USE SCHEMA in SQL.
+    /// </summary>
+    [JsonPropertyName("schema")]
+    public string Schema { get; set; }
+
+    /// <summary>
+    /// A list of parameters to pass into a SQL statement containing parameter markers.
+    /// </summary>
+    [JsonPropertyName("parameters")]
+    public ICollection<SqlStatementParameter> Parameters { get; set; } = new List<SqlStatementParameter>();
+
+    /// <summary>
+    /// Applies the given row limit to the statement's result set, but unlike the LIMIT clause in SQL, it also sets the truncated field in the response to indicate whether the result was trimmed due to the limit or not.
+    /// </summary>
+    [JsonPropertyName("row_limit")]
+    public long? RowLimit { get; set; }
+
+    /// <summary>
+    /// Applies the given byte limit to the statement's result size. Byte counts are based on internal data representations and might not match the final size in the requested format. If the result was truncated due to the byte limit, then truncated in the response is set to true. When using EXTERNAL_LINKS disposition, a default byte_limit of 100 GiB is applied if byte_limit is not explicitly set.
+    /// </summary>
+    [JsonPropertyName("byte_limit")]
+    public long? ByteLimit { get; set; }
+
+    /// <summary>
+    /// The fetch disposition provides two modes of fetching results: INLINE and EXTERNAL_LINKS.
+    /// </summary>
+    [JsonPropertyName("disposition")]
+    public SqlStatementDisposition Disposition { get; set; }
+
+    /// <summary>
+    /// Statement execution supports three result formats: JSON_ARRAY (default), ARROW_STREAM, and CSV.
+    /// </summary>
+    /// <remarks>
+    /// Important: The formats ARROW_STREAM and CSV are supported only with EXTERNAL_LINKS disposition. JSON_ARRAY is supported in INLINE and EXTERNAL_LINKS disposition.
+    /// </remarks>
+    [JsonPropertyName("format")]
+    public StatementFormat Format { get; set; } = StatementFormat.JSON_ARRAY;
+
+    /// <summary>
+    /// The time in seconds the call will wait for the statement's result set as Ns, where N can be set to 0 or to a value between 5 and 50.
+    /// </summary>
+    [JsonPropertyName("wait_timeout")]
+    public string WaitTimeout { get; set; }
+
+    /// <summary>
+    /// When wait_timeout > 0s, the call will block up to the specified time. If the statement execution doesn't finish within this time, on_wait_timeout determines whether the execution should continue or be canceled. When set to CONTINUE, the statement execution continues asynchronously and the call returns a statement ID which can be used for polling with statementexecution/getstatement. When set to CANCEL, the statement execution is canceled and the call returns with a CANCELED state.
+    /// </summary>
+    [JsonPropertyName("on_wait_timeout")]
+    public SqlStatementOnWaitTimeout OnWaitTimeout { get; set; } = SqlStatementOnWaitTimeout.CONTINUE;
+}
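For orientation, a construction sketch combining the factory with the options above; the warehouse id and table name are placeholders, and :pickup is a named parameter marker:

    // Build a parameterized statement that returns CSV via external links.
    var statement = SqlStatement.Create(
        "SELECT * FROM samples.nyctaxi.trips WHERE tpep_pickup_datetime > :pickup", // placeholder query
        "1234567890abcdef",                                                         // placeholder warehouse id
        SqlStatementParameter.Create("pickup", new DateOnly(2024, 1, 1)));
    statement.Disposition = SqlStatementDisposition.EXTERNAL_LINKS;
    statement.Format = StatementFormat.CSV; // CSV and ARROW_STREAM require EXTERNAL_LINKS
    statement.WaitTimeout = "30s";
    statement.OnWaitTimeout = SqlStatementOnWaitTimeout.CONTINUE;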
+
+public enum SqlStatementOnWaitTimeout
+{
+    CONTINUE,
+    CANCEL
+}
+
+public static class SqlStatementWaitTimeout
+{
+    /// <summary>
+    /// Create a wait_timeout formatted string.
+    /// </summary>
+    /// <param name="seconds">The number of seconds.</param>
+    /// <returns>A string formatted to the wait_timeout specification.</returns>
+    public static string Create(int seconds)
+        => $"{seconds}s";
+
+    /// <summary>
+    /// Create a wait_timeout formatted string.
+    /// </summary>
+    /// <param name="timeout">The timeout duration.</param>
+    /// <returns>A string formatted to the wait_timeout specification.</returns>
+    public static string Create(TimeSpan timeout)
+        => $"{Convert.ToInt32(timeout.TotalSeconds)}s";
+}
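Both helper overloads produce the "Ns" string the wait_timeout field expects:

    var t1 = SqlStatementWaitTimeout.Create(30);                       // "30s"
    var t2 = SqlStatementWaitTimeout.Create(TimeSpan.FromSeconds(45)); // "45s"
    statement.WaitTimeout = t2; // valid values are 0s, or 5s through 50s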
+
+public enum SqlStatementDisposition
+{
+    /// <summary>
+    /// Statements executed with INLINE disposition will return result data inline, in JSON_ARRAY format, in a series of chunks. If a given statement produces a result set with a size larger than 25 MiB, that statement execution is aborted, and no result set will be available.
+    /// </summary>
+    INLINE,
+
+    /// <summary>
+    /// Statements executed with EXTERNAL_LINKS disposition will return result data as external links: URLs that point to cloud storage internal to the workspace. Using EXTERNAL_LINKS disposition allows statements to generate arbitrarily sized result sets for fetching up to 100 GiB. The resulting links have two important properties:
+    /// 1. They point to resources external to the Databricks compute; therefore any associated authentication information (typically a personal access token, OAuth token, or similar) must be removed when fetching from these links.
+    /// 2. These are presigned URLs with a specific expiration, indicated in the response. The behavior when attempting to use an expired link is cloud specific.
+    /// </summary>
+    EXTERNAL_LINKS
+}
+
+public enum StatementFormat
+{
+    /// <summary>
+    /// When specifying format=JSON_ARRAY, result data will be formatted as an array of arrays of values, where each value is either the string representation of a value, or null. For example, the output of SELECT concat('id-', id) AS strCol, id AS intCol, null AS nullCol FROM range(3) would look like this:
+    /// [
+    ///   ["id-1", "1", null],
+    ///   ["id-2", "2", null],
+    ///   ["id-3", "3", null],
+    /// ]
+    ///
+    /// When specifying format=JSON_ARRAY and disposition=EXTERNAL_LINKS, each chunk in the result contains compact JSON with no indentation or extra whitespace.
+    /// </summary>
+    JSON_ARRAY,
+
+    /// <summary>
+    /// When specifying format=ARROW_STREAM and disposition=EXTERNAL_LINKS, each chunk in the result will be formatted as Apache Arrow Stream.
+    /// </summary>
+    ARROW_STREAM,
+
+    /// <summary>
+    /// When specifying format=CSV and disposition=EXTERNAL_LINKS, each chunk in the result will be a CSV according to RFC 4180 standard. All the columns values will have string representation similar to the JSON_ARRAY format, and null values will be encoded as “null”. Only the first chunk in the result would contain a header row with column names. For example, the output of SELECT concat('id-', id) AS strCol, id AS intCol, null as nullCol FROM range(3) would look like this:
+    /// strCol, intCol, nullCol
+    /// id-1,1,null
+    /// id-2,2,null
+    /// id-3,3,null
+    /// </summary>
+    CSV
+}
+
+public record SqlStatementParameter
+{
+    /// <summary>
+    /// The name of a parameter marker to be substituted in the statement.
+    /// </summary>
+    [JsonPropertyName("name")]
+    public string Name { get; set; }
+
+    /// <summary>
+    /// The value to substitute, represented as a string. If omitted, the value is interpreted as NULL.
+    /// </summary>
+    [JsonPropertyName("value")]
+    public string Value { get; set; }
+
+    /// <summary>
+    /// The data type, given as a string. For example: INT, STRING, DECIMAL(10,2). If no type is given the type is assumed to be STRING. Complex types, such as ARRAY, MAP, and STRUCT are not supported. For valid types, refer to the section Data types of the SQL language reference.
+    /// </summary>
+    [JsonPropertyName("type")]
+    public string Type { get; set; }
+
+    /// <summary>
+    /// Create a <see cref="SqlStatementParameter"/> instance based on the specified data type.
+    /// </summary>
+    /// <typeparam name="T">The type of the parameter value.</typeparam>
+    /// <param name="name">The parameter name.</param>
+    /// <param name="value">The parameter value.</param>
+    /// <returns>An instance that has the best-fit type and converted value format.</returns>
+    public static SqlStatementParameter Create<T>(string name, T value)
+    {
+        return new SqlStatementParameter
+        {
+            Name = name,
+            Value = Convert(value),
+            Type = SqlStatementParameterTypes.From(value)
+        };
+    }
+
+    private static string Convert<T>(T value)
+        => value switch
+        {
+            null => string.Empty,
+            byte[] bytes => System.Convert.ToHexString(bytes),
+            DateOnly date => date.ToString("yyyy-MM-dd"),
+            DateTime timestamp => timestamp.ToString("O"),
+            DateTimeOffset timestamp => timestamp.ToString("O"),
+            _ => value.ToString()
+        };
+}
+
+public static class SqlStatementParameterTypes
+{
+    public static readonly string Boolean = "BOOLEAN";
+    public static readonly string DateOnly = "DATE";
+    public static readonly string DateTime = "TIMESTAMP";
+    public static string Decimal(int precision = 10, int scale = 0) => $"DECIMAL({precision}, {scale})";
+    public static readonly string Double = "DOUBLE";
+    public static readonly string Float = "FLOAT";
+    public static readonly string Binary = "BINARY";
+    public static readonly string Byte = "TINYINT";
+    public static readonly string Int16 = "SMALLINT";
+    public static readonly string Int32 = "INT";
+    public static readonly string Int64 = "BIGINT";
+    public static readonly string String = "STRING";
+
+    /// <summary>
+    /// Attempts to convert the value to a best-fit parameter data type.
+    /// </summary>
+    /// <typeparam name="T">The value type.</typeparam>
+    /// <param name="value">The value of the type.</param>
+    /// <returns>The best parameter data type for the provided value type.</returns>
+    public static string From<T>(T value)
+        => value switch
+        {
+            bool _ => Boolean,
+            DateOnly _ => DateOnly,
+            DateTime _ => DateTime,
+            decimal _ => Decimal(),
+            double _ => Double,
+            float _ => Float,
+            byte[] _ => Binary,
+            byte _ => Byte,
+            short _ => Int16,
+            int _ => Int32,
+            long _ => Int64,
+            _ => String
+        };
+}
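The Create/From pair maps CLR values to wire strings and SQL type names. A few illustrative mappings, including the caveat that the default DECIMAL(10, 0) carries no scale:

    var p1 = SqlStatementParameter.Create("active", true);          // Value "True", Type "BOOLEAN"
    var p2 = SqlStatementParameter.Create("when", DateTime.UtcNow); // round-trip "O" format, Type "TIMESTAMP"
    var p3 = SqlStatementParameter.Create("amount", 12.50m);        // Type "DECIMAL(10, 0)"
    p3.Type = SqlStatementParameterTypes.Decimal(10, 2);            // override to preserve the fractional digits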
+ /// + [JsonPropertyName("manifest")] + public StatementExecutionManifest Manifest { get; set; } + + /// + /// Contains the result data of a single chunk when using INLINE disposition. When using EXTERNAL_LINKS disposition, the array external_links is used instead to provide presigned URLs to the result data in cloud storage. Exactly one of these alternatives is used. (While the external_links array prepares the API to return multiple links in a single response. Currently only a single link is returned.) + /// + [JsonPropertyName("result")] + public StatementExecutionResultChunk Result { get; set; } +} + +public record StatementExecutionStatus +{ + /// + /// The statement execution state. + /// + [JsonPropertyName("state")] + public StatementExecutionState State { get; set; } + + /// + /// The statement error information, if available. + /// + [JsonPropertyName("error")] + public StatementExecutionError Error { get; set; } +} + +public enum StatementExecutionState +{ + /// + /// Waiting for warehouse. + /// + PENDING, + + RUNNING, + + /// + /// Execution was successful, result data available for fetch + /// + SUCCEEDED, + + /// + /// Execution failed; reason for failure described in accompanying error message + /// + FAILED, + + /// + /// User canceled; can come from explicit cancel call, or timeout with on_wait_timeout=CANCEL + /// + CANCELED, + + /// + /// Execution successful, and statement closed; result no longer available for fetch + /// + CLOSED +} + +public record StatementExecutionError +{ + /// + /// The statement error code. + /// + [JsonPropertyName("error_code")] + public StatementExecutionErrorCode ErrorCode { get; set; } + + /// + /// A brief summary of the error condition. + /// + [JsonPropertyName("message")] + public string Message { get; set; } +} + +public enum StatementExecutionErrorCode +{ + UNKNOWN, + INTERNAL_ERROR, + TEMPORARILY_UNAVAILABLE, + IO_ERROR, + BAD_REQUEST, + SERVICE_UNDER_MAINTENANCE, + WORKSPACE_TEMPORARILY_UNAVAILABLE, + DEADLINE_EXCEEDED, + CANCELLED, + RESOURCE_EXHAUSTED, + ABORTED, + NOT_FOUND, + ALREADY_EXISTS, + UNAUTHENTICATED +} + +public record StatementExecutionManifest +{ + /// + /// The statement execution result format. + /// + [JsonPropertyName("format")] + public StatementFormat Format { get; set; } + + /// + /// The schema is an ordered list of column descriptions. + /// + [JsonPropertyName("schema")] + public StatementExecutionSchema Schema { get; set; } + + /// + /// The total number of chunks that the result set has been divided into. + /// + [JsonPropertyName("total_chunk_count")] + public int TotalChunkCount { get; set; } + + /// + /// Array of result set chunk metadata. + /// + [JsonPropertyName("chunks")] + public StatementExecutionChunk[] Chunks { get; set; } = Array.Empty(); + + /// + /// The total number of rows in the result set. + /// + [JsonPropertyName("total_row_count")] + public long TotalRowCount { get; set; } + + /// + /// The total number of bytes in the result set. This field is not available when using INLINE disposition. + /// + [JsonPropertyName("total_byte_count")] + public long TotalByteCount { get; set; } + + /// + /// Indicates whether the result is truncated due to row_limit or byte_limit. 
+ /// + [JsonPropertyName("truncated")] + public bool Truncated { get; set; } + + public virtual bool Equals(StatementExecutionManifest other) + { + return other is not null + && Format.Equals(other.Format) + && Schema.Equals(other.Schema) + && TotalChunkCount == other.TotalChunkCount + && Chunks.SequenceEqual(other.Chunks) + && TotalRowCount == other.TotalRowCount + && TotalByteCount == other.TotalByteCount + && Truncated == other.Truncated; + } + + public override int GetHashCode() + { + var hash = HashCode.Combine(Format, Schema, TotalChunkCount, TotalRowCount, TotalByteCount, Truncated); + foreach (var chunk in Chunks) + { + hash *= chunk.GetHashCode(); + } + return hash; + } +} + +public record StatementExecutionSchema +{ + /// + /// The number of columns. + /// + [JsonPropertyName("column_count")] + public int ColumnCount { get; set; } + + /// + /// The columns in the schema. + /// + [JsonPropertyName("columns")] + public IEnumerable Columns { get; set; } + + public virtual bool Equals(StatementExecutionSchema other) + { + return other is not null + && ColumnCount == other.ColumnCount + && Columns.SequenceEqual(other.Columns); + } + + public override int GetHashCode() + { + var hash = ColumnCount.GetHashCode(); + foreach (var column in Columns) + { + hash *= column.GetHashCode(); + } + return hash; + } +} + +public record StatementExecutionSchemaColumn +{ + /// + /// The name of the column. + /// + [JsonPropertyName("name")] + public string Name { get; set; } + + /// + /// The full SQL type specification. + /// + [JsonPropertyName("type_text")] + public string TypeText { get; set; } + + /// + /// The name of the base data type. This doesn't include details for complex types such as STRUCT, MAP or ARRAY. + /// + [JsonPropertyName("type_name")] + public string TypeName { get; set; } + + /// + /// Specifies the number of digits in a number. This applies to the DECIMAL type. + /// + [JsonPropertyName("type_precision")] + public int TypePrecision { get; set; } + + /// + /// Specifies the number of digits to the right of the decimal point in a number. This applies to the DECIMAL type. + /// + [JsonPropertyName("type_scale")] + public int TypeScale { get; set; } + + /// + /// The format of the interval type. + /// + [JsonPropertyName("type_interval_type")] + public string TypeIntervalType { get; set; } + + /// + /// The ordinal position of the column (starting at position 0). 
+ /// + [JsonPropertyName("position")] + public int Position { get; set; } +} + +public static class StatementExecutionSchemaColumnTypes +{ + public static readonly string Boolean = "BOOLEAN"; + public static readonly string Byte = "BYTE"; + public static readonly string Short = "SHORT"; + public static readonly string Int = "INT"; + public static readonly string Long = "LONG"; + public static readonly string Float = "FLOAT"; + public static readonly string Double = "DOUBLE"; + public static readonly string Date = "DATE"; + public static readonly string Timestamp = "TIMESTAMP"; + public static readonly string String = "STRING"; + public static readonly string Binary = "BINARY"; + public static readonly string Decimal = "DECIMAL"; + public static readonly string Interval = "INTERVAL"; + public static readonly string Array = "ARRAY"; + public static readonly string Struct = "STRUCT"; + public static readonly string Map = "MAP"; + public static readonly string Char = "CHAR"; + public static readonly string Null = "NULL"; + public static readonly string UserDefinedType = "USER_DEFINED_TYPE"; +} + +public record StatementExecutionChunk +{ + /// + /// The position within the sequence of result set chunks. + /// + [JsonPropertyName("chunk_index")] + public int ChunkIndex { get; set; } + + /// + /// The starting row offset within the result set. + /// + [JsonPropertyName("row_offset")] + public long RowOffset { get; set; } + + /// + /// The number of rows within the result chunk. + /// + [JsonPropertyName("row_count")] + public long RowCount { get; set; } + + /// + /// The number of bytes in the result chunk. This field is not available when using INLINE disposition. + /// + [JsonPropertyName("byte_count")] + public long ByteCount { get; set; } +} + +public record StatementExecutionResult : StatementExecutionChunk +{ + /// + /// When fetching, provides the chunk_index for the next chunk. If absent, indicates there are no more chunks. + /// + [JsonPropertyName("next_chunk_index")] + public int NextChunkIndex { get; set; } + + /// + /// When fetching, provides a link to fetch the next chunk. If absent, indicates there are no more chunks. This link is an absolute path to be joined with your $DATABRICKS_HOST, and should be treated as an opaque link. This is an alternative to using next_chunk_index. + /// + [JsonPropertyName("next_chunk_internal_link")] + public string NextChunkInternalLink { get; set; } +} + +public record StatementExecutionResultChunk : StatementExecutionResult +{ + /// + /// The JSON_ARRAY format is an array of arrays of values, where each non-null value is formatted as a string. Null values are encoded as JSON null. + /// + [JsonPropertyName("data_array")] + public string[][] DataArray { get; set; } = Array.Empty(); + + /// + /// The list of external links to the result data in cloud storage. This field is not available when using INLINE disposition. 
+ /// + [JsonPropertyName("external_links")] + public IEnumerable ExternalLinks { get; set; } = Array.Empty(); + + public virtual bool Equals(StatementExecutionResultChunk other) + { + if (other is null) + { + return false; + } + + if (DataArray.Length != other.DataArray.Length) + { + return false; + } + + return base.Equals(other) + && ExternalLinks.SequenceEqual(other.ExternalLinks); + } + + public override int GetHashCode() + { + return base.GetHashCode(); + } +} + +public record StatementExecutionExternalLink : StatementExecutionResult +{ + /// + /// A presigned URL pointing to a chunk of result data, hosted by an external service, with a short expiration time (<= 15 minutes). As this URL contains a temporary credential, it should be considered sensitive and the client should not expose this URL in a log. + /// + [JsonPropertyName("external_link")] + public string ExternalLink { get; set; } + + /// + /// Indicates the date-time that the given external link will expire and becomes invalid, after which point a new external_link must be requested. + /// + [JsonPropertyName("expiration")] + public DateTime Expiration { get; set; } +} \ No newline at end of file diff --git a/csharp/Microsoft.Azure.Databricks.Client/SQLApiClient.cs b/csharp/Microsoft.Azure.Databricks.Client/SQLApiClient.cs index 68af19d..7d068bc 100644 --- a/csharp/Microsoft.Azure.Databricks.Client/SQLApiClient.cs +++ b/csharp/Microsoft.Azure.Databricks.Client/SQLApiClient.cs @@ -9,9 +9,12 @@ namespace Microsoft.Azure.Databricks.Client { public SQLApiClient(HttpClient httpClient) : base(httpClient) { + this.StatementExecution = new StatementExecutionApiClient(httpClient); this.Warehouse = new WarehouseApiClient(httpClient); } + public IStatementExecutionApi StatementExecution { get; } + public IWarehouseApi Warehouse { get; } } } diff --git a/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs b/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs new file mode 100644 index 0000000..95985f8 --- /dev/null +++ b/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
diff --git a/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs b/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs
new file mode 100644
index 0000000..95985f8
--- /dev/null
+++ b/csharp/Microsoft.Azure.Databricks.Client/StatementExecutionApiClient.cs
@@ -0,0 +1,63 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using Microsoft.Azure.Databricks.Client.Models;
+using System.Net.Http;
+using System.Text.Json;
+using System.Text.Json.Nodes;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Microsoft.Azure.Databricks.Client
+{
+    public class StatementExecutionApiClient : ApiClient, IStatementExecutionApi
+    {
+        private readonly string _apiBaseUrl;
+
+        public StatementExecutionApiClient(HttpClient httpClient) : base(httpClient)
+        {
+            _apiBaseUrl = $"{ApiVersion}/sql/statements";
+        }
+
+        public async Task Cancel(string id, CancellationToken cancellationToken = default)
+        {
+            await HttpPost(this.HttpClient, $"{this._apiBaseUrl}/{id}/cancel", new { }, cancellationToken).ConfigureAwait(false);
+        }
+
+        public async Task<StatementExecution> Execute(SqlStatement statement, CancellationToken cancellationToken = default)
+        {
+            var jsonObj = JsonSerializer.SerializeToNode(statement, Options)!.AsObject();
+
+            var execution = await HttpPost<JsonObject, JsonObject>(
+                this.HttpClient,
+                this._apiBaseUrl,
+                jsonObj,
+                cancellationToken
+            ).ConfigureAwait(false);
+
+            return execution.Deserialize<StatementExecution>(Options);
+        }
+
+        public async Task<StatementExecutionResultChunk> GetResultChunk(string id, int chunkIndex, CancellationToken cancellationToken = default)
+        {
+            var execution = await HttpGet<JsonObject>(
+                this.HttpClient,
+                $"{this._apiBaseUrl}/{id}/result/chunks/{chunkIndex}",
+                cancellationToken
+            ).ConfigureAwait(false);
+
+            return execution.Deserialize<StatementExecutionResultChunk>(Options);
+        }
+
+        public async Task<StatementExecution> Get(string id, CancellationToken cancellationToken = default)
+        {
+            var execution = await HttpGet<JsonObject>(
+                this.HttpClient,
+                $"{this._apiBaseUrl}/{id}",
+                cancellationToken
+            ).ConfigureAwait(false);
+
+            return execution.Deserialize<StatementExecution>(Options);
+        }
+    }
+}
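Putting it together through the facade wired up above. The workspace URL, token, and warehouse id are placeholders, and CreateClient refers to the library's existing factory method:

    using var client = DatabricksClient.CreateClient(
        "https://adb-1234567890123456.7.azuredatabricks.net",
        "<personal-access-token>");

    var result = await client.SQL.StatementExecution.Execute(
        SqlStatement.Create("select * from main.information_schema.catalogs", "<warehouse-id>"));

    if (result.Status.State == StatementExecutionState.SUCCEEDED)
    {
        foreach (var row in result.Result.DataArray)
        {
            Console.WriteLine(string.Join(", ", row));
        }
    }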