-Clippy suggestions (all levels enabled) - removing bad code, having const fns, adding must_use, etc
-Upgraded all packages to their latest versions, including azure
-Removing all unwrap()s, now they are replaced with something better, or except() for more info.
-Replaced lazy_static with once_cell
This commit is contained in:
AsafMah 2022-06-23 19:10:54 +03:00
Родитель 0f81cc0cc4
Коммит 9e45fff548
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: AD0D1680EEE7A4FF
14 изменённых файлов: 278 добавлений и 282 удалений

Просмотреть файл

@ -12,25 +12,24 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]
[dependencies]
arrow = { version = "13", optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51", features = [
arrow = { version = "15.0.0", optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d", features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "66db4b485ce56b68be148708d9c810960a50be51" }
async-trait = "0.1"
async-convert = "1"
bytes = "1"
futures = "0.3"
http = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = { version = "1.12.0", features = ["json"] }
thiserror = "1"
lazy_static = "1.4.0"
hashbrown = "0.12.0"
regex = "1.5.5"
time = { version = "0.3.9", features = [
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust", rev = "8586a66b20fba463c39156f0390e583ec305ab2d" }
async-trait = "0.1.56"
async-convert = "1.0.0"
bytes = "1.1.0"
futures = "0.3.21"
http = "0.2.8"
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
serde_with = { version = "1.12.1", features = ["json"] }
thiserror = "1.0.31"
hashbrown = "0.12.1"
regex = "1.5.6"
time = { version = "0.3.11", features = [
"serde",
"parsing",
"formatting",
@ -38,14 +37,15 @@ time = { version = "0.3.9", features = [
"serde-well-known",
] }
derive_builder = "0.11.2"
once_cell = "1.12.0"
[dev-dependencies]
arrow = { version = "13", features = ["prettyprint"] }
dotenv = "*"
arrow = { version = "15.0.0", features = ["prettyprint"] }
dotenv = "0.15.0"
env_logger = "0.9"
tokio = { version = "1", features = ["macros"] }
chrono = "*"
oauth2 = "*"
tokio = { version = "1.19.2", features = ["macros"] }
chrono = "0.4.19"
oauth2 = "4.2.0"
[features]
default = ["arrow"]

Просмотреть файл

@ -29,13 +29,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
&client_secret,
);
let client = KustoClient::try_from(kcsb).unwrap();
let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client");
let response = client
.execute_command(database, query)
.into_future()
.await
.unwrap();
.expect("Failed to execute query");
println!("command response: {:?}", response);

Просмотреть файл

@ -29,13 +29,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
&client_secret,
);
let client = KustoClient::try_from(kcsb).unwrap();
let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client");
let response = client
.execute_query(database, query)
.into_future()
.await
.unwrap();
.expect("Failed to execute query");
for table in &response.tables {
match table {

Просмотреть файл

@ -16,12 +16,12 @@ use azure_core::error::{ErrorKind, ResultExt};
use crate::error::Result;
use crate::models::ColumnType;
use crate::models::*;
use crate::models::{Column, DataTable};
use crate::types::{KustoDateTime, KustoDuration};
fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
let strings: Vec<Option<&str>> = strings.iter().map(Option::as_deref).collect();
Ok(Arc::new(StringArray::from(strings)))
}
@ -85,42 +85,23 @@ fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
Ok(Arc::new(Int64Array::from(ints)))
}
pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> Result<(Field, ArrayRef)> {
pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(Field, ArrayRef)> {
let column_name = &column.column_name;
match column.column_type {
ColumnType::String => convert_array_string(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Utf8, true),
data,
)
}),
ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Boolean, true),
data,
)
}),
ColumnType::Int => convert_array_i32(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int32, true),
data,
)
}),
ColumnType::Long => convert_array_i64(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int64, true),
data,
)
}),
ColumnType::Real => convert_array_float(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Float64, true),
data,
)
}),
ColumnType::String => convert_array_string(data)
.map(|data| (Field::new(column_name, DataType::Utf8, true), data)),
ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data)
.map(|data| (Field::new(column_name, DataType::Boolean, true), data)),
ColumnType::Int => convert_array_i32(data)
.map(|data| (Field::new(column_name, DataType::Int32, true), data)),
ColumnType::Long => convert_array_i64(data)
.map(|data| (Field::new(column_name, DataType::Int64, true), data)),
ColumnType::Real => convert_array_float(data)
.map(|data| (Field::new(column_name, DataType::Float64, true), data)),
ColumnType::Datetime => convert_array_datetime(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
column_name,
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
@ -129,11 +110,7 @@ pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> Result<(F
}),
ColumnType::Timespan => convert_array_timespan(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
DataType::Duration(TimeUnit::Nanosecond),
true,
),
Field::new(column_name, DataType::Duration(TimeUnit::Nanosecond), true),
data,
)
}),
@ -158,7 +135,7 @@ pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
buffer
.into_iter()
.zip(table.columns.into_iter())
.map(|(data, column)| convert_column(data, column))
.map(|(data, column)| convert_column(data, &column))
.try_for_each::<_, Result<()>>(|result| {
let (field, data) = result?;
fields.push(field);
@ -173,6 +150,7 @@ pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
#[cfg(test)]
mod tests {
use super::*;
use crate::models::TableKind;
use crate::operations::query::{KustoResponseDataSetV2, ResultTable};
use std::path::PathBuf;
@ -188,7 +166,7 @@ mod tests {
column_name: "int_col".to_string(),
column_type: ColumnType::Int,
};
assert_eq!(c, ref_col)
assert_eq!(c, ref_col);
}
#[test]
@ -218,7 +196,7 @@ mod tests {
}],
rows: vec![],
};
assert_eq!(t, ref_tbl)
assert_eq!(t, ref_tbl);
}
#[test]
@ -226,15 +204,16 @@ mod tests {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/inputs/dataframe.json");
let data = std::fs::read_to_string(path).unwrap();
let tables: Vec<ResultTable> = serde_json::from_str(&data).unwrap();
let data = std::fs::read_to_string(path).expect("Failed to read file");
let tables: Vec<ResultTable> =
serde_json::from_str(&data).expect("Failed to deserialize result table");
let response = KustoResponseDataSetV2 { tables };
let record_batches = response
.into_record_batches()
.collect::<std::result::Result<Vec<_>, _>>()
.unwrap();
.expect("Failed to convert to record batches");
assert!(record_batches[0].num_columns() > 0);
assert!(record_batches[0].num_rows() > 0)
assert!(record_batches[0].num_rows() > 0);
}
}

