diff --git a/azure-kusto-data/Cargo.toml b/azure-kusto-data/Cargo.toml index a34cc4d..68a461f 100644 --- a/azure-kusto-data/Cargo.toml +++ b/azure-kusto-data/Cargo.toml @@ -12,25 +12,24 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"] categories = ["api-bindings"] [dependencies] -arrow = { version = "13", optional = true } -azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51", features = [ +arrow = { version = "15.0.0", optional = true } +azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d", features = [ "enable_reqwest", "enable_reqwest_gzip", ] } -azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51" } -async-trait = "0.1" -async-convert = "1" -bytes = "1" -futures = "0.3" -http = "0.2" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1" -serde_with = { version = "1.12.0", features = ["json"] } -thiserror = "1" -lazy_static = "1.4.0" -hashbrown = "0.12.0" -regex = "1.5.5" -time = { version = "0.3.9", features = [ +azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d" } +async-trait = "0.1.56" +async-convert = "1.0.0" +bytes = "1.1.0" +futures = "0.3.21" +http = "0.2.8" +serde = { version = "1.0.137", features = ["derive"] } +serde_json = "1.0.81" +serde_with = { version = "1.12.1", features = ["json"] } +thiserror = "1.0.31" +hashbrown = "0.12.1" +regex = "1.5.6" +time = { version = "0.3.11", features = [ "serde", "parsing", "formatting", @@ -38,14 +37,15 @@ time = { version = "0.3.9", features = [ "serde-well-known", ] } derive_builder = "0.11.2" +once_cell = "1.12.0" [dev-dependencies] -arrow = { version = "13", features = ["prettyprint"] } -dotenv = "*" +arrow = { version = "15.0.0", features = ["prettyprint"] } +dotenv = "0.15.0" env_logger = "0.9" -tokio = { version = "1", features = ["macros"] } -chrono = "*" -oauth2 = "*" +tokio = { version = "1.19.2", features = ["macros"] } +chrono = "0.4.19" +oauth2 = "4.2.0" [features] default = ["arrow"] diff --git a/azure-kusto-data/examples/management.rs b/azure-kusto-data/examples/management.rs index 7fc470d..fcab846 100644 --- a/azure-kusto-data/examples/management.rs +++ b/azure-kusto-data/examples/management.rs @@ -29,13 +29,13 @@ async fn main() -> Result<(), Box> { &client_secret, ); - let client = KustoClient::try_from(kcsb).unwrap(); + let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client"); let response = client .execute_command(database, query) .into_future() .await - .unwrap(); + .expect("Failed to execute query"); println!("command response: {:?}", response); diff --git a/azure-kusto-data/examples/query.rs b/azure-kusto-data/examples/query.rs index 38d7728..d9587a2 100644 --- a/azure-kusto-data/examples/query.rs +++ b/azure-kusto-data/examples/query.rs @@ -29,13 +29,13 @@ async fn main() -> Result<(), Box> { &client_secret, ); - let client = KustoClient::try_from(kcsb).unwrap(); + let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client"); let response = client .execute_query(database, query) .into_future() .await - .unwrap(); + .expect("Failed to execute query"); for table in &response.tables { match table { diff --git a/azure-kusto-data/src/arrow.rs b/azure-kusto-data/src/arrow.rs index 36d1206..81b76d9 100644 --- a/azure-kusto-data/src/arrow.rs +++ b/azure-kusto-data/src/arrow.rs @@ -16,12 +16,12 @@ use azure_core::error::{ErrorKind, ResultExt}; use crate::error::Result; use crate::models::ColumnType; -use crate::models::*; +use crate::models::{Column, DataTable}; use crate::types::{KustoDateTime, KustoDuration}; fn convert_array_string(values: Vec) -> Result { let strings: Vec> = serde_json::from_value(serde_json::Value::Array(values))?; - let strings: Vec> = strings.iter().map(|opt| opt.as_deref()).collect(); + let strings: Vec> = strings.iter().map(Option::as_deref).collect(); Ok(Arc::new(StringArray::from(strings))) } @@ -85,42 +85,23 @@ fn convert_array_i64(values: Vec) -> Result { Ok(Arc::new(Int64Array::from(ints))) } -pub fn convert_column(data: Vec, column: Column) -> Result<(Field, ArrayRef)> { +pub fn convert_column(data: Vec, column: &Column) -> Result<(Field, ArrayRef)> { + let column_name = &column.column_name; match column.column_type { - ColumnType::String => convert_array_string(data).map(|data| { - ( - Field::new(column.column_name.as_str(), DataType::Utf8, true), - data, - ) - }), - ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data).map(|data| { - ( - Field::new(column.column_name.as_str(), DataType::Boolean, true), - data, - ) - }), - ColumnType::Int => convert_array_i32(data).map(|data| { - ( - Field::new(column.column_name.as_str(), DataType::Int32, true), - data, - ) - }), - ColumnType::Long => convert_array_i64(data).map(|data| { - ( - Field::new(column.column_name.as_str(), DataType::Int64, true), - data, - ) - }), - ColumnType::Real => convert_array_float(data).map(|data| { - ( - Field::new(column.column_name.as_str(), DataType::Float64, true), - data, - ) - }), + ColumnType::String => convert_array_string(data) + .map(|data| (Field::new(column_name, DataType::Utf8, true), data)), + ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data) + .map(|data| (Field::new(column_name, DataType::Boolean, true), data)), + ColumnType::Int => convert_array_i32(data) + .map(|data| (Field::new(column_name, DataType::Int32, true), data)), + ColumnType::Long => convert_array_i64(data) + .map(|data| (Field::new(column_name, DataType::Int64, true), data)), + ColumnType::Real => convert_array_float(data) + .map(|data| (Field::new(column_name, DataType::Float64, true), data)), ColumnType::Datetime => convert_array_datetime(data).map(|data| { ( Field::new( - column.column_name.as_str(), + column_name, DataType::Timestamp(TimeUnit::Nanosecond, None), true, ), @@ -129,11 +110,7 @@ pub fn convert_column(data: Vec, column: Column) -> Result<(F }), ColumnType::Timespan => convert_array_timespan(data).map(|data| { ( - Field::new( - column.column_name.as_str(), - DataType::Duration(TimeUnit::Nanosecond), - true, - ), + Field::new(column_name, DataType::Duration(TimeUnit::Nanosecond), true), data, ) }), @@ -158,7 +135,7 @@ pub fn convert_table(table: DataTable) -> Result { buffer .into_iter() .zip(table.columns.into_iter()) - .map(|(data, column)| convert_column(data, column)) + .map(|(data, column)| convert_column(data, &column)) .try_for_each::<_, Result<()>>(|result| { let (field, data) = result?; fields.push(field); @@ -173,6 +150,7 @@ pub fn convert_table(table: DataTable) -> Result { #[cfg(test)] mod tests { use super::*; + use crate::models::TableKind; use crate::operations::query::{KustoResponseDataSetV2, ResultTable}; use std::path::PathBuf; @@ -188,7 +166,7 @@ mod tests { column_name: "int_col".to_string(), column_type: ColumnType::Int, }; - assert_eq!(c, ref_col) + assert_eq!(c, ref_col); } #[test] @@ -218,7 +196,7 @@ mod tests { }], rows: vec![], }; - assert_eq!(t, ref_tbl) + assert_eq!(t, ref_tbl); } #[test] @@ -226,15 +204,16 @@ mod tests { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push("tests/inputs/dataframe.json"); - let data = std::fs::read_to_string(path).unwrap(); - let tables: Vec = serde_json::from_str(&data).unwrap(); + let data = std::fs::read_to_string(path).expect("Failed to read file"); + let tables: Vec = + serde_json::from_str(&data).expect("Failed to deserialize result table"); let response = KustoResponseDataSetV2 { tables }; let record_batches = response .into_record_batches() .collect::, _>>() - .unwrap(); + .expect("Failed to convert to record batches"); assert!(record_batches[0].num_columns() > 0); - assert!(record_batches[0].num_rows() > 0) + assert!(record_batches[0].num_rows() > 0); } } diff --git a/azure-kusto-data/src/authorization_policy.rs b/azure-kusto-data/src/authorization_policy.rs index 990fc30..6a01477 100644 --- a/azure-kusto-data/src/authorization_policy.rs +++ b/azure-kusto-data/src/authorization_policy.rs @@ -1,6 +1,5 @@ +use azure_core::headers::{HeaderValue, AUTHORIZATION}; use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request}; -use http::header::AUTHORIZATION; -use http::HeaderValue; use std::sync::Arc; #[derive(Clone)] @@ -44,11 +43,9 @@ impl Policy for AuthorizationPolicy { ); let token = self.credential.get_token(&self.resource).await?; - let auth_header_value = format!("Bearer {}", token.token.secret().clone()); + let auth_header_value = format!("Bearer {}", token.token.secret()); - request - .headers_mut() - .insert(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?); + request.insert_header(AUTHORIZATION, HeaderValue::from(auth_header_value)); next[0].send(ctx, request, &next[1..]).await } diff --git a/azure-kusto-data/src/client.rs b/azure-kusto-data/src/client.rs index 55d12ac..5c596e6 100644 --- a/azure-kusto-data/src/client.rs +++ b/azure-kusto-data/src/client.rs @@ -3,19 +3,17 @@ use crate::connection_string::{ConnectionString, ConnectionStringBuilder}; use crate::error::Result; use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner}; use azure_core::auth::TokenCredential; -use azure_core::prelude::*; -use azure_core::{ClientOptions, Context, Pipeline, Request}; -use azure_identity::token_credentials::{ + +use azure_core::{ClientOptions, Context, Pipeline}; +use azure_identity::{ AzureCliCredential, ClientSecretCredential, DefaultAzureCredential, ImdsManagedIdentityCredential, TokenCredentialOptions, }; -use http::Uri; + use std::convert::TryFrom; use std::fmt::Debug; use std::sync::Arc; -const API_VERSION: &str = "2019-02-13"; - /// Options for specifying how a Kusto client will behave #[derive(Clone, Default)] pub struct KustoClientOptions { @@ -24,6 +22,7 @@ pub struct KustoClientOptions { impl KustoClientOptions { /// Create new options + #[must_use] pub fn new() -> Self { Self::default() } @@ -57,7 +56,7 @@ fn new_pipeline_from_options( /// Kusto client for Rust. /// The client is a wrapper around the Kusto REST API. -/// To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/ +/// To read more about it, go to [https://docs.microsoft.com/en-us/azure/kusto/api/rest/](https://docs.microsoft.com/en-us/azure/kusto/api/rest/) /// /// The primary methods are: /// `execute_query`: executes a KQL query against the Kusto service. @@ -116,11 +115,11 @@ impl KustoClient { .with_query(query.into()) .with_context(Context::new()) .build() - .unwrap() + .expect("Unexpected error when building query runner - please report this issue to the Kusto team") } /// Execute a KQL query. - /// To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/ + /// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query) /// /// # Arguments /// @@ -142,20 +141,7 @@ impl KustoClient { V1QueryRunner(self.execute(database, query, QueryKind::Management)) } - pub(crate) fn prepare_request(&self, uri: Uri, http_method: http::Method) -> Request { - let mut request = Request::new(uri, http_method); - request.insert_headers(&Version::from(API_VERSION)); - request.insert_headers(&Accept::from("application/json")); - request.insert_headers(&ContentType::new("application/json; charset=utf-8")); - request.insert_headers(&AcceptEncoding::from("gzip")); - request.insert_headers(&ClientVersion::from(format!( - "Kusto.Rust.Client:{}", - env!("CARGO_PKG_VERSION"), - ))); - request - } - - pub(crate) fn pipeline(&self) -> &Pipeline { + pub(crate) const fn pipeline(&self) -> &Pipeline { &self.pipeline } } @@ -183,7 +169,7 @@ impl<'a> TryFrom> for KustoClient { ConnectionString { msi_auth: Some(true), .. - } => Arc::new(ImdsManagedIdentityCredential {}), + } => Arc::new(ImdsManagedIdentityCredential::default()), ConnectionString { az_cli: Some(true), .. } => Arc::new(AzureCliCredential {}), diff --git a/azure-kusto-data/src/connection_string.rs b/azure-kusto-data/src/connection_string.rs index 70a73c3..957f487 100644 --- a/azure-kusto-data/src/connection_string.rs +++ b/azure-kusto-data/src/connection_string.rs @@ -1,8 +1,9 @@ // Set of properties that can be use in a connection string provided to KustoConnectionStringBuilder. // For a complete list of properties go to https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto +use crate::error::ConnectionStringError; use hashbrown::HashMap; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; enum ConnectionStringKey { DataSource, @@ -22,7 +23,7 @@ enum ConnectionStringKey { } impl ConnectionStringKey { - fn to_str(&self) -> &'static str { + const fn to_str(&self) -> &'static str { match self { ConnectionStringKey::DataSource => "Data Source", ConnectionStringKey::FederatedSecurity => "AAD Federated Security", @@ -44,81 +45,79 @@ impl ConnectionStringKey { } } -lazy_static! { - static ref ALIAS_MAP: HashMap<&'static str, ConnectionStringKey> = { - let mut m = HashMap::new(); - m.insert("data source", ConnectionStringKey::DataSource); - m.insert("addr", ConnectionStringKey::DataSource); - m.insert("address", ConnectionStringKey::DataSource); - m.insert("network address", ConnectionStringKey::DataSource); - m.insert("server", ConnectionStringKey::DataSource); +static ALIAS_MAP: Lazy> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert("data source", ConnectionStringKey::DataSource); + m.insert("addr", ConnectionStringKey::DataSource); + m.insert("address", ConnectionStringKey::DataSource); + m.insert("network address", ConnectionStringKey::DataSource); + m.insert("server", ConnectionStringKey::DataSource); - m.insert( - "aad federated security", - ConnectionStringKey::FederatedSecurity, - ); - m.insert("federated security", ConnectionStringKey::FederatedSecurity); - m.insert("federated", ConnectionStringKey::FederatedSecurity); - m.insert("fed", ConnectionStringKey::FederatedSecurity); - m.insert("aadfed", ConnectionStringKey::FederatedSecurity); + m.insert( + "aad federated security", + ConnectionStringKey::FederatedSecurity, + ); + m.insert("federated security", ConnectionStringKey::FederatedSecurity); + m.insert("federated", ConnectionStringKey::FederatedSecurity); + m.insert("fed", ConnectionStringKey::FederatedSecurity); + m.insert("aadfed", ConnectionStringKey::FederatedSecurity); - m.insert("aad user id", ConnectionStringKey::UserId); - m.insert("user id", ConnectionStringKey::UserId); - m.insert("uid", ConnectionStringKey::UserId); - m.insert("user", ConnectionStringKey::UserId); + m.insert("aad user id", ConnectionStringKey::UserId); + m.insert("user id", ConnectionStringKey::UserId); + m.insert("uid", ConnectionStringKey::UserId); + m.insert("user", ConnectionStringKey::UserId); - m.insert("password", ConnectionStringKey::Password); - m.insert("pwd", ConnectionStringKey::Password); + m.insert("password", ConnectionStringKey::Password); + m.insert("pwd", ConnectionStringKey::Password); - m.insert( - "application client id", - ConnectionStringKey::ApplicationClientId, - ); - m.insert("appclientid", ConnectionStringKey::ApplicationClientId); + m.insert( + "application client id", + ConnectionStringKey::ApplicationClientId, + ); + m.insert("appclientid", ConnectionStringKey::ApplicationClientId); - m.insert("application key", ConnectionStringKey::ApplicationKey); - m.insert("appkey", ConnectionStringKey::ApplicationKey); + m.insert("application key", ConnectionStringKey::ApplicationKey); + m.insert("appkey", ConnectionStringKey::ApplicationKey); - m.insert( - "application certificate", - ConnectionStringKey::ApplicationCertificate, - ); + m.insert( + "application certificate", + ConnectionStringKey::ApplicationCertificate, + ); - m.insert( - "application certificate thumbprint", - ConnectionStringKey::ApplicationCertificateThumbprint, - ); - m.insert( - "appcert", - ConnectionStringKey::ApplicationCertificateThumbprint, - ); + m.insert( + "application certificate thumbprint", + ConnectionStringKey::ApplicationCertificateThumbprint, + ); + m.insert( + "appcert", + ConnectionStringKey::ApplicationCertificateThumbprint, + ); - m.insert("authority id", ConnectionStringKey::AuthorityId); - m.insert("authorityid", ConnectionStringKey::AuthorityId); - m.insert("authority", ConnectionStringKey::AuthorityId); - m.insert("tenantid", ConnectionStringKey::AuthorityId); - m.insert("tenant", ConnectionStringKey::AuthorityId); - m.insert("tid", ConnectionStringKey::AuthorityId); + m.insert("authority id", ConnectionStringKey::AuthorityId); + m.insert("authorityid", ConnectionStringKey::AuthorityId); + m.insert("authority", ConnectionStringKey::AuthorityId); + m.insert("tenantid", ConnectionStringKey::AuthorityId); + m.insert("tenant", ConnectionStringKey::AuthorityId); + m.insert("tid", ConnectionStringKey::AuthorityId); - m.insert("application token", ConnectionStringKey::ApplicationToken); - m.insert("apptoken", ConnectionStringKey::ApplicationToken); + m.insert("application token", ConnectionStringKey::ApplicationToken); + m.insert("apptoken", ConnectionStringKey::ApplicationToken); - m.insert("user token", ConnectionStringKey::UserToken); - m.insert("usertoken", ConnectionStringKey::UserToken); + m.insert("user token", ConnectionStringKey::UserToken); + m.insert("usertoken", ConnectionStringKey::UserToken); - m.insert("msi auth", ConnectionStringKey::MsiAuth); - m.insert("msi_auth", ConnectionStringKey::MsiAuth); - m.insert("msi", ConnectionStringKey::MsiAuth); + m.insert("msi auth", ConnectionStringKey::MsiAuth); + m.insert("msi_auth", ConnectionStringKey::MsiAuth); + m.insert("msi", ConnectionStringKey::MsiAuth); - m.insert("msi params", ConnectionStringKey::MsiParams); - m.insert("msi_params", ConnectionStringKey::MsiParams); - m.insert("msi_type", ConnectionStringKey::MsiParams); + m.insert("msi params", ConnectionStringKey::MsiParams); + m.insert("msi_params", ConnectionStringKey::MsiParams); + m.insert("msi_type", ConnectionStringKey::MsiParams); - m.insert("az cli", ConnectionStringKey::AzCli); + m.insert("az cli", ConnectionStringKey::AzCli); - m - }; -} + m +}); // TODO: when available // pub const PUBLIC_APPLICATION_CERTIFICATE_NAME: &str = "Public Application Certificate"; @@ -136,30 +135,21 @@ lazy_static! { ConnectionStringKey::ApplicationCertificateX5C => "Application Certificate x5c", */ -#[derive(Debug, thiserror::Error)] -pub enum ConnectionStringError { - #[error("Missing value for key '{}'", key)] - MissingValue { key: String }, - #[error("Unexpected key '{}'", key)] - UnexpectedKey { key: String }, - #[error("Parsing error: {}", msg)] - ParsingError { msg: String }, -} - /// Build a connection string to connect to a Kusto service instance. /// /// For more information on Kusto connection strings visit: -/// https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto +/// [https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto) #[derive(Default)] pub struct ConnectionStringBuilder<'a>(ConnectionString<'a>); impl<'a> ConnectionStringBuilder<'a> { - /// Creates a ConnectionStringBuilder with no configuration options set + /// Creates a `ConnectionStringBuilder` with no configuration options set + #[must_use] pub fn new() -> Self { Self(ConnectionString::default()) } - /// Creates a ConnectionStringBuilder that will authenticate with AAD application and key. + /// Creates a `ConnectionStringBuilder` that will authenticate with AAD application and key. /// /// # Arguments /// @@ -167,6 +157,7 @@ impl<'a> ConnectionStringBuilder<'a> { /// * `authority_id` - Authority id (aka Tenant id) must be provided /// * `client_id` - AAD application ID. /// * `client_secret` - Corresponding key of the AAD application. + #[must_use] pub fn new_with_aad_application_key_authentication( service_url: &'a str, authority_id: &'a str, @@ -183,6 +174,7 @@ impl<'a> ConnectionStringBuilder<'a> { }) } + #[must_use] pub fn build(&self) -> String { let mut kv_pairs = Vec::new(); @@ -266,7 +258,7 @@ impl<'a> ConnectionStringBuilder<'a> { /// A Kusto service connection string. /// /// For more information on Kusto connection strings visit: -/// https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto +/// [https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto) #[derive(Debug, Default)] pub struct ConnectionString<'a> { /// The URI specifying the Kusto service endpoint. @@ -422,10 +414,7 @@ mod tests { #[test] fn it_parses_empty_connection_string() { - assert_eq!( - ConnectionString::new("").unwrap(), - ConnectionString::default() - ); + assert_eq!(ConnectionString::new(""), Ok(ConnectionString::default())); } #[test] diff --git a/azure-kusto-data/src/error.rs b/azure-kusto-data/src/error.rs index 09e23a2..d3d3e07 100644 --- a/azure-kusto-data/src/error.rs +++ b/azure-kusto-data/src/error.rs @@ -1,6 +1,6 @@ //! Defines `KustoRsError` for representing failures in various operations. -use http::uri::InvalidUri; use std::fmt::Debug; +use std::num::TryFromIntError; use thiserror; #[derive(thiserror::Error, Debug)] @@ -21,24 +21,34 @@ pub enum Error { NotImplemented(String), /// Error relating to (de-)serialization of JSON data - #[error(transparent)] + #[error("Error in JSON serialization/deserialization: {0}")] JsonError(#[from] serde_json::Error), /// Error occurring within core azure crates - #[error(transparent)] + #[error("Error in azure-core: {0}")] AzureError(#[from] azure_core::error::Error), /// Errors raised when parsing connection information - #[error("Configuration error: {0}")] - ConfigurationError(#[from] crate::connection_string::ConnectionStringError), + #[error("Connection string error: {0}")] + ConnectionStringError(#[from] ConnectionStringError), } -#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum InvalidArgumentError { - #[error(transparent)] - InvalidUri(#[from] InvalidUri), #[error("{0} is not a valid duration")] InvalidDuration(String), + #[error("{0} is too large to fit in a u32")] + PayloadTooLarge(#[from] TryFromIntError), +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum ConnectionStringError { + #[error("Missing value for key '{}'", key)] + MissingValue { key: String }, + #[error("Unexpected key '{}'", key)] + UnexpectedKey { key: String }, + #[error("Parsing error: {}", msg)] + ParsingError { msg: String }, } pub type Result = std::result::Result; diff --git a/azure-kusto-data/src/operations/query.rs b/azure-kusto-data/src/operations/query.rs index a9d6228..34aeeb9 100644 --- a/azure-kusto-data/src/operations/query.rs +++ b/azure-kusto-data/src/operations/query.rs @@ -1,6 +1,7 @@ #[cfg(feature = "arrow")] use crate::arrow::convert_table; use crate::client::{KustoClient, QueryKind}; + use crate::error::{Error, InvalidArgumentError}; use crate::models::{ DataSetCompletion, DataSetHeader, DataTable, QueryBody, RequestProperties, TableKind, TableV1, @@ -9,8 +10,9 @@ use crate::request_options::RequestOptions; #[cfg(feature = "arrow")] use arrow::record_batch::RecordBatch; use async_convert::TryFrom; +use azure_core::error::Error as CoreError; use azure_core::prelude::*; -use azure_core::{collect_pinned_stream, Response as HttpResponse}; +use azure_core::{collect_pinned_stream, Request, Response as HttpResponse, Url}; use futures::future::BoxFuture; use futures::TryFutureExt; use serde::{Deserialize, Serialize}; @@ -45,15 +47,25 @@ pub struct V2QueryRunner(pub QueryRunner); impl V1QueryRunner { pub fn into_future(self) -> V1QueryRun { - let V1QueryRunner(query_runner) = self; - Box::pin(query_runner.into_future().map_ok(|e| e.try_into().unwrap())) + Box::pin(async { + let V1QueryRunner(query_runner) = self; + let future = query_runner.into_future().await?; + Ok( + std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV1 - please report this issue to the Kusto team") + ) + }) } } impl V2QueryRunner { pub fn into_future(self) -> V2QueryRun { - let V2QueryRunner(query_runner) = self; - Box::pin(query_runner.into_future().map_ok(|e| e.try_into().unwrap())) + Box::pin(async { + let V2QueryRunner(query_runner) = self; + let future = query_runner.into_future().await?; + Ok( + std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV2 - please report this issue to the Kusto team") + ) + }) } } @@ -67,10 +79,8 @@ impl QueryRunner { QueryKind::Management => this.client.management_url(), QueryKind::Query => this.client.query_url(), }; - let mut request = this.client.prepare_request( - url.parse().map_err(InvalidArgumentError::InvalidUri)?, - http::Method::POST, - ); + let mut request = + prepare_request(url.parse().map_err(CoreError::from)?, http::Method::POST); if let Some(request_id) = &this.client_request_id { request.insert_headers(request_id); @@ -91,8 +101,10 @@ impl QueryRunner { }), }; let bytes = bytes::Bytes::from(serde_json::to_string(&body)?); - request.insert_headers(&ContentLength::new(bytes.len() as i32)); - request.set_body(bytes.into()); + request.insert_headers(&ContentLength::new( + std::convert::TryInto::try_into(bytes.len()).map_err(InvalidArgumentError::from)?, + )); + request.set_body(bytes); let response = self .client @@ -159,11 +171,12 @@ impl std::convert::TryFrom for KustoResponseDataSetV1 { } impl KustoResponseDataSetV2 { + #[must_use] pub fn table_count(&self) -> usize { self.tables.len() } - /// Consumes the response into an iterator over all PrimaryResult tables within the response dataset + /// Consumes the response into an iterator over all `PrimaryResult` tables within the response dataset pub fn into_primary_results(self) -> impl Iterator { self.tables.into_iter().filter_map(|table| match table { ResultTable::DataTable(table) if table.table_kind == TableKind::PrimaryResult => { @@ -186,6 +199,7 @@ pub struct KustoResponseDataSetV1 { } impl KustoResponseDataSetV1 { + #[must_use] pub fn table_count(&self) -> usize { self.tables.len() } @@ -224,6 +238,21 @@ impl TryFrom for KustoResponseDataSetV1 { // } // } +pub fn prepare_request(uri: Url, http_method: http::Method) -> Request { + const API_VERSION: &str = "2019-02-13"; + + let mut request = Request::new(uri, http_method); + request.insert_headers(&Version::from(API_VERSION)); + request.insert_headers(&Accept::from("application/json")); + request.insert_headers(&ContentType::new("application/json; charset=utf-8")); + request.insert_headers(&AcceptEncoding::from("gzip")); + request.insert_headers(&ClientVersion::from(format!( + "Kusto.Rust.Client:{}", + env!("CARGO_PKG_VERSION"), + ))); + request +} + #[cfg(test)] mod tests { use super::*; @@ -244,7 +273,7 @@ mod tests { }"#; let parsed = serde_json::from_str::(data); - assert!(parsed.is_ok()) + assert!(parsed.is_ok()); } #[test] @@ -252,9 +281,11 @@ mod tests { let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); path.push("tests/inputs/adminthenquery.json"); - let data = std::fs::read_to_string(path).unwrap(); + let data = std::fs::read_to_string(&path) + .unwrap_or_else(|_| panic!("Failed to read {}", path.display())); - let parsed = serde_json::from_str::(&data).unwrap(); - assert_eq!(parsed.table_count(), 4) + let parsed = serde_json::from_str::(&data) + .expect("Failed to parse response"); + assert_eq!(parsed.table_count(), 4); } } diff --git a/azure-kusto-data/src/prelude.rs b/azure-kusto-data/src/prelude.rs index 2a7c1d2..15f47a1 100644 --- a/azure-kusto-data/src/prelude.rs +++ b/azure-kusto-data/src/prelude.rs @@ -17,8 +17,8 @@ pub use crate::operations::query::{ KustoResponse, KustoResponseDataSetV1, KustoResponseDataSetV2, ResultTable, }; // Token credentials are re-exported for user convenience -pub use azure_identity::token_credentials::{ +pub use azure_identity::{ AutoRefreshingTokenCredential, AzureCliCredential, ClientSecretCredential, DefaultAzureCredential, DefaultAzureCredentialBuilder, EnvironmentCredential, - ManagedIdentityCredentialError, TokenCredentialOptions, + TokenCredentialOptions, }; diff --git a/azure-kusto-data/src/types.rs b/azure-kusto-data/src/types.rs index a51da70..677884d 100644 --- a/azure-kusto-data/src/types.rs +++ b/azure-kusto-data/src/types.rs @@ -1,5 +1,5 @@ use azure_core::error::{ErrorKind, ResultExt}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use regex::{Captures, Regex}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::fmt::{Debug, Display, Formatter}; @@ -42,7 +42,7 @@ impl Debug for KustoDateTime { impl From for KustoDateTime { fn from(time: OffsetDateTime) -> Self { - KustoDateTime(time) + Self(time) } } @@ -59,7 +59,7 @@ pub struct KustoDuration(pub Duration); impl From for KustoDuration { fn from(duration: Duration) -> Self { - KustoDuration(duration) + Self(duration) } } @@ -74,37 +74,38 @@ impl Deref for KustoDuration { fn parse_regex_segment(captures: &Captures, name: &str) -> i64 { captures .name(name) - .map(|m| m.as_str().parse::().unwrap()) - .unwrap_or(0) + .map_or(0, |m| m.as_str().parse::().expect("Failed to parse regex segment as i64 - this is a bug - please report this issue to the Kusto team")) } +static KUSTO_DURATION_REGEX: Lazy = Lazy::new(|| { + Regex::new(r"^(?P-)?((?P\d+)\.)?(?P\d+):(?P\d+):(?P\d+)(\.(?P\d+))?$") + .expect("Failed to compile KustoDuration regex, this should never happen - please report this issue to the Kusto team") +}); impl FromStr for KustoDuration { type Err = Error; fn from_str(s: &str) -> Result { - lazy_static! { - static ref RE: Regex = Regex::new(r"^(?P\-)?((?P\d+)\.)?(?P\d+):(?P\d+):(?P\d+)(\.(?P\d+))?$").unwrap(); - } - if let Some(captures) = RE.captures(s) { - let neg = match captures.name("neg") { - None => 1, - Some(_) => -1, - }; - let days = parse_regex_segment(&captures, "days"); - let hours = parse_regex_segment(&captures, "hours"); - let minutes = parse_regex_segment(&captures, "minutes"); - let seconds = parse_regex_segment(&captures, "seconds"); - let nanos = parse_regex_segment(&captures, "nanos"); - let duration = neg - * (Duration::days(days) - + Duration::hours(hours) - + Duration::minutes(minutes) - + Duration::seconds(seconds) - + Duration::nanoseconds(nanos * 100)); // Ticks - Ok(KustoDuration(duration)) - } else { - Err(InvalidArgumentError::InvalidDuration(s.to_string()).into()) - } + KUSTO_DURATION_REGEX + .captures(s) + .map(|captures| { + let neg = match captures.name("neg") { + None => 1, + Some(_) => -1, + }; + let days = parse_regex_segment(&captures, "days"); + let hours = parse_regex_segment(&captures, "hours"); + let minutes = parse_regex_segment(&captures, "minutes"); + let seconds = parse_regex_segment(&captures, "seconds"); + let nanos = parse_regex_segment(&captures, "nanos"); + let duration = neg + * (Duration::days(days) + + Duration::hours(hours) + + Duration::minutes(minutes) + + Duration::seconds(seconds) + + Duration::nanoseconds(nanos * 100)); // Ticks + Self(duration) + }) + .ok_or_else(|| InvalidArgumentError::InvalidDuration(s.to_string()).into()) } } @@ -125,7 +126,8 @@ impl Display for KustoDuration { neg * (self.whole_hours() - self.whole_days() * 24), neg * (self.whole_minutes() - self.whole_hours() * 60), neg * (self.whole_seconds() - self.whole_minutes() * 60), - neg as i128 * (self.whole_nanoseconds() - self.whole_seconds() as i128 * 1_000_000_000) + i128::from(neg) + * (self.whole_nanoseconds() - i128::from(self.whole_seconds()) * 1_000_000_000) / 100 // Ticks )?; @@ -146,20 +148,22 @@ mod tests { #[test] fn string_conversion() { let refs: Vec<(&str, i64)> = vec![ - ("1.00:00:00.0000000", 86400000000000), - ("01:00:00.0000000", 3600000000000), - ("01:00:00", 3600000000000), - ("00:05:00.0000000", 300000000000), + ("1.00:00:00.0000000", 86_400_000_000_000), + ("01:00:00.0000000", 3_600_000_000_000), + ("01:00:00", 3_600_000_000_000), + ("00:05:00.0000000", 300_000_000_000), ("00:00:00.0000001", 100), - ("-01:00:00", -3600000000000), - ("-1.00:00:00.0000000", -86400000000000), - ("00:00:00.1234567", 123456700), + ("-01:00:00", -3_600_000_000_000), + ("-1.00:00:00.0000000", -86_400_000_000_000), + ("00:00:00.1234567", 123_456_700), ]; for (from, to) in refs { assert_eq!( - KustoDuration::from_str(from).unwrap().whole_nanoseconds(), - to as i128 + KustoDuration::from_str(from) + .unwrap_or_else(|_| panic!("Failed to parse duration {}", from)) + .whole_nanoseconds(), + i128::from(to) ); } } @@ -176,7 +180,8 @@ mod tests { ]; for duration in refs { - let parsed = KustoDuration::from_str(duration).unwrap(); + let parsed = KustoDuration::from_str(duration) + .unwrap_or_else(|_| panic!("Failed to parse duration {}", duration)); assert_eq!(format!("{:?}", parsed), duration); } } diff --git a/azure-kusto-data/tests/arrow.rs b/azure-kusto-data/tests/arrow.rs index 939ded6..76063e7 100644 --- a/azure-kusto-data/tests/arrow.rs +++ b/azure-kusto-data/tests/arrow.rs @@ -25,9 +25,7 @@ macro_rules! assert_batches_eq { #[tokio::test] async fn arrow_roundtrip() { - let (client, database) = setup::create_kusto_client("data_arrow_roundtrip") - .await - .unwrap(); + let (client, database) = setup::create_kusto_client("data_arrow_roundtrip"); let query = " datatable( @@ -48,11 +46,11 @@ async fn arrow_roundtrip() { .execute_query(&database, query) .into_future() .await - .unwrap(); + .expect("Failed to run query"); let batches = response .into_record_batches() .collect::, _>>() - .unwrap(); + .expect("Failed to collect batches"); let expected_schema = Arc::new(Schema::new(vec![ Field::new("id", DataType::Int32, true), @@ -83,7 +81,9 @@ async fn arrow_roundtrip() { assert_batches_eq!( expected, // we have to de-select the duration column, since pretty printing is not supported in arrow - &[batches[0].project(&[0, 1, 2, 3, 4, 5, 6]).unwrap()] + &[batches[0] + .project(&[0, 1, 2, 3, 4, 5, 6]) + .expect("Failed to project numbers")] ); - assert_eq!(expected_schema, batches[0].schema()) + assert_eq!(expected_schema, batches[0].schema()); } diff --git a/azure-kusto-data/tests/e2e.rs b/azure-kusto-data/tests/e2e.rs index 50cbfad..1fd6c7d 100644 --- a/azure-kusto-data/tests/e2e.rs +++ b/azure-kusto-data/tests/e2e.rs @@ -3,16 +3,14 @@ mod setup; #[tokio::test] async fn create_query_delete_table() { - let (client, database) = setup::create_kusto_client("data_create_query_delete_table") - .await - .unwrap(); + let (client, database) = setup::create_kusto_client("data_create_query_delete_table"); let query = ".set KustoRsTest <| let text=\"Hello, World!\"; print str=text"; let response = client .execute_command(&database, query) .into_future() .await - .unwrap(); + .expect("Failed to run query"); assert_eq!(response.table_count(), 1); @@ -21,7 +19,7 @@ async fn create_query_delete_table() { .execute_command(&database, query) .into_future() .await - .unwrap(); + .expect("Failed to run query"); assert_eq!(response.table_count(), 4); @@ -30,7 +28,7 @@ async fn create_query_delete_table() { .execute_query(&database, query) .into_future() .await - .unwrap(); + .expect("Failed to run query"); let results = response.into_primary_results().collect::>(); assert_eq!(results[0].rows.len(), 1); @@ -40,7 +38,7 @@ async fn create_query_delete_table() { .execute_command(&database, query) .into_future() .await - .unwrap(); + .expect("Failed to run query"); - assert_eq!(response.tables[0].rows.len(), 0) + assert_eq!(response.tables[0].rows.len(), 0); } diff --git a/azure-kusto-data/tests/setup.rs b/azure-kusto-data/tests/setup.rs index 9a0b480..3518f16 100644 --- a/azure-kusto-data/tests/setup.rs +++ b/azure-kusto-data/tests/setup.rs @@ -1,11 +1,9 @@ #![cfg(feature = "mock_transport_framework")] -use azure_core::auth::{TokenCredential, TokenResponse}; -use azure_core::Error as CoreError; +use azure_core::auth::{AccessToken, TokenCredential, TokenResponse}; +use azure_core::error::Error as CoreError; use azure_kusto_data::prelude::*; use chrono::Utc; use dotenv::dotenv; -use oauth2::AccessToken; -use std::error::Error; use std::path::Path; use std::sync::Arc; @@ -21,12 +19,10 @@ impl TokenCredential for DummyCredential { } } -pub async fn create_kusto_client( - transaction_name: &str, -) -> Result<(KustoClient, String), Box> { - let transaction_path = Path::new(&workspace_root().unwrap()) +pub fn create_kusto_client(transaction_name: &str) -> (KustoClient, String) { + let transaction_path = Path::new(&workspace_root().expect("Failed to get workspace root")) .join(format!("test/transactions/{}", transaction_name)); - std::fs::create_dir_all(&transaction_path).unwrap(); + std::fs::create_dir_all(&transaction_path).expect("Failed to create transaction directory"); let db_path = transaction_path.join("_db"); let (service_url, credential, database): (String, Arc, String) = @@ -48,27 +44,32 @@ pub async fn create_kusto_client( // Wee need to persist the database name as well, since it may change per recording run depending on who // records it, is part of the request, and as such validated against. - std::fs::write(db_path, &database).unwrap(); + std::fs::write(db_path, &database).expect("Failed to write database name to file"); let credential = Arc::new(ClientSecretCredential::new( - tenant_id.to_string(), - client_id.to_string(), - client_secret.to_string(), + tenant_id, + client_id, + client_secret, TokenCredentialOptions::default(), )); (service_url, credential, database) } else { let credential = Arc::new(DummyCredential {}); - let database = String::from_utf8_lossy(&std::fs::read(db_path).unwrap()).to_string(); + let database = String::from_utf8_lossy( + &std::fs::read(&db_path) + .expect(&format!("Could not read db path {}", db_path.display())), + ) + .to_string(); (String::new(), credential, database) }; let options = KustoClientOptions::new_with_transaction_name(transaction_name.to_string()); - Ok(( - KustoClient::new_with_options(service_url, credential, options).unwrap(), + ( + KustoClient::new_with_options(service_url, credential, options) + .expect("Failed to create KustoClient"), database, - )) + ) } /// Run cargo to get the root of the workspace @@ -82,10 +83,10 @@ fn workspace_root() -> Result> { let key = "workspace_root\":\""; let index = output .find(key) - .ok_or_else(|| format!("workspace_root key not found in metadata"))?; + .ok_or_else(|| "workspace_root key not found in metadata".to_string())?; let value = &output[index + key.len()..]; let end = value - .find("\"") - .ok_or_else(|| format!("workspace_root value was malformed"))?; + .find('\"') + .ok_or_else(|| "workspace_root value was malformed".to_string())?; Ok(value[..end].into()) }