From 537dcd242f6c4c22170cc719e78cfad159672d2e Mon Sep 17 00:00:00 2001 From: AsafMah Date: Thu, 14 Apr 2022 09:44:43 +0300 Subject: [PATCH] From rust sdk - squashed Signed-off-by: AsafMah --- azure-kusto-data/Cargo.toml | 34 ++ azure-kusto-data/README.md | 8 + azure-kusto-data/examples/query.rs | 52 ++ azure-kusto-data/src/arrow.rs | 272 ++++++++++ azure-kusto-data/src/authorization_policy.rs | 55 ++ azure-kusto-data/src/client.rs | 182 +++++++ azure-kusto-data/src/connection_string.rs | 480 ++++++++++++++++++ azure-kusto-data/src/error.rs | 47 ++ azure-kusto-data/src/lib.rs | 22 +- azure-kusto-data/src/operations/mod.rs | 1 + azure-kusto-data/src/operations/query.rs | 216 ++++++++ azure-kusto-data/src/prelude.rs | 16 + azure-kusto-data/tests/arrow.rs | 16 + .../tests/inputs/adminthenquery.json | 194 +++++++ azure-kusto-data/tests/inputs/dataframe.json | 216 ++++++++ codecov.yml | 8 + 16 files changed, 1805 insertions(+), 14 deletions(-) create mode 100644 azure-kusto-data/README.md create mode 100644 azure-kusto-data/examples/query.rs create mode 100644 azure-kusto-data/src/arrow.rs create mode 100644 azure-kusto-data/src/authorization_policy.rs create mode 100644 azure-kusto-data/src/client.rs create mode 100644 azure-kusto-data/src/connection_string.rs create mode 100644 azure-kusto-data/src/error.rs create mode 100644 azure-kusto-data/src/operations/mod.rs create mode 100644 azure-kusto-data/src/operations/query.rs create mode 100644 azure-kusto-data/src/prelude.rs create mode 100644 azure-kusto-data/tests/arrow.rs create mode 100644 azure-kusto-data/tests/inputs/adminthenquery.json create mode 100644 azure-kusto-data/tests/inputs/dataframe.json diff --git a/azure-kusto-data/Cargo.toml b/azure-kusto-data/Cargo.toml index 2dd47ce..fa5c6e7 100644 --- a/azure-kusto-data/Cargo.toml +++ b/azure-kusto-data/Cargo.toml @@ -1,6 +1,40 @@ [package] name = "azure-kusto-data" version = "0.1.0" +description = "Rust wrappers around Microsoft Azure REST APIs - Azure Data 
Explorer" +readme = "README.md" +license = "MIT" edition = "2021" +repository = "https://github.com/azure/azure-sdk-for-rust" +homepage = "https://github.com/azure/azure-sdk-for-rust" +documentation = "https://docs.rs/azure_kusto_data" +keywords = ["sdk", "azure", "kusto", "azure-data-explorer"] +categories = ["api-bindings"] [dependencies] +arrow = { version = "9", optional = true } +azure_core = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" , features = [ + "enable_reqwest", + "enable_reqwest_gzip", +] } +azure_identity = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" } +async-trait = "0.1" +async-convert = "1" +bytes = "1" +futures = "0.3" +http = "0.2" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +thiserror = "1" +lazy_static = "1.4.0" +hashbrown = "0.12.0" + +[dev-dependencies] +env_logger = "0.9" +tokio = { version = "1", features = ["macros"] } + +[features] +default = ["arrow"] +mock_transport_framework = ["azure_core/mock_transport_framework"] +#into_future = [] TODO - properly turn it on +test_e2e = [] diff --git a/azure-kusto-data/README.md b/azure-kusto-data/README.md new file mode 100644 index 0000000..52e979d --- /dev/null +++ b/azure-kusto-data/README.md @@ -0,0 +1,8 @@ +# Azure SDK for Rust - Azure Kusto crate + +## The Kusto crate. + +`azure-kusto-data` offers the functionality needed to interact with Azure Data Explorer (Kusto) from Rust, +as an abstraction over the [Azure Data Explorer REST API](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/rest/). + +For usage, have a look at the [examples](https://github.com/Azure/azure-sdk-for-rust/tree/main/sdk/data_kusto/examples). 
diff --git a/azure-kusto-data/examples/query.rs b/azure-kusto-data/examples/query.rs new file mode 100644 index 0000000..38d7728 --- /dev/null +++ b/azure-kusto-data/examples/query.rs @@ -0,0 +1,52 @@ +use azure_kusto_data::prelude::*; +use std::error::Error; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let service_url = std::env::args() + .nth(1) + .expect("please specify service url name as first command line parameter"); + + let database = std::env::args() + .nth(2) + .expect("please specify database name as second command line parameter"); + + let query = std::env::args() + .nth(3) + .expect("please specify query as third command line parameter"); + + let client_id = + std::env::var("AZURE_CLIENT_ID").expect("Set env variable AZURE_CLIENT_ID first!"); + let client_secret = + std::env::var("AZURE_CLIENT_SECRET").expect("Set env variable AZURE_CLIENT_SECRET first!"); + let authority_id = + std::env::var("AZURE_TENANT_ID").expect("Set env variable AZURE_TENANT_ID first!"); + + let kcsb = ConnectionStringBuilder::new_with_aad_application_key_authentication( + &service_url, + &authority_id, + &client_id, + &client_secret, + ); + + let client = KustoClient::try_from(kcsb).unwrap(); + + let response = client + .execute_query(database, query) + .into_future() + .await + .unwrap(); + + for table in &response.tables { + match table { + ResultTable::DataSetHeader(header) => println!("header: {:?}", header), + ResultTable::DataTable(table) => println!("table: {:?}", table), + ResultTable::DataSetCompletion(completion) => println!("completion: {:?}", completion), + } + } + + let primary_results = response.into_primary_results().collect::>(); + println!("primary results: {:?}", primary_results); + + Ok(()) +} diff --git a/azure-kusto-data/src/arrow.rs b/azure-kusto-data/src/arrow.rs new file mode 100644 index 0000000..b24e7f3 --- /dev/null +++ b/azure-kusto-data/src/arrow.rs @@ -0,0 +1,272 @@ +use crate::operations::query::*; +use arrow::{ + array::{ + ArrayRef, 
BooleanArray, DurationNanosecondArray, Float64Array, Int32Array, Int64Array,
+        StringArray,
+    },
+    compute::cast,
+    datatypes::{DataType, Field, Schema, TimeUnit},
+    record_batch::RecordBatch,
+};
+use std::sync::Arc;
+
+const SECOND_TO_NANOSECONDS: i64 = 1000000000;
+const MINUTES_TO_SECONDS: i64 = 60;
+const HOURS_TO_SECONDS: i64 = 60 * MINUTES_TO_SECONDS;
+const DAYS_TO_SECONDS: i64 = 24 * HOURS_TO_SECONDS;
+const TICK_TO_NANOSECONDS: i64 = 100;
+/// Number of fractional-second digits that make up whole ticks (10^-7 s = 100 ns).
+const TICK_FRACTION_DIGITS: usize = 7;
+
+/// Combines the components of a timespan into a nanosecond count.
+///
+/// Returns `None` if any intermediate value or the total does not fit into an `i64`.
+#[inline]
+fn to_nanoseconds(days: i64, hours: i64, minutes: i64, seconds: i64, ticks: i64) -> Option<i64> {
+    let total_secs = days
+        .checked_mul(DAYS_TO_SECONDS)?
+        .checked_add(hours.checked_mul(HOURS_TO_SECONDS)?)?
+        .checked_add(minutes.checked_mul(MINUTES_TO_SECONDS)?)?
+        .checked_add(seconds)?;
+    let rest_in_ns = ticks.checked_mul(TICK_TO_NANOSECONDS)?;
+
+    total_secs
+        .checked_mul(SECOND_TO_NANOSECONDS)?
+        .checked_add(rest_in_ns)
+}
+
+/// Parses one integer component (days, hours, minutes or seconds).
+///
+/// An empty segment counts as `0`; a segment that is not a number yields `None`.
+fn parse_segment(seg: &str) -> Option<i64> {
+    if seg.is_empty() {
+        Some(0)
+    } else {
+        seg.parse::<i64>().ok()
+    }
+}
+
+/// Converts the fractional-seconds digits of a timespan into ticks.
+///
+/// The digits are a decimal fraction, so they are right-padded to tick precision
+/// (`"5"` is 5_000_000 ticks); digits beyond tick precision are dropped.
+/// Returns `None` if the fraction contains anything but ASCII digits.
+fn parse_fraction_ticks(fraction: &str) -> Option<i64> {
+    if !fraction.bytes().all(|b| b.is_ascii_digit()) {
+        return None;
+    }
+    // Byte slicing is safe here: the string holds ASCII digits only.
+    let digits = &fraction[..fraction.len().min(TICK_FRACTION_DIGITS)];
+    let value = if digits.is_empty() {
+        0
+    } else {
+        digits.parse::<i64>().ok()?
+    };
+    // The difference is at most TICK_FRACTION_DIGITS (7), so the cast cannot truncate.
+    let missing_digits = (TICK_FRACTION_DIGITS - digits.len()) as u32;
+    Some(value * 10_i64.pow(missing_digits))
+}
+
+/// Splits an `hh:mm:ss` clock value into `(hours, minutes, seconds)`.
+///
+/// A value that does not consist of exactly three `:`-separated parts yields
+/// `(0, 0, 0)`; a part that is not a number yields `None`.
+fn destructure_time(dur: &str) -> Option<(i64, i64, i64)> {
+    let parts = dur.split(':').collect::<Vec<&str>>();
+    match parts.as_slice() {
+        [hours, minutes, seconds] => Some((
+            parse_segment(hours)?,
+            parse_segment(minutes)?,
+            parse_segment(seconds)?,
+        )),
+        _ => Some((0, 0, 0)),
+    }
+}
+
+/// Converts a Kusto timespan string into a signed number of nanoseconds.
+///
+/// The accepted shape is `[-][d.]hh:mm:ss[.fffffff]`: both the day count and the
+/// fractional seconds are optional, so `d.hh:mm:ss` and `hh:mm:ss.fffffff` both
+/// consist of two `.`-separated parts and are told apart by where the `:` is.
+/// Kusto stores fractions in ticks: 1 tick = 100 ns.
+///
+/// Returns `None` when `dur` is `None`, when a component is not numeric, or when the
+/// result does not fit into an `i64`. A string with more than three `.`-separated
+/// parts yields `Some(0)`.
+pub fn string_to_duration_i64(dur: Option<&str>) -> Option<i64> {
+    let dur = dur?;
+    let factor = if dur.starts_with('-') { -1 } else { 1 };
+    let parts: Vec<&str> = dur.trim_start_matches('-').split('.').collect();
+    let ns = match parts.as_slice() {
+        [days, time, fraction] => {
+            let (hours, minutes, seconds) = destructure_time(time)?;
+            let ticks = parse_fraction_ticks(fraction)?;
+            to_nanoseconds(parse_segment(days)?, hours, minutes, seconds, ticks)?
+        }
+        // `hh:mm:ss.fffffff`: the clock value comes first, there is no day count.
+        [time, fraction] if time.contains(':') => {
+            let (hours, minutes, seconds) = destructure_time(time)?;
+            to_nanoseconds(0, hours, minutes, seconds, parse_fraction_ticks(fraction)?)?
+        }
+        // `d.hh:mm:ss`: whole seconds, so the fraction was omitted.
+        [days, time] => {
+            let (hours, minutes, seconds) = destructure_time(time)?;
+            to_nanoseconds(parse_segment(days)?, hours, minutes, seconds, 0)?
+        }
+        [time] => {
+            let (hours, minutes, seconds) = destructure_time(time)?;
+            to_nanoseconds(0, hours, minutes, seconds, 0)?
+        }
+        _ => 0,
+    };
+    ns.checked_mul(factor)
+}
+
+/// Builds a nullable UTF-8 string array from JSON values.
+///
+/// # Panics
+///
+/// Panics if the values cannot be deserialized as optional strings.
+fn convert_array_string(values: Vec<serde_json::Value>) -> ArrayRef {
+    let strings: Vec<Option<String>> =
+        serde_json::from_value(serde_json::Value::Array(values)).unwrap();
+    let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
+    Arc::new(StringArray::from(strings))
+}
+
+/// Builds a nanosecond timestamp array by casting the JSON string values.
+///
+/// # Panics
+///
+/// Panics if the values cannot be deserialized as optional strings or if the cast fails.
+// TODO provide a safe variant for datetime conversions (chrono panics)
+fn convert_array_datetime_unsafe(values: Vec<serde_json::Value>) -> ArrayRef {
+    let strings: Vec<Option<String>> =
+        serde_json::from_value(serde_json::Value::Array(values)).unwrap();
+    let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
+    let string_array: ArrayRef = Arc::new(StringArray::from(strings));
+    cast(
+        &string_array,
+        &DataType::Timestamp(TimeUnit::Nanosecond, None),
+    )
+    .unwrap()
+}
+
+/// Maps one JSON value to an optional `f64`.
+///
+/// The strings `"Infinity"` and `"-Infinity"` become the matching infinities and
+/// `"NaN"` becomes `None`; every other value is deserialized as an optional `f64`.
+///
+/// # Panics
+///
+/// Panics if any other value cannot be deserialized as an optional `f64`.
+fn safe_map_f64(value: serde_json::Value) -> Option<f64> {
+    match value {
+        serde_json::Value::String(val) if val == "NaN" => None,
+        serde_json::Value::String(val) if val == "Infinity" => Some(f64::INFINITY),
+        serde_json::Value::String(val) if val == "-Infinity" => Some(-f64::INFINITY),
+        _ => serde_json::from_value(value).unwrap(),
+    }
+}
+
+/// Builds a nullable `f64` array from JSON values, see [`safe_map_f64`].
+fn convert_array_float(values: Vec<serde_json::Value>) -> ArrayRef {
+    let reals: Vec<Option<f64>> = values.into_iter().map(safe_map_f64).collect();
+    Arc::new(Float64Array::from(reals))
+}
+
+/// Builds a nullable nanosecond duration array from JSON timespan strings,
+/// see [`string_to_duration_i64`].
+///
+/// # Panics
+///
+/// Panics if the values cannot be deserialized as optional strings.
+fn convert_array_timespan(values: Vec<serde_json::Value>) -> ArrayRef {
+    let strings: Vec<Option<String>> =
+        serde_json::from_value(serde_json::Value::Array(values)).unwrap();
+    let durations: Vec<Option<i64>> = strings
+        .iter()
+        .map(|opt| opt.as_deref())
+        .map(string_to_duration_i64)
+        .collect();
+    Arc::new(DurationNanosecondArray::from(durations))
+}
+
+/// Builds a nullable boolean array from JSON values.
+///
+/// # Panics
+///
+/// Panics if the values cannot be deserialized as optional booleans.
+fn convert_array_bool(values: Vec<serde_json::Value>) -> ArrayRef {
+    let bools: Vec<Option<bool>> =
+        serde_json::from_value(serde_json::Value::Array(values)).unwrap();
+    Arc::new(BooleanArray::from(bools))
+}
+
+fn 
convert_array_i32(values: Vec) -> ArrayRef { + let ints: Vec> = serde_json::from_value(serde_json::Value::Array(values)).unwrap(); + Arc::new(Int32Array::from(ints)) +} + +fn convert_array_i64(values: Vec) -> ArrayRef { + let ints: Vec> = serde_json::from_value(serde_json::Value::Array(values)).unwrap(); + Arc::new(Int64Array::from(ints)) +} + +pub fn convert_column(data: Vec, column: Column) -> (Field, ArrayRef) { + match column.column_type { + ColumnType::String => ( + Field::new(column.column_name.as_str(), DataType::Utf8, true), + convert_array_string(data), + ), + ColumnType::Bool | ColumnType::Boolean => ( + Field::new(column.column_name.as_str(), DataType::Boolean, true), + convert_array_bool(data), + ), + ColumnType::Int => ( + Field::new(column.column_name.as_str(), DataType::Int32, true), + convert_array_i32(data), + ), + ColumnType::Long => ( + Field::new(column.column_name.as_str(), DataType::Int64, true), + convert_array_i64(data), + ), + ColumnType::Real => ( + Field::new(column.column_name.as_str(), DataType::Float64, true), + convert_array_float(data), + ), + ColumnType::Datetime => ( + Field::new( + column.column_name.as_str(), + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + ), + convert_array_datetime_unsafe(data), + ), + ColumnType::Timespan => ( + Field::new( + column.column_name.as_str(), + DataType::Duration(TimeUnit::Nanosecond), + true, + ), + convert_array_timespan(data), + ), + _ => todo!(), + } +} + +pub fn convert_table(table: DataTable) -> RecordBatch { + let mut buffer: Vec> = Vec::with_capacity(table.columns.len()); + let mut fields: Vec = Vec::with_capacity(table.columns.len()); + let mut columns: Vec = Vec::with_capacity(table.columns.len()); + + for _ in 0..table.columns.len() { + buffer.push(Vec::with_capacity(table.rows.len())); + } + table.rows.into_iter().for_each(|row| { + row.into_iter() + .enumerate() + .for_each(|(idx, value)| buffer[idx].push(value)) + }); + + buffer + .into_iter() + 
.zip(table.columns.into_iter()) + .map(|(data, column)| convert_column(data, column)) + .for_each(|(field, array)| { + fields.push(field); + columns.push(array); + }); + + RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_column() { + let data = r#" { + "ColumnName": "int_col", + "ColumnType": "int" + } "#; + + let c: Column = serde_json::from_str(data).expect("deserialize error"); + let ref_col = Column { + column_name: "int_col".to_string(), + column_type: ColumnType::Int, + }; + assert_eq!(c, ref_col) + } + + #[test] + fn deserialize_table() { + let data = r#" { + "FrameType": "DataTable", + "TableId": 1, + "TableName": "Deft", + "TableKind": "PrimaryResult", + "Columns": [ + { + "ColumnName": "int_col", + "ColumnType": "int" + } + ], + "Rows": [] + } "#; + + let t: DataTable = serde_json::from_str(data).expect("deserialize error"); + let ref_tbl = DataTable { + table_id: 1, + table_name: "Deft".to_string(), + table_kind: TableKind::PrimaryResult, + columns: vec![Column { + column_name: "int_col".to_string(), + column_type: ColumnType::Int, + }], + rows: vec![], + }; + assert_eq!(t, ref_tbl) + } + + #[test] + fn string_conversion() { + let refs: Vec<(&str, i64)> = vec![ + ("1.00:00:00.0000000", 86400000000000), + ("01:00:00.0000000", 3600000000000), + ("01:00:00", 3600000000000), + ("00:05:00.0000000", 300000000000), + ("00:00:00.0000001", 100), + ("-01:00:00", -3600000000000), + ("-1.00:00:00.0000000", -86400000000000), + ]; + + for (from, to) in refs { + assert_eq!(string_to_duration_i64(Some(from)), Some(to)); + } + } +} diff --git a/azure-kusto-data/src/authorization_policy.rs b/azure-kusto-data/src/authorization_policy.rs new file mode 100644 index 0000000..990fc30 --- /dev/null +++ b/azure-kusto-data/src/authorization_policy.rs @@ -0,0 +1,55 @@ +use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request}; +use http::header::AUTHORIZATION; 
+use http::HeaderValue; +use std::sync::Arc; + +#[derive(Clone)] +pub struct AuthorizationPolicy { + credential: Arc, + resource: String, +} + +impl std::fmt::Debug for AuthorizationPolicy { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("AuthorizationPolicy") + .field("credential", &"TokenCredential") + .field("resource", &self.resource) + .finish() + } +} + +impl AuthorizationPolicy { + pub(crate) fn new(credential: Arc, resource: T) -> Self + where + T: Into, + { + Self { + credential, + resource: resource.into(), + } + } +} + +#[async_trait::async_trait] +impl Policy for AuthorizationPolicy { + async fn send( + &self, + ctx: &Context, + request: &mut Request, + next: &[Arc], + ) -> PolicyResult { + assert!( + !next.is_empty(), + "Authorization policies cannot be the last policy of a pipeline" + ); + + let token = self.credential.get_token(&self.resource).await?; + let auth_header_value = format!("Bearer {}", token.token.secret().clone()); + + request + .headers_mut() + .insert(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?); + + next[0].send(ctx, request, &next[1..]).await + } +} diff --git a/azure-kusto-data/src/client.rs b/azure-kusto-data/src/client.rs new file mode 100644 index 0000000..c0544c4 --- /dev/null +++ b/azure-kusto-data/src/client.rs @@ -0,0 +1,182 @@ +use crate::authorization_policy::AuthorizationPolicy; +use crate::connection_string::{ConnectionString, ConnectionStringBuilder}; +use crate::error::Result; +use crate::operations::query::ExecuteQueryBuilder; +use azure_core::auth::TokenCredential; +use azure_core::prelude::*; +use azure_core::{ClientOptions, Context, Pipeline, Request}; +use azure_identity::token_credentials::{ + AzureCliCredential, ClientSecretCredential, DefaultAzureCredential, + ImdsManagedIdentityCredential, TokenCredentialOptions, +}; +use std::convert::TryFrom; +use std::fmt::Debug; +use std::sync::Arc; + +const API_VERSION: &str = "2019-02-13"; + +/// Options for specifying 
how a Kusto client will behave +#[derive(Clone, Default)] +pub struct KustoClientOptions { + options: ClientOptions, +} + +impl KustoClientOptions { + /// Create new options + pub fn new() -> Self { + Self::default() + } + + #[cfg(feature = "mock_transport_framework")] + /// Create new options with a given transaction name + pub fn new_with_transaction_name>(name: T) -> Self { + Self { + options: ClientOptions::new_with_transaction_name(name.into()), + } + } +} + +fn new_pipeline_from_options( + credential: Arc, + resource: &str, + options: KustoClientOptions, +) -> Pipeline { + let auth_policy = Arc::new(AuthorizationPolicy::new(credential, resource)); + // take care of adding the AuthorizationPolicy as **last** retry policy. + let per_retry_policies: Vec> = vec![auth_policy]; + + Pipeline::new( + option_env!("CARGO_PKG_NAME"), + option_env!("CARGO_PKG_VERSION"), + options.options, + Vec::new(), + per_retry_policies, + ) +} + +/// Kusto client for Rust. +/// The client is a wrapper around the Kusto REST API. +/// To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/ +/// +/// The primary methods are: +/// `execute_query`: executes a KQL query against the Kusto service. 
+#[derive(Clone, Debug)] +pub struct KustoClient { + pipeline: Pipeline, + query_url: String, + management_url: String, +} + +impl KustoClient { + pub fn new_with_options( + url: T, + credential: Arc, + options: KustoClientOptions, + ) -> Result + where + T: Into, + { + let service_url: String = url.into(); + let service_url = service_url.trim_end_matches('/'); + let query_url = format!("{}/v2/rest/query", service_url); + let management_url = format!("{}/v1/rest/mgmt", service_url); + let pipeline = new_pipeline_from_options(credential, service_url, options); + + Ok(Self { + pipeline, + query_url, + management_url, + }) + } + + pub(crate) fn query_url(&self) -> &str { + &self.query_url + } + + pub fn management_url(&self) -> &str { + &self.management_url + } + + /// Execute a KQL query. + /// To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/ + /// + /// # Arguments + /// + /// * `database` - Name of the database in scope that is the target of the query + /// * `query` - Text of the query to execute + pub fn execute_query(&self, database: DB, query: Q) -> ExecuteQueryBuilder + where + DB: Into, + Q: Into, + { + ExecuteQueryBuilder::new(self.clone(), database.into(), query.into(), Context::new()) + } + + pub(crate) fn prepare_request(&self, uri: &str, http_method: http::Method) -> Request { + let mut request = Request::new(uri.parse().unwrap(), http_method); + request.insert_headers(&Version::from(API_VERSION)); + request.insert_headers(&Accept::from("application/json")); + request.insert_headers(&ContentType::new("application/json; charset=utf-8")); + request.insert_headers(&AcceptEncoding::from("gzip")); + request.insert_headers(&ClientVersion::from(format!( + "Kusto.Rust.Client:{}", + env!("CARGO_PKG_VERSION"), + ))); + request + } + + pub(crate) fn pipeline(&self) -> &Pipeline { + &self.pipeline + } +} + +impl<'a> TryFrom> for KustoClient { + type Error = crate::error::Error; + + fn try_from(value: ConnectionString) -> Result { + 
let service_url = value + .data_source + .expect("A data source / service url must always be specified"); + + let credential: Arc = match value { + ConnectionString { + application_client_id: Some(client_id), + application_key: Some(client_secret), + authority_id: Some(tenant_id), + .. + } => Arc::new(ClientSecretCredential::new( + tenant_id.to_string(), + client_id.to_string(), + client_secret.to_string(), + TokenCredentialOptions::default(), + )), + ConnectionString { + msi_auth: Some(true), + .. + } => Arc::new(ImdsManagedIdentityCredential {}), + ConnectionString { + az_cli: Some(true), .. + } => Arc::new(AzureCliCredential {}), + _ => Arc::new(DefaultAzureCredential::default()), + }; + Self::new_with_options(service_url, credential, KustoClientOptions::new()) + } +} + +impl TryFrom for KustoClient { + type Error = crate::error::Error; + + fn try_from(value: String) -> Result { + let connection_string = ConnectionString::new(value.as_str())?; + Self::try_from(connection_string) + } +} + +impl<'a> TryFrom> for KustoClient { + type Error = crate::error::Error; + + fn try_from(value: ConnectionStringBuilder) -> Result { + let connection_string = value.build(); + Self::try_from(connection_string) + } +} diff --git a/azure-kusto-data/src/connection_string.rs b/azure-kusto-data/src/connection_string.rs new file mode 100644 index 0000000..70a73c3 --- /dev/null +++ b/azure-kusto-data/src/connection_string.rs @@ -0,0 +1,480 @@ +// Set of properties that can be use in a connection string provided to KustoConnectionStringBuilder. 
+// For a complete list of properties go to https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto + +use hashbrown::HashMap; +use lazy_static::lazy_static; + +enum ConnectionStringKey { + DataSource, + FederatedSecurity, + UserId, + Password, + ApplicationClientId, + ApplicationKey, + ApplicationCertificate, + ApplicationCertificateThumbprint, + AuthorityId, + ApplicationToken, + UserToken, + MsiAuth, + MsiParams, + AzCli, +} + +impl ConnectionStringKey { + fn to_str(&self) -> &'static str { + match self { + ConnectionStringKey::DataSource => "Data Source", + ConnectionStringKey::FederatedSecurity => "AAD Federated Security", + ConnectionStringKey::UserId => "AAD User ID", + ConnectionStringKey::Password => "Password", + ConnectionStringKey::ApplicationClientId => "Application Client Id", + ConnectionStringKey::ApplicationKey => "Application Key", + ConnectionStringKey::ApplicationCertificate => "ApplicationCertificate", + ConnectionStringKey::ApplicationCertificateThumbprint => { + "Application Certificate Thumbprint" + } + ConnectionStringKey::AuthorityId => "Authority Id", + ConnectionStringKey::ApplicationToken => "ApplicationToken", + ConnectionStringKey::UserToken => "UserToken", + ConnectionStringKey::MsiAuth => "MSI Authentication", + ConnectionStringKey::MsiParams => "MSI Params", + ConnectionStringKey::AzCli => "AZ CLI", + } + } +} + +lazy_static! 
{ + static ref ALIAS_MAP: HashMap<&'static str, ConnectionStringKey> = { + let mut m = HashMap::new(); + m.insert("data source", ConnectionStringKey::DataSource); + m.insert("addr", ConnectionStringKey::DataSource); + m.insert("address", ConnectionStringKey::DataSource); + m.insert("network address", ConnectionStringKey::DataSource); + m.insert("server", ConnectionStringKey::DataSource); + + m.insert( + "aad federated security", + ConnectionStringKey::FederatedSecurity, + ); + m.insert("federated security", ConnectionStringKey::FederatedSecurity); + m.insert("federated", ConnectionStringKey::FederatedSecurity); + m.insert("fed", ConnectionStringKey::FederatedSecurity); + m.insert("aadfed", ConnectionStringKey::FederatedSecurity); + + m.insert("aad user id", ConnectionStringKey::UserId); + m.insert("user id", ConnectionStringKey::UserId); + m.insert("uid", ConnectionStringKey::UserId); + m.insert("user", ConnectionStringKey::UserId); + + m.insert("password", ConnectionStringKey::Password); + m.insert("pwd", ConnectionStringKey::Password); + + m.insert( + "application client id", + ConnectionStringKey::ApplicationClientId, + ); + m.insert("appclientid", ConnectionStringKey::ApplicationClientId); + + m.insert("application key", ConnectionStringKey::ApplicationKey); + m.insert("appkey", ConnectionStringKey::ApplicationKey); + + m.insert( + "application certificate", + ConnectionStringKey::ApplicationCertificate, + ); + + m.insert( + "application certificate thumbprint", + ConnectionStringKey::ApplicationCertificateThumbprint, + ); + m.insert( + "appcert", + ConnectionStringKey::ApplicationCertificateThumbprint, + ); + + m.insert("authority id", ConnectionStringKey::AuthorityId); + m.insert("authorityid", ConnectionStringKey::AuthorityId); + m.insert("authority", ConnectionStringKey::AuthorityId); + m.insert("tenantid", ConnectionStringKey::AuthorityId); + m.insert("tenant", ConnectionStringKey::AuthorityId); + m.insert("tid", ConnectionStringKey::AuthorityId); + + 
m.insert("application token", ConnectionStringKey::ApplicationToken); + m.insert("apptoken", ConnectionStringKey::ApplicationToken); + + m.insert("user token", ConnectionStringKey::UserToken); + m.insert("usertoken", ConnectionStringKey::UserToken); + + m.insert("msi auth", ConnectionStringKey::MsiAuth); + m.insert("msi_auth", ConnectionStringKey::MsiAuth); + m.insert("msi", ConnectionStringKey::MsiAuth); + + m.insert("msi params", ConnectionStringKey::MsiParams); + m.insert("msi_params", ConnectionStringKey::MsiParams); + m.insert("msi_type", ConnectionStringKey::MsiParams); + + m.insert("az cli", ConnectionStringKey::AzCli); + + m + }; +} + +// TODO: when available +// pub const PUBLIC_APPLICATION_CERTIFICATE_NAME: &str = "Public Application Certificate"; +// pub const INTERACTIVE_LOGIN_NAME: &str = "Interactive Login"; +// pub const LOGIN_HINT_NAME: &str = "Login Hint"; +// pub const DOMAIN_HINT_NAME: &str = "Domain Hint"; +/* + + m.insert("application certificate private key", ConnectionStringKey::ApplicationCertificatePrivateKey); + m.insert("application certificate x5c", ConnectionStringKey::ApplicationCertificateX5C); + m.insert("application certificate send public certificate", ConnectionStringKey::ApplicationCertificateX5C); + m.insert("application certificate sendx5c", ConnectionStringKey::ApplicationCertificateX5C); + m.insert("sendx5c", ConnectionStringKey::ApplicationCertificateX5C); + ConnectionStringKey::ApplicationCertificatePrivateKey => "Application Certificate PrivateKey", + ConnectionStringKey::ApplicationCertificateX5C => "Application Certificate x5c", +*/ + +#[derive(Debug, thiserror::Error)] +pub enum ConnectionStringError { + #[error("Missing value for key '{}'", key)] + MissingValue { key: String }, + #[error("Unexpected key '{}'", key)] + UnexpectedKey { key: String }, + #[error("Parsing error: {}", msg)] + ParsingError { msg: String }, +} + +/// Build a connection string to connect to a Kusto service instance. 
+/// +/// For more information on Kusto connection strings visit: +/// https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto +#[derive(Default)] +pub struct ConnectionStringBuilder<'a>(ConnectionString<'a>); + +impl<'a> ConnectionStringBuilder<'a> { + /// Creates a ConnectionStringBuilder with no configuration options set + pub fn new() -> Self { + Self(ConnectionString::default()) + } + + /// Creates a ConnectionStringBuilder that will authenticate with AAD application and key. + /// + /// # Arguments + /// + /// * `service_url` - Kusto service url should be of the format: https://.kusto.windows.net + /// * `authority_id` - Authority id (aka Tenant id) must be provided + /// * `client_id` - AAD application ID. + /// * `client_secret` - Corresponding key of the AAD application. + pub fn new_with_aad_application_key_authentication( + service_url: &'a str, + authority_id: &'a str, + client_id: &'a str, + client_secret: &'a str, + ) -> Self { + Self(ConnectionString { + data_source: Some(service_url), + federated_security: Some(true), + application_client_id: Some(client_id), + application_key: Some(client_secret), + authority_id: Some(authority_id), + ..ConnectionString::default() + }) + } + + pub fn build(&self) -> String { + let mut kv_pairs = Vec::new(); + + if let Some(data_source) = self.0.data_source { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::DataSource.to_str(), + data_source + )); + } + if let Some(user_id) = self.0.user_id { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::UserId.to_str(), + user_id + )); + } + if let Some(application_client_id) = self.0.application_client_id { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::ApplicationClientId.to_str(), + application_client_id + )); + } + if let Some(application_key) = self.0.application_key { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::ApplicationKey.to_str(), + application_key + )); + } + if let Some(application_token) 
= self.0.application_token { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::ApplicationToken.to_str(), + application_token + )); + } + if let Some(authority_id) = self.0.authority_id { + kv_pairs.push(format!( + "{}={}", + ConnectionStringKey::AuthorityId.to_str(), + authority_id + )); + } + + kv_pairs.join(";") + } + + pub fn data_source(&'a mut self, data_source: &'a str) -> &'a mut Self { + self.0.data_source = Some(data_source); + self + } + + pub fn user_id(&'a mut self, user_id: &'a str) -> &'a mut Self { + self.0.user_id = Some(user_id); + self + } + + pub fn application_client_id(&'a mut self, application_client_id: &'a str) -> &'a mut Self { + self.0.application_client_id = Some(application_client_id); + self + } + + pub fn application_token(&'a mut self, application_token: &'a str) -> &'a mut Self { + self.0.application_token = Some(application_token); + self + } + + pub fn application_key(&'a mut self, application_key: &'a str) -> &'a mut Self { + self.0.application_key = Some(application_key); + self + } + + pub fn authority_id(&'a mut self, authority_id: &'a str) -> &'a mut Self { + self.0.authority_id = Some(authority_id); + self + } +} + +/// A Kusto service connection string. +/// +/// For more information on Kusto connection strings visit: +/// https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto +#[derive(Debug, Default)] +pub struct ConnectionString<'a> { + /// The URI specifying the Kusto service endpoint. + /// For example, https://mycluster.kusto.windows.net or net.tcp://localhost + pub data_source: Option<&'a str>, + /// A Boolean value that instructs the client to perform Azure Active Directory login. + pub federated_security: Option, + /// A String value that instructs the client to perform user authentication with the indicated user name. + pub user_id: Option<&'a str>, + pub user_token: Option<&'a str>, + /// ... 
+ pub password: Option<&'a str>, + /// A String value that provides the application client ID to use when authenticating. + pub application_client_id: Option<&'a str>, + /// A String value that provides the application key to use when authenticating using an application secret flow. + pub application_key: Option<&'a str>, + /// A String value that instructs the client to perform application authenticating with the specified bearer token. + pub application_token: Option<&'a str>, + /// ... + pub application_certificate: Option<&'a str>, + /// A String value that provides the thumbprint of the client + /// certificate to use when using an application client certificate authenticating flow. + pub application_certificate_thumbprint: Option<&'a str>, + /// A String value that provides the name or ID of the tenant in which the application is registered. + pub authority_id: Option<&'a str>, + /// Denotes if MSI authorization should be used + pub msi_auth: Option, + pub msi_params: Option<&'a str>, + pub az_cli: Option, +} + +impl<'a> PartialEq for ConnectionString<'a> { + fn eq(&self, other: &Self) -> bool { + self.data_source == other.data_source + && self.federated_security == other.federated_security + && self.user_id == other.user_id + && self.user_token == other.user_token + && self.password == other.password + && self.application_client_id == other.application_client_id + && self.application_key == other.application_key + && self.application_token == other.application_token + && self.application_certificate == other.application_certificate + && self.application_certificate_thumbprint == other.application_certificate_thumbprint + && self.authority_id == other.authority_id + && self.msi_auth == other.msi_auth + && self.msi_params == other.msi_params + && self.az_cli == other.az_cli + } +} + +impl<'a> ConnectionString<'a> { + pub fn new(connection_string: &'a str) -> Result { + let mut data_source = None; + let mut federated_security = None; + let mut user_id = None; + 
let mut user_token = None; + let mut password = None; + let mut application_client_id = None; + let mut application_token = None; + let mut application_key = None; + let mut application_certificate = None; + let mut application_certificate_thumbprint = None; + let mut authority_id = None; + let mut msi_auth = None; + let mut msi_params = None; + let mut az_cli = None; + + let kv_str_pairs = connection_string + .split(';') + .filter(|s| !s.chars().all(char::is_whitespace)); + + for kv_pair_str in kv_str_pairs { + let mut kv = kv_pair_str.trim().split('='); + let k = match kv.next().filter(|k| !k.chars().all(char::is_whitespace)) { + None => { + return Err(ConnectionStringError::ParsingError { + msg: "No key found".to_owned(), + }); + } + Some(k) => k, + }; + let v = match kv.next().filter(|k| !k.chars().all(char::is_whitespace)) { + None => return Err(ConnectionStringError::MissingValue { key: k.to_owned() }), + Some(v) => v, + }; + + if let Some(key) = ALIAS_MAP.get(&*k.to_ascii_lowercase()) { + match key { + ConnectionStringKey::DataSource => data_source = Some(v), + e @ ConnectionStringKey::FederatedSecurity => { + federated_security = Some(parse_boolean(v, e.to_str())?) + } + ConnectionStringKey::UserId => user_id = Some(v), + ConnectionStringKey::UserToken => user_token = Some(v), + ConnectionStringKey::Password => password = Some(v), + ConnectionStringKey::ApplicationClientId => application_client_id = Some(v), + ConnectionStringKey::ApplicationToken => application_token = Some(v), + ConnectionStringKey::ApplicationKey => application_key = Some(v), + ConnectionStringKey::ApplicationCertificate => { + application_certificate = Some(v) + } + ConnectionStringKey::ApplicationCertificateThumbprint => { + application_certificate_thumbprint = Some(v) + } + ConnectionStringKey::AuthorityId => authority_id = Some(v), + e @ ConnectionStringKey::MsiAuth => { + msi_auth = Some(parse_boolean(v, e.to_str())?) 
+ } + ConnectionStringKey::MsiParams => msi_params = Some(v), + e @ ConnectionStringKey::AzCli => az_cli = Some(parse_boolean(v, e.to_str())?), + } + } else { + return Err(ConnectionStringError::UnexpectedKey { key: k.to_owned() }); + } + } + + Ok(Self { + data_source, + federated_security, + user_id, + user_token, + password, + application_client_id, + application_key, + application_token, + application_certificate, + application_certificate_thumbprint, + authority_id, + msi_auth, + msi_params, + az_cli, + }) + } +} + +fn parse_boolean(term: &str, name: &str) -> Result { + match term.to_lowercase().as_str() { + "true" => Ok(true), + "false" => Ok(false), + _ => Err(ConnectionStringError::ParsingError { + msg: format!( + "Unexpected value for {}: {}. Please specify either 'true' or 'false'.", + name, term + ), + }), + } +} + +#[cfg(test)] +mod tests { + #[allow(unused_imports)] + use super::*; + + #[test] + fn it_parses_empty_connection_string() { + assert_eq!( + ConnectionString::new("").unwrap(), + ConnectionString::default() + ); + } + + #[test] + fn it_returns_expected_errors() { + assert!(matches!( + ConnectionString::new("Data Source="), + Err(ConnectionStringError::MissingValue { key }) if key == "Data Source" + )); + assert!(matches!( + ConnectionString::new("="), + Err(ConnectionStringError::ParsingError { msg: _ }) + )); + assert!(matches!( + ConnectionString::new("x=123;"), + Err(ConnectionStringError::UnexpectedKey { key }) if key == "x" + )); + } + + #[test] + fn it_parses_basic_cases() { + assert!(matches!( + ConnectionString::new("Data Source=ds"), + Ok(ConnectionString { + data_source: Some("ds"), + .. + }) + )); + assert!(matches!( + ConnectionString::new("addr=ds"), + Ok(ConnectionString { + data_source: Some("ds"), + .. + }) + )); + assert!(matches!( + ConnectionString::new("Application Client Id=cid;Application Key=key"), + Ok(ConnectionString { + application_client_id: Some("cid"), + application_key: Some("key"), + .. 
+ }) + )); + assert!(matches!( + ConnectionString::new("Federated=True;AppToken=token"), + Ok(ConnectionString { + federated_security: Some(true), + application_token: Some("token"), + .. + }) + )); + } +} diff --git a/azure-kusto-data/src/error.rs b/azure-kusto-data/src/error.rs new file mode 100644 index 0000000..6da4fb2 --- /dev/null +++ b/azure-kusto-data/src/error.rs @@ -0,0 +1,47 @@ +//! Defines `KustoRsError` for representing failures in various operations. +use std::fmt::Debug; +use thiserror; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Error converting Kusto response for {0}")] + ConversionError(String), + + /// Error in external crate + #[error("Error in external crate {0}")] + ExternalError(String), + + /// Error raised when an invalid argument / option is provided. + #[error("Type conversion not available")] + InvalidArgumentError(String), + + /// Error raised when specific functionality is not (yet) implemented + #[error("Feature not implemented")] + NotImplemented(String), + + /// Error relating to (de-)serialization of JSON data + #[error(transparent)] + JsonError(#[from] serde_json::Error), + + /// Error occurring within core azure crates + #[error(transparent)] + AzureError(#[from] azure_core::Error), + + /// Errors raised when parsing connection information + #[error("Configuration error: {0}")] + ConfigurationError(#[from] crate::connection_string::ConnectionStringError), +} + +pub type Result = std::result::Result; + +impl From for Error { + fn from(err: azure_core::error::Error) -> Self { + Self::AzureError(err.into()) + } +} + +impl From for Error { + fn from(error: azure_core::StreamError) -> Self { + Self::AzureError(azure_core::Error::Stream(error)) + } +} diff --git a/azure-kusto-data/src/lib.rs b/azure-kusto-data/src/lib.rs index 15799d8..883cabe 100644 --- a/azure-kusto-data/src/lib.rs +++ b/azure-kusto-data/src/lib.rs @@ -1,14 +1,8 @@ -fn nothing() -> u32 { - return 42; -} - -#[cfg(test)] -mod tests { - use 
crate::nothing; - - #[test] - fn it_works() { - let result = 2 + nothing(); - assert_eq!(result, 44); - } -} +#[cfg(feature = "arrow")] +mod arrow; +pub mod authorization_policy; +pub mod client; +pub mod connection_string; +pub mod error; +mod operations; +pub mod prelude; diff --git a/azure-kusto-data/src/operations/mod.rs b/azure-kusto-data/src/operations/mod.rs new file mode 100644 index 0000000..67350db --- /dev/null +++ b/azure-kusto-data/src/operations/mod.rs @@ -0,0 +1 @@ +pub mod query; diff --git a/azure-kusto-data/src/operations/query.rs b/azure-kusto-data/src/operations/query.rs new file mode 100644 index 0000000..2a75db1 --- /dev/null +++ b/azure-kusto-data/src/operations/query.rs @@ -0,0 +1,216 @@ +#[cfg(feature = "arrow")] +use crate::arrow::convert_table; +use crate::client::KustoClient; +#[cfg(feature = "arrow")] +use arrow::record_batch::RecordBatch; +use async_convert::TryFrom; +use azure_core::prelude::*; +use azure_core::setters; +use azure_core::{collect_pinned_stream, Response as HttpResponse}; +use futures::future::BoxFuture; +use serde::{Deserialize, Serialize}; + +type ExecuteQuery = BoxFuture<'static, crate::error::Result>; + +#[derive(Debug, Serialize, Deserialize)] +struct QueryBody { + /// Name of the database in scope that is the target of the query or control command + db: String, + /// Text of the query or control command to execute + csl: String, +} + +#[derive(Debug, Clone)] +pub struct ExecuteQueryBuilder { + client: KustoClient, + database: String, + query: String, + client_request_id: Option, + app: Option, + user: Option, + context: Context, +} + +impl ExecuteQueryBuilder { + pub(crate) fn new( + client: KustoClient, + database: String, + query: String, + context: Context, + ) -> Self { + Self { + client, + database, + query: query.trim().into(), + client_request_id: None, + app: None, + user: None, + context, + } + } + + setters! 
{ + client_request_id: ClientRequestId => Some(client_request_id), + app: App => Some(app), + user: User => Some(user), + query: String => query, + database: String => database, + context: Context => context, + } + + pub fn into_future(self) -> ExecuteQuery { + let this = self.clone(); + let ctx = self.context.clone(); + + Box::pin(async move { + let url = this.client.query_url(); + let mut request = this.client.prepare_request(url, http::Method::POST); + + if let Some(request_id) = &this.client_request_id { + request.insert_headers(request_id); + }; + if let Some(app) = &this.app { + request.insert_headers(app); + }; + if let Some(user) = &this.user { + request.insert_headers(user); + }; + + let body = QueryBody { + db: this.database, + csl: this.query, + }; + let bytes = bytes::Bytes::from(serde_json::to_string(&body)?); + request.insert_headers(&ContentLength::new(bytes.len() as i32)); + request.set_body(bytes.into()); + + let response = self + .client + .pipeline() + .send(&mut ctx.clone(), &mut request) + .await?; + + >::try_from(response).await + }) + } +} + +// TODO enable once in stable +// #[cfg(feature = "into_future")] +// impl std::future::IntoFuture for ExecuteQueryBuilder { +// type IntoFuture = ExecuteQuery; +// type Output = ::Output; +// fn into_future(self) -> Self::IntoFuture { +// Self::into_future(self) +// } +// } + +#[derive(Debug, Clone)] +pub struct KustoResponseDataSetV2 { + pub tables: Vec, +} + +#[async_convert::async_trait] +impl async_convert::TryFrom for KustoResponseDataSetV2 { + type Error = crate::error::Error; + + async fn try_from(response: HttpResponse) -> Result { + let (_status_code, _header_map, pinned_stream) = response.deconstruct(); + let data = collect_pinned_stream(pinned_stream).await?; + let tables: Vec = serde_json::from_slice(&data.to_vec())?; + Ok(Self { tables }) + } +} + +impl KustoResponseDataSetV2 { + pub fn table_count(&self) -> usize { + self.tables.len() + } + + /// Consumes the response into an iterator over 
all PrimaryResult tables within the response dataset + pub fn into_primary_results(self) -> impl Iterator { + self.tables + .into_iter() + .filter(|t| matches!(t, ResultTable::DataTable(tbl) if tbl.table_kind == TableKind::PrimaryResult)) + .map(|t| match t { + ResultTable::DataTable(tbl) => tbl, + _ => unreachable!("All other variants are excluded by filter"), + }) + } + + #[cfg(feature = "arrow")] + pub fn into_record_batches(self) -> impl Iterator { + self.into_primary_results().map(convert_table) + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "PascalCase", tag = "FrameType")] +#[allow(clippy::enum_variant_names)] +pub enum ResultTable { + DataSetHeader(DataSetHeader), + DataTable(DataTable), + DataSetCompletion(DataSetCompletion), +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct DataSetHeader { + pub is_progressive: bool, + pub version: String, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "PascalCase")] +pub struct DataTable { + pub table_id: i32, + pub table_name: String, + pub table_kind: TableKind, + pub columns: Vec, + pub rows: Vec>, +} + +/// Categorizes data tables according to the role they play in the data set that a Kusto query returns. 
/// Data type of a column in a result table, as carried by `Column::column_type`.
///
/// Variants are (de)serialized using their snake_case names (for example
/// `"bool"`, `"datetime"`, `"timespan"`), as set by the `rename_all` attribute.
// NOTE(review): `Bool`/`Boolean`, `Datetime`/`Date` and `Timespan`/`Time` look
// like pairs of alternative spellings for the same Kusto type - confirm
// against the Kusto scalar data types documentation.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ColumnType {
    Bool,
    Boolean,
    Datetime,
    Date,
    Dynamic,
    Guid,
    Int,
    Long,
    Real,
    String,
    Timespan,
    Time,
    Decimal,
}
/// Deserializes the `tests/inputs/dataframe.json` fixture into result frames
/// and converts its primary results into record batches.
///
/// The fixture holds three data tables, only one of which (`"temp"`) has
/// `TableKind` `PrimaryResult`, so exactly one record batch is expected.
#[test]
fn asd() {
    let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    path.push("tests/inputs/dataframe.json");

    let data = std::fs::read_to_string(path).expect("fixture file should be readable");
    let tables: Vec<ResultTable> =
        serde_json::from_str(&data).expect("fixture should deserialize into result frames");
    let response = KustoResponseDataSetV2 { tables };
    let record_batches = response.into_record_batches().collect::<Vec<_>>();

    // Previously the result was only printed, so the test could not fail on
    // a wrong number of converted tables.
    assert_eq!(
        record_batches.len(),
        1,
        "expected one record batch for the single PrimaryResult table, got {:?}",
        record_batches
    );
}
"ColumnName": "SeverityName", + "DataType": "String" + }, + { + "ColumnName": "StatusCode", + "DataType": "Int32" + }, + { + "ColumnName": "StatusDescription", + "DataType": "String" + }, + { + "ColumnName": "Count", + "DataType": "Int32" + }, + { + "ColumnName": "RequestId", + "DataType": "Guid" + }, + { + "ColumnName": "ActivityId", + "DataType": "Guid" + }, + { + "ColumnName": "SubActivityId", + "DataType": "Guid" + }, + { + "ColumnName": "ClientActivityId", + "DataType": "String" + } + ], + "Rows": [ + [ + "2018-08-12T09:13:19.5200972Z", + 4, + "Info", + 0, + "Querycompletedsuccessfully", + 1, + "b6651693-9325-41c8-a5bf-dcc21202cdf2", + "b6651693-9325-41c8-a5bf-dcc21202cdf2", + "1cfa59c2-7f29-4e58-aef8-590d4609818f", + "KPC.execute;721add30-f61a-44d0-ad89-812d06736260" + ], + [ + "2018-08-12T09:13:19.5200972Z", + 6, + "Stats", + 0, + { + "ExecutionTime": 0.0, + "resource_usage": { + "cache": { + "memory": { + "hits": 0, + "misses": 0, + "total": 0 + }, + "disk": { + "hits": 0, + "misses": 0, + "total": 0 + } + }, + "cpu": { + "user": "00:00:00", + "kernel": "00:00:00", + "total cpu": "00:00:00" + }, + "memory": { + "peak_per_node": 0 + } + }, + "input_dataset_statistics": { + "extents": { + "total": 0, + "scanned": 0 + }, + "rows": { + "total": 0, + "scanned": 0 + } + }, + "dataset_statistics": [ + { + "table_row_count": 2, + "table_size": 46 + } + ] + }, + 1, + "b6651693-9325-41c8-a5bf-dcc21202cdf2", + "b6651693-9325-41c8-a5bf-dcc21202cdf2", + "1cfa59c2-7f29-4e58-aef8-590d4609818f", + "KPC.execute;721add30-f61a-44d0-ad89-812d06736260" + ] + ] + }, + { + "TableName": "Table_3", + "Columns": [ + { + "ColumnName": "Ordinal", + "DataType": "Int64" + }, + { + "ColumnName": "Kind", + "DataType": "String" + }, + { + "ColumnName": "Name", + "DataType": "String" + }, + { + "ColumnName": "Id", + "DataType": "String" + }, + { + "ColumnName": "PrettyName", + "DataType": "String" + } + ], + "Rows": [ + [ + 0, + "QueryResult", + "PrimaryResult", + 
"d6331ef2-d9f7-4d8c-8268-99f574babc82", + "" + ], + [ + 1, + "QueryProperties", + "@ExtendedProperties", + "876ccb1a-818e-431f-9147-6b72547cca3d", + "" + ], + [ + 2, + "QueryStatus", + "QueryStatus", + "00000000-0000-0000-0000-000000000000", + "" + ] + ] + } + ] +} diff --git a/azure-kusto-data/tests/inputs/dataframe.json b/azure-kusto-data/tests/inputs/dataframe.json new file mode 100644 index 0000000..b4a1fb7 --- /dev/null +++ b/azure-kusto-data/tests/inputs/dataframe.json @@ -0,0 +1,216 @@ +[ + { + "FrameType": "DataSetHeader", + "IsProgressive": false, + "Version": "v2.0" + }, + { + "FrameType": "DataTable", + "TableId": 0, + "TableName": "@ExtendedProperties", + "TableKind": "QueryProperties", + "Columns": [ + { + "ColumnName": "TableId", + "ColumnType": "int" + }, + { + "ColumnName": "Key", + "ColumnType": "string" + }, + { + "ColumnName": "Value", + "ColumnType": "dynamic" + } + ], + "Rows": [ + [ + 1, + "Visualization", + "{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null}" + ] + ] + }, + { + "FrameType": "DataTable", + "TableId": 1, + "TableName": "temp", + "TableKind": "PrimaryResult", + "Columns": [ + { + "ColumnName": "RecordName", + "ColumnType": "string" + }, + { + "ColumnName": "RecordTime", + "ColumnType": "datetime" + }, + { + "ColumnName": "RecordOffset", + "ColumnType": "timespan" + }, + { + "ColumnName": "RecordBool", + "ColumnType": "bool" + }, + { + "ColumnName": "RecordInt", + "ColumnType": "int" + }, + { + "ColumnName": "RecordReal", + "ColumnType": "real" + } + ], + "Rows": [ + [ + "now", + "2021-12-22T11:43:00Z", + "1.01:01:01.0", + true, + 5678, + 3.14159 + ], + [ + "earliest datetime", + "1677-09-21T00:12:44Z", + "1.01:01:01.0", + true, + 5678, + "NaN" + ], + [ + "latest datetime", + "2021-12-31T23:59:59Z", + "1.01:01:01.0", + true, + 5678, + 
"Infinity" + ], + [ + "earliest arrow datetime", + "1677-09-21T00:12:44Z", + "1.01:01:01.0", + true, + 5678, + "-Infinity" + ], + [ + "latest pandas datetime", + "2262-04-11T23:47:16Z", + "1.01:01:01.0", + true, + 5678, + 3.14159 + ], + [ + "timedelta ticks", + "2021-12-22T11:43:00Z", + "1.01:01:01.0", + true, + 5678, + 3.14159 + ], + [ + "timedelta string", + "2021-12-22T11:43:00Z", + "1.01:01:01.0", + true, + 5678, + 3.14159 + ], + [null, "", "", false, 0, 0] + ] + }, + { + "FrameType": "DataTable", + "TableId": 2, + "TableName": "QueryCompletionInformation", + "TableKind": "QueryCompletionInformation", + "Columns": [ + { + "ColumnName": "Timestamp", + "ColumnType": "datetime" + }, + { + "ColumnName": "ClientRequestId", + "ColumnType": "string" + }, + { + "ColumnName": "ActivityId", + "ColumnType": "guid" + }, + { + "ColumnName": "SubActivityId", + "ColumnType": "guid" + }, + { + "ColumnName": "ParentActivityId", + "ColumnType": "guid" + }, + { + "ColumnName": "Level", + "ColumnType": "int" + }, + { + "ColumnName": "LevelName", + "ColumnType": "string" + }, + { + "ColumnName": "StatusCode", + "ColumnType": "int" + }, + { + "ColumnName": "StatusCodeName", + "ColumnType": "string" + }, + { + "ColumnName": "EventType", + "ColumnType": "int" + }, + { + "ColumnName": "EventTypeName", + "ColumnType": "string" + }, + { + "ColumnName": "Payload", + "ColumnType": "string" + } + ], + "Rows": [ + [ + "2018-05-01T09:32:38.916566Z", + "unspecified;e8e72755-786b-4bdc-835d-ea49d63d09fd", + "5935a050-e466-48a0-991d-0ec26bd61c7e", + "8182b177-7a80-4158-aca8-ff4fd8e7d3f8", + "6f3c1072-2739-461c-8aa7-3cfc8ff528a8", + 4, + "Info", + 0, + "S_OK (0)", + 4, + "QueryInfo", + "{\"Count\":1,\"Text\":\"Querycompletedsuccessfully\"}" + ], + [ + "2018-05-01T09:32:38.916566Z", + "unspecified;e8e72755-786b-4bdc-835d-ea49d63d09fd", + "5935a050-e466-48a0-991d-0ec26bd61c7e", + "8182b177-7a80-4158-aca8-ff4fd8e7d3f8", + "6f3c1072-2739-461c-8aa7-3cfc8ff528a8", + 6, + "Stats", + 0, + "S_OK (0)", + 0, 
+ "QueryResourceConsumption", + "{\"ExecutionTime\":0.0156222,\"resource_usage\":{\"cache\":{\"memory\":{\"hits\":13,\"misses\":0,\"total\":13},\"disk\":{\"hits\":0,\"misses\":0,\"total\":0}},\"cpu\":{\"user\":\"00: 00: 00\",\"kernel\":\"00: 00: 00\",\"totalcpu\":\"00: 00: 00\"},\"memory\":{\"peak_per_node\":16777312}},\"dataset_statistics\":[{\"table_row_count\":3,\"table_size\":191}]}" + ] + ] + }, + { + "FrameType": "DataSetCompletion", + "HasErrors": false, + "Cancelled": false + } +] diff --git a/codecov.yml b/codecov.yml index e69de29..bfdc987 100644 --- a/codecov.yml +++ b/codecov.yml @@ -0,0 +1,8 @@ +coverage: + status: + project: + default: + informational: true + patch: + default: + informational: true