Просмотреть файл

@ -1,6 +1,5 @@
use azure_core::headers::{HeaderValue, AUTHORIZATION};
use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request};
use http::header::AUTHORIZATION;
use http::HeaderValue;
use std::sync::Arc;
#[derive(Clone)]
@ -44,11 +43,9 @@ impl Policy for AuthorizationPolicy {
);
let token = self.credential.get_token(&self.resource).await?;
let auth_header_value = format!("Bearer {}", token.token.secret().clone());
let auth_header_value = format!("Bearer {}", token.token.secret());
request
.headers_mut()
.insert(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?);
request.insert_header(AUTHORIZATION, HeaderValue::from(auth_header_value));
next[0].send(ctx, request, &next[1..]).await
}

Просмотреть файл

@ -3,19 +3,17 @@ use crate::connection_string::{ConnectionString, ConnectionStringBuilder};
use crate::error::Result;
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::auth::TokenCredential;
use azure_core::prelude::*;
use azure_core::{ClientOptions, Context, Pipeline, Request};
use azure_identity::token_credentials::{
use azure_core::{ClientOptions, Context, Pipeline};
use azure_identity::{
AzureCliCredential, ClientSecretCredential, DefaultAzureCredential,
ImdsManagedIdentityCredential, TokenCredentialOptions,
};
use http::Uri;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
const API_VERSION: &str = "2019-02-13";
/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
pub struct KustoClientOptions {
@ -24,6 +22,7 @@ pub struct KustoClientOptions {
impl KustoClientOptions {
/// Create new options
#[must_use]
pub fn new() -> Self {
Self::default()
}
@ -57,7 +56,7 @@ fn new_pipeline_from_options(
/// Kusto client for Rust.
/// The client is a wrapper around the Kusto REST API.
/// To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
/// To read more about it, go to [https://docs.microsoft.com/en-us/azure/kusto/api/rest/](https://docs.microsoft.com/en-us/azure/kusto/api/rest/)
///
/// The primary methods are:
/// `execute_query`: executes a KQL query against the Kusto service.
@ -116,11 +115,11 @@ impl KustoClient {
.with_query(query.into())
.with_context(Context::new())
.build()
.unwrap()
.expect("Unexpected error when building query runner - please report this issue to the Kusto team")
}
/// Execute a KQL query.
/// To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
///
/// # Arguments
///
@ -142,20 +141,7 @@ impl KustoClient {
V1QueryRunner(self.execute(database, query, QueryKind::Management))
}
pub(crate) fn prepare_request(&self, uri: Uri, http_method: http::Method) -> Request {
let mut request = Request::new(uri, http_method);
request.insert_headers(&Version::from(API_VERSION));
request.insert_headers(&Accept::from("application/json"));
request.insert_headers(&ContentType::new("application/json; charset=utf-8"));
request.insert_headers(&AcceptEncoding::from("gzip"));
request.insert_headers(&ClientVersion::from(format!(
"Kusto.Rust.Client:{}",
env!("CARGO_PKG_VERSION"),
)));
request
}
pub(crate) fn pipeline(&self) -> &Pipeline {
pub(crate) const fn pipeline(&self) -> &Pipeline {
&self.pipeline
}
}
@ -183,7 +169,7 @@ impl<'a> TryFrom<ConnectionString<'a>> for KustoClient {
ConnectionString {
msi_auth: Some(true),
..
} => Arc::new(ImdsManagedIdentityCredential {}),
} => Arc::new(ImdsManagedIdentityCredential::default()),
ConnectionString {
az_cli: Some(true), ..
} => Arc::new(AzureCliCredential {}),

Просмотреть файл

@ -1,8 +1,9 @@
// Set of properties that can be use in a connection string provided to KustoConnectionStringBuilder.
// For a complete list of properties go to https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto
use crate::error::ConnectionStringError;
use hashbrown::HashMap;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
enum ConnectionStringKey {
DataSource,
@ -22,7 +23,7 @@ enum ConnectionStringKey {
}
impl ConnectionStringKey {
fn to_str(&self) -> &'static str {
const fn to_str(&self) -> &'static str {
match self {
ConnectionStringKey::DataSource => "Data Source",
ConnectionStringKey::FederatedSecurity => "AAD Federated Security",
@ -44,81 +45,79 @@ impl ConnectionStringKey {
}
}
lazy_static! {
static ref ALIAS_MAP: HashMap<&'static str, ConnectionStringKey> = {
let mut m = HashMap::new();
m.insert("data source", ConnectionStringKey::DataSource);
m.insert("addr", ConnectionStringKey::DataSource);
m.insert("address", ConnectionStringKey::DataSource);
m.insert("network address", ConnectionStringKey::DataSource);
m.insert("server", ConnectionStringKey::DataSource);
static ALIAS_MAP: Lazy<HashMap<&'static str, ConnectionStringKey>> = Lazy::new(|| {
let mut m = HashMap::new();
m.insert("data source", ConnectionStringKey::DataSource);
m.insert("addr", ConnectionStringKey::DataSource);
m.insert("address", ConnectionStringKey::DataSource);
m.insert("network address", ConnectionStringKey::DataSource);
m.insert("server", ConnectionStringKey::DataSource);
m.insert(
"aad federated security",
ConnectionStringKey::FederatedSecurity,
);
m.insert("federated security", ConnectionStringKey::FederatedSecurity);
m.insert("federated", ConnectionStringKey::FederatedSecurity);
m.insert("fed", ConnectionStringKey::FederatedSecurity);
m.insert("aadfed", ConnectionStringKey::FederatedSecurity);
m.insert(
"aad federated security",
ConnectionStringKey::FederatedSecurity,
);
m.insert("federated security", ConnectionStringKey::FederatedSecurity);
m.insert("federated", ConnectionStringKey::FederatedSecurity);
m.insert("fed", ConnectionStringKey::FederatedSecurity);
m.insert("aadfed", ConnectionStringKey::FederatedSecurity);
m.insert("aad user id", ConnectionStringKey::UserId);
m.insert("user id", ConnectionStringKey::UserId);
m.insert("uid", ConnectionStringKey::UserId);
m.insert("user", ConnectionStringKey::UserId);
m.insert("aad user id", ConnectionStringKey::UserId);
m.insert("user id", ConnectionStringKey::UserId);
m.insert("uid", ConnectionStringKey::UserId);
m.insert("user", ConnectionStringKey::UserId);
m.insert("password", ConnectionStringKey::Password);
m.insert("pwd", ConnectionStringKey::Password);
m.insert("password", ConnectionStringKey::Password);
m.insert("pwd", ConnectionStringKey::Password);
m.insert(
"application client id",
ConnectionStringKey::ApplicationClientId,
);
m.insert("appclientid", ConnectionStringKey::ApplicationClientId);
m.insert(
"application client id",
ConnectionStringKey::ApplicationClientId,
);
m.insert("appclientid", ConnectionStringKey::ApplicationClientId);
m.insert("application key", ConnectionStringKey::ApplicationKey);
m.insert("appkey", ConnectionStringKey::ApplicationKey);
m.insert("application key", ConnectionStringKey::ApplicationKey);
m.insert("appkey", ConnectionStringKey::ApplicationKey);
m.insert(
"application certificate",
ConnectionStringKey::ApplicationCertificate,
);
m.insert(
"application certificate",
ConnectionStringKey::ApplicationCertificate,
);
m.insert(
"application certificate thumbprint",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert(
"appcert",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert(
"application certificate thumbprint",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert(
"appcert",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert("authority id", ConnectionStringKey::AuthorityId);
m.insert("authorityid", ConnectionStringKey::AuthorityId);
m.insert("authority", ConnectionStringKey::AuthorityId);
m.insert("tenantid", ConnectionStringKey::AuthorityId);
m.insert("tenant", ConnectionStringKey::AuthorityId);
m.insert("tid", ConnectionStringKey::AuthorityId);
m.insert("authority id", ConnectionStringKey::AuthorityId);
m.insert("authorityid", ConnectionStringKey::AuthorityId);
m.insert("authority", ConnectionStringKey::AuthorityId);
m.insert("tenantid", ConnectionStringKey::AuthorityId);
m.insert("tenant", ConnectionStringKey::AuthorityId);
m.insert("tid", ConnectionStringKey::AuthorityId);
m.insert("application token", ConnectionStringKey::ApplicationToken);
m.insert("apptoken", ConnectionStringKey::ApplicationToken);
m.insert("application token", ConnectionStringKey::ApplicationToken);
m.insert("apptoken", ConnectionStringKey::ApplicationToken);
m.insert("user token", ConnectionStringKey::UserToken);
m.insert("usertoken", ConnectionStringKey::UserToken);
m.insert("user token", ConnectionStringKey::UserToken);
m.insert("usertoken", ConnectionStringKey::UserToken);
m.insert("msi auth", ConnectionStringKey::MsiAuth);
m.insert("msi_auth", ConnectionStringKey::MsiAuth);
m.insert("msi", ConnectionStringKey::MsiAuth);
m.insert("msi auth", ConnectionStringKey::MsiAuth);
m.insert("msi_auth", ConnectionStringKey::MsiAuth);
m.insert("msi", ConnectionStringKey::MsiAuth);
m.insert("msi params", ConnectionStringKey::MsiParams);
m.insert("msi_params", ConnectionStringKey::MsiParams);
m.insert("msi_type", ConnectionStringKey::MsiParams);
m.insert("msi params", ConnectionStringKey::MsiParams);
m.insert("msi_params", ConnectionStringKey::MsiParams);
m.insert("msi_type", ConnectionStringKey::MsiParams);
m.insert("az cli", ConnectionStringKey::AzCli);
m.insert("az cli", ConnectionStringKey::AzCli);
m
};
}
m
});
// TODO: when available
// pub const PUBLIC_APPLICATION_CERTIFICATE_NAME: &str = "Public Application Certificate";
@ -136,30 +135,21 @@ lazy_static! {
ConnectionStringKey::ApplicationCertificateX5C => "Application Certificate x5c",
*/
#[derive(Debug, thiserror::Error)]
pub enum ConnectionStringError {
#[error("Missing value for key '{}'", key)]
MissingValue { key: String },
#[error("Unexpected key '{}'", key)]
UnexpectedKey { key: String },
#[error("Parsing error: {}", msg)]
ParsingError { msg: String },
}
/// Build a connection string to connect to a Kusto service instance.
///
/// For more information on Kusto connection strings visit:
/// https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto
/// [https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto)
#[derive(Default)]
pub struct ConnectionStringBuilder<'a>(ConnectionString<'a>);
impl<'a> ConnectionStringBuilder<'a> {
/// Creates a ConnectionStringBuilder with no configuration options set
/// Creates a `ConnectionStringBuilder` with no configuration options set
#[must_use]
pub fn new() -> Self {
Self(ConnectionString::default())
}
/// Creates a ConnectionStringBuilder that will authenticate with AAD application and key.
/// Creates a `ConnectionStringBuilder` that will authenticate with AAD application and key.
///
/// # Arguments
///
@ -167,6 +157,7 @@ impl<'a> ConnectionStringBuilder<'a> {
/// * `authority_id` - Authority id (aka Tenant id) must be provided
/// * `client_id` - AAD application ID.
/// * `client_secret` - Corresponding key of the AAD application.
#[must_use]
pub fn new_with_aad_application_key_authentication(
service_url: &'a str,
authority_id: &'a str,
@ -183,6 +174,7 @@ impl<'a> ConnectionStringBuilder<'a> {
})
}
#[must_use]
pub fn build(&self) -> String {
let mut kv_pairs = Vec::new();
@ -266,7 +258,7 @@ impl<'a> ConnectionStringBuilder<'a> {
/// A Kusto service connection string.
///
/// For more information on Kusto connection strings visit:
/// https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto
/// [https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto)
#[derive(Debug, Default)]
pub struct ConnectionString<'a> {
/// The URI specifying the Kusto service endpoint.
@ -422,10 +414,7 @@ mod tests {
#[test]
fn it_parses_empty_connection_string() {
assert_eq!(
ConnectionString::new("").unwrap(),
ConnectionString::default()
);
assert_eq!(ConnectionString::new(""), Ok(ConnectionString::default()));
}
#[test]

Просмотреть файл

@ -1,6 +1,6 @@
//! Defines `KustoRsError` for representing failures in various operations.
use http::uri::InvalidUri;
use std::fmt::Debug;
use std::num::TryFromIntError;
use thiserror;
#[derive(thiserror::Error, Debug)]
@ -21,24 +21,34 @@ pub enum Error {
NotImplemented(String),
/// Error relating to (de-)serialization of JSON data
#[error(transparent)]
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),
/// Error occurring within core azure crates
#[error(transparent)]
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
/// Errors raised when parsing connection information
#[error("Configuration error: {0}")]
ConfigurationError(#[from] crate::connection_string::ConnectionStringError),
#[error("Connection string error: {0}")]
ConnectionStringError(#[from] ConnectionStringError),
}
#[derive(thiserror::Error, Debug)]
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum InvalidArgumentError {
#[error(transparent)]
InvalidUri(#[from] InvalidUri),
#[error("{0} is not a valid duration")]
InvalidDuration(String),
#[error("{0} is too large to fit in a u32")]
PayloadTooLarge(#[from] TryFromIntError),
}
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStringError {
#[error("Missing value for key '{}'", key)]
MissingValue { key: String },
#[error("Unexpected key '{}'", key)]
UnexpectedKey { key: String },
#[error("Parsing error: {}", msg)]
ParsingError { msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;

Просмотреть файл

@ -1,6 +1,7 @@
#[cfg(feature = "arrow")]
use crate::arrow::convert_table;
use crate::client::{KustoClient, QueryKind};
use crate::error::{Error, InvalidArgumentError};
use crate::models::{
DataSetCompletion, DataSetHeader, DataTable, QueryBody, RequestProperties, TableKind, TableV1,
@ -9,8 +10,9 @@ use crate::request_options::RequestOptions;
#[cfg(feature = "arrow")]
use arrow::record_batch::RecordBatch;
use async_convert::TryFrom;
use azure_core::error::Error as CoreError;
use azure_core::prelude::*;
use azure_core::{collect_pinned_stream, Response as HttpResponse};
use azure_core::{collect_pinned_stream, Request, Response as HttpResponse, Url};
use futures::future::BoxFuture;
use futures::TryFutureExt;
use serde::{Deserialize, Serialize};
@ -45,15 +47,25 @@ pub struct V2QueryRunner(pub QueryRunner);
impl V1QueryRunner {
pub fn into_future(self) -> V1QueryRun {
let V1QueryRunner(query_runner) = self;
Box::pin(query_runner.into_future().map_ok(|e| e.try_into().unwrap()))
Box::pin(async {
let V1QueryRunner(query_runner) = self;
let future = query_runner.into_future().await?;
Ok(
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV1 - please report this issue to the Kusto team")
)
})
}
}
impl V2QueryRunner {
pub fn into_future(self) -> V2QueryRun {
let V2QueryRunner(query_runner) = self;
Box::pin(query_runner.into_future().map_ok(|e| e.try_into().unwrap()))
Box::pin(async {
let V2QueryRunner(query_runner) = self;
let future = query_runner.into_future().await?;
Ok(
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV2 - please report this issue to the Kusto team")
)
})
}
}
@ -67,10 +79,8 @@ impl QueryRunner {
QueryKind::Management => this.client.management_url(),
QueryKind::Query => this.client.query_url(),
};
let mut request = this.client.prepare_request(
url.parse().map_err(InvalidArgumentError::InvalidUri)?,
http::Method::POST,
);
let mut request =
prepare_request(url.parse().map_err(CoreError::from)?, http::Method::POST);
if let Some(request_id) = &this.client_request_id {
request.insert_headers(request_id);
@ -91,8 +101,10 @@ impl QueryRunner {
}),
};
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
request.insert_headers(&ContentLength::new(bytes.len() as i32));
request.set_body(bytes.into());
request.insert_headers(&ContentLength::new(
std::convert::TryInto::try_into(bytes.len()).map_err(InvalidArgumentError::from)?,
));
request.set_body(bytes);
let response = self
.client
@ -159,11 +171,12 @@ impl std::convert::TryFrom<KustoResponse> for KustoResponseDataSetV1 {
}
impl KustoResponseDataSetV2 {
#[must_use]
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
/// Consumes the response into an iterator over all `PrimaryResult` tables within the response dataset
pub fn into_primary_results(self) -> impl Iterator<Item = DataTable> {
self.tables.into_iter().filter_map(|table| match table {
ResultTable::DataTable(table) if table.table_kind == TableKind::PrimaryResult => {
@ -186,6 +199,7 @@ pub struct KustoResponseDataSetV1 {
}
impl KustoResponseDataSetV1 {
#[must_use]
pub fn table_count(&self) -> usize {
self.tables.len()
}
@ -224,6 +238,21 @@ impl TryFrom<HttpResponse> for KustoResponseDataSetV1 {
// }
// }
pub fn prepare_request(uri: Url, http_method: http::Method) -> Request {
const API_VERSION: &str = "2019-02-13";
let mut request = Request::new(uri, http_method);
request.insert_headers(&Version::from(API_VERSION));
request.insert_headers(&Accept::from("application/json"));
request.insert_headers(&ContentType::new("application/json; charset=utf-8"));
request.insert_headers(&AcceptEncoding::from("gzip"));
request.insert_headers(&ClientVersion::from(format!(
"Kusto.Rust.Client:{}",
env!("CARGO_PKG_VERSION"),
)));
request
}
#[cfg(test)]
mod tests {
use super::*;
@ -244,7 +273,7 @@ mod tests {
}"#;
let parsed = serde_json::from_str::<KustoResponseDataSetV1>(data);
assert!(parsed.is_ok())
assert!(parsed.is_ok());
}
#[test]
@ -252,9 +281,11 @@ mod tests {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/inputs/adminthenquery.json");
let data = std::fs::read_to_string(path).unwrap();
let data = std::fs::read_to_string(&path)
.unwrap_or_else(|_| panic!("Failed to read {}", path.display()));
let parsed = serde_json::from_str::<KustoResponseDataSetV1>(&data).unwrap();
assert_eq!(parsed.table_count(), 4)
let parsed = serde_json::from_str::<KustoResponseDataSetV1>(&data)
.expect("Failed to parse response");
assert_eq!(parsed.table_count(), 4);
}
}

Просмотреть файл

@ -17,8 +17,8 @@ pub use crate::operations::query::{
KustoResponse, KustoResponseDataSetV1, KustoResponseDataSetV2, ResultTable,
};
// Token credentials are re-exported for user convenience
pub use azure_identity::token_credentials::{
pub use azure_identity::{
AutoRefreshingTokenCredential, AzureCliCredential, ClientSecretCredential,
DefaultAzureCredential, DefaultAzureCredentialBuilder, EnvironmentCredential,
ManagedIdentityCredentialError, TokenCredentialOptions,
TokenCredentialOptions,
};

Просмотреть файл

@ -1,5 +1,5 @@
use azure_core::error::{ErrorKind, ResultExt};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::fmt::{Debug, Display, Formatter};
@ -42,7 +42,7 @@ impl Debug for KustoDateTime {
impl From<OffsetDateTime> for KustoDateTime {
fn from(time: OffsetDateTime) -> Self {
KustoDateTime(time)
Self(time)
}
}
@ -59,7 +59,7 @@ pub struct KustoDuration(pub Duration);
impl From<Duration> for KustoDuration {
fn from(duration: Duration) -> Self {
KustoDuration(duration)
Self(duration)
}
}
@ -74,37 +74,38 @@ impl Deref for KustoDuration {
fn parse_regex_segment(captures: &Captures, name: &str) -> i64 {
captures
.name(name)
.map(|m| m.as_str().parse::<i64>().unwrap())
.unwrap_or(0)
.map_or(0, |m| m.as_str().parse::<i64>().expect("Failed to parse regex segment as i64 - this is a bug - please report this issue to the Kusto team"))
}
static KUSTO_DURATION_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"^(?P<neg>-)?((?P<days>\d+)\.)?(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)(\.(?P<nanos>\d+))?$")
.expect("Failed to compile KustoDuration regex, this should never happen - please report this issue to the Kusto team")
});
impl FromStr for KustoDuration {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
lazy_static! {
static ref RE: Regex = Regex::new(r"^(?P<neg>\-)?((?P<days>\d+)\.)?(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)(\.(?P<nanos>\d+))?$").unwrap();
}
if let Some(captures) = RE.captures(s) {
let neg = match captures.name("neg") {
None => 1,
Some(_) => -1,
};
let days = parse_regex_segment(&captures, "days");
let hours = parse_regex_segment(&captures, "hours");
let minutes = parse_regex_segment(&captures, "minutes");
let seconds = parse_regex_segment(&captures, "seconds");
let nanos = parse_regex_segment(&captures, "nanos");
let duration = neg
* (Duration::days(days)
+ Duration::hours(hours)
+ Duration::minutes(minutes)
+ Duration::seconds(seconds)
+ Duration::nanoseconds(nanos * 100)); // Ticks
Ok(KustoDuration(duration))
} else {
Err(InvalidArgumentError::InvalidDuration(s.to_string()).into())
}
KUSTO_DURATION_REGEX
.captures(s)
.map(|captures| {
let neg = match captures.name("neg") {
None => 1,
Some(_) => -1,
};
let days = parse_regex_segment(&captures, "days");
let hours = parse_regex_segment(&captures, "hours");
let minutes = parse_regex_segment(&captures, "minutes");
let seconds = parse_regex_segment(&captures, "seconds");
let nanos = parse_regex_segment(&captures, "nanos");
let duration = neg
* (Duration::days(days)
+ Duration::hours(hours)
+ Duration::minutes(minutes)
+ Duration::seconds(seconds)
+ Duration::nanoseconds(nanos * 100)); // Ticks
Self(duration)
})
.ok_or_else(|| InvalidArgumentError::InvalidDuration(s.to_string()).into())
}
}
@ -125,7 +126,8 @@ impl Display for KustoDuration {
neg * (self.whole_hours() - self.whole_days() * 24),
neg * (self.whole_minutes() - self.whole_hours() * 60),
neg * (self.whole_seconds() - self.whole_minutes() * 60),
neg as i128 * (self.whole_nanoseconds() - self.whole_seconds() as i128 * 1_000_000_000)
i128::from(neg)
* (self.whole_nanoseconds() - i128::from(self.whole_seconds()) * 1_000_000_000)
/ 100 // Ticks
)?;
@ -146,20 +148,22 @@ mod tests {
#[test]
fn string_conversion() {
let refs: Vec<(&str, i64)> = vec![
("1.00:00:00.0000000", 86400000000000),
("01:00:00.0000000", 3600000000000),
("01:00:00", 3600000000000),
("00:05:00.0000000", 300000000000),
("1.00:00:00.0000000", 86_400_000_000_000),
("01:00:00.0000000", 3_600_000_000_000),
("01:00:00", 3_600_000_000_000),
("00:05:00.0000000", 300_000_000_000),
("00:00:00.0000001", 100),
("-01:00:00", -3600000000000),
("-1.00:00:00.0000000", -86400000000000),
("00:00:00.1234567", 123456700),
("-01:00:00", -3_600_000_000_000),
("-1.00:00:00.0000000", -86_400_000_000_000),
("00:00:00.1234567", 123_456_700),
];
for (from, to) in refs {
assert_eq!(
KustoDuration::from_str(from).unwrap().whole_nanoseconds(),
to as i128
KustoDuration::from_str(from)
.unwrap_or_else(|_| panic!("Failed to parse duration {}", from))
.whole_nanoseconds(),
i128::from(to)
);
}
}
@ -176,7 +180,8 @@ mod tests {
];
for duration in refs {
let parsed = KustoDuration::from_str(duration).unwrap();
let parsed = KustoDuration::from_str(duration)
.unwrap_or_else(|_| panic!("Failed to parse duration {}", duration));
assert_eq!(format!("{:?}", parsed), duration);
}
}

Просмотреть файл

@ -25,9 +25,7 @@ macro_rules! assert_batches_eq {
#[tokio::test]
async fn arrow_roundtrip() {
let (client, database) = setup::create_kusto_client("data_arrow_roundtrip")
.await
.unwrap();
let (client, database) = setup::create_kusto_client("data_arrow_roundtrip");
let query = "
datatable(
@ -48,11 +46,11 @@ async fn arrow_roundtrip() {
.execute_query(&database, query)
.into_future()
.await
.unwrap();
.expect("Failed to run query");
let batches = response
.into_record_batches()
.collect::<Result<Vec<_>, _>>()
.unwrap();
.expect("Failed to collect batches");
let expected_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, true),
@ -83,7 +81,9 @@ async fn arrow_roundtrip() {
assert_batches_eq!(
expected,
// we have to de-select the duration column, since pretty printing is not supported in arrow
&[batches[0].project(&[0, 1, 2, 3, 4, 5, 6]).unwrap()]
&[batches[0]
.project(&[0, 1, 2, 3, 4, 5, 6])
.expect("Failed to project numbers")]
);
assert_eq!(expected_schema, batches[0].schema())
assert_eq!(expected_schema, batches[0].schema());
}

Просмотреть файл

@ -3,16 +3,14 @@ mod setup;
#[tokio::test]
async fn create_query_delete_table() {
let (client, database) = setup::create_kusto_client("data_create_query_delete_table")
.await
.unwrap();
let (client, database) = setup::create_kusto_client("data_create_query_delete_table");
let query = ".set KustoRsTest <| let text=\"Hello, World!\"; print str=text";
let response = client
.execute_command(&database, query)
.into_future()
.await
.unwrap();
.expect("Failed to run query");
assert_eq!(response.table_count(), 1);
@ -21,7 +19,7 @@ async fn create_query_delete_table() {
.execute_command(&database, query)
.into_future()
.await
.unwrap();
.expect("Failed to run query");
assert_eq!(response.table_count(), 4);
@ -30,7 +28,7 @@ async fn create_query_delete_table() {
.execute_query(&database, query)
.into_future()
.await
.unwrap();
.expect("Failed to run query");
let results = response.into_primary_results().collect::<Vec<_>>();
assert_eq!(results[0].rows.len(), 1);
@ -40,7 +38,7 @@ async fn create_query_delete_table() {
.execute_command(&database, query)
.into_future()
.await
.unwrap();
.expect("Failed to run query");
assert_eq!(response.tables[0].rows.len(), 0)
assert_eq!(response.tables[0].rows.len(), 0);
}

Просмотреть файл

@ -1,11 +1,9 @@
#![cfg(feature = "mock_transport_framework")]
use azure_core::auth::{TokenCredential, TokenResponse};
use azure_core::Error as CoreError;
use azure_core::auth::{AccessToken, TokenCredential, TokenResponse};
use azure_core::error::Error as CoreError;
use azure_kusto_data::prelude::*;
use chrono::Utc;
use dotenv::dotenv;
use oauth2::AccessToken;
use std::error::Error;
use std::path::Path;
use std::sync::Arc;
@ -21,12 +19,10 @@ impl TokenCredential for DummyCredential {
}
}
pub async fn create_kusto_client(
transaction_name: &str,
) -> Result<(KustoClient, String), Box<dyn Error + Send + Sync>> {
let transaction_path = Path::new(&workspace_root().unwrap())
pub fn create_kusto_client(transaction_name: &str) -> (KustoClient, String) {
let transaction_path = Path::new(&workspace_root().expect("Failed to get workspace root"))
.join(format!("test/transactions/{}", transaction_name));
std::fs::create_dir_all(&transaction_path).unwrap();
std::fs::create_dir_all(&transaction_path).expect("Failed to create transaction directory");
let db_path = transaction_path.join("_db");
let (service_url, credential, database): (String, Arc<dyn TokenCredential>, String) =
@ -48,27 +44,32 @@ pub async fn create_kusto_client(
// Wee need to persist the database name as well, since it may change per recording run depending on who
// records it, is part of the request, and as such validated against.
std::fs::write(db_path, &database).unwrap();
std::fs::write(db_path, &database).expect("Failed to write database name to file");
let credential = Arc::new(ClientSecretCredential::new(
tenant_id.to_string(),
client_id.to_string(),
client_secret.to_string(),
tenant_id,
client_id,
client_secret,
TokenCredentialOptions::default(),
));
(service_url, credential, database)
} else {
let credential = Arc::new(DummyCredential {});
let database = String::from_utf8_lossy(&std::fs::read(db_path).unwrap()).to_string();
let database = String::from_utf8_lossy(
&std::fs::read(&db_path)
.expect(&format!("Could not read db path {}", db_path.display())),
)
.to_string();
(String::new(), credential, database)
};
let options = KustoClientOptions::new_with_transaction_name(transaction_name.to_string());
Ok((
KustoClient::new_with_options(service_url, credential, options).unwrap(),
(
KustoClient::new_with_options(service_url, credential, options)
.expect("Failed to create KustoClient"),
database,
))
)
}
/// Run cargo to get the root of the workspace
@ -82,10 +83,10 @@ fn workspace_root() -> Result<String, Box<dyn std::error::Error>> {
let key = "workspace_root\":\"";
let index = output
.find(key)
.ok_or_else(|| format!("workspace_root key not found in metadata"))?;
.ok_or_else(|| "workspace_root key not found in metadata".to_string())?;
let value = &output[index + key.len()..];
let end = value
.find("\"")
.ok_or_else(|| format!("workspace_root value was malformed"))?;
.find('\"')
.ok_or_else(|| "workspace_root value was malformed".to_string())?;
Ok(value[..end].into())
}