Re-did connection strings to be more rubost and simple.

Added docs.
Added benchmarking infostructure.
This commit is contained in:
AsafMah 2022-07-28 15:04:35 +03:00
Родитель 125a753d03
Коммит d06b818581
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: AD0D1680EEE7A4FF
11 изменённых файлов: 1048 добавлений и 392 удалений

Просмотреть файл

@ -47,9 +47,14 @@ env_logger = "0.9"
tokio = { version = "1.19.2", features = ["macros"] }
chrono = "0.4.19"
oauth2 = "4.2.0"
criterion = "0.3"
[features]
default = ["arrow"]
mock_transport_framework = ["azure_core/mock_transport_framework"]
#into_future = [] TODO - properly turn it on
test_e2e = []
[[bench]]
name = "connection_string"
harness = false

Просмотреть файл

@ -0,0 +1,14 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn parse_app_key() -> () {
use azure_kusto_data::error::Error;
use azure_kusto_data::prelude::*;
ConnectionString::from_raw_connection_string("Data Source=localhost ; Application Client Id=f6f295b1-0ce0-41f1-bba3-735accac0c69; Appkey =1234;Authority Id= 25184ef2-1dc0-4b05-84ae-f505bf7964f4 ; aad federated security = True").unwrap();
}
fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("parse app key", |b| b.iter(|| parse_app_key()));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

Просмотреть файл

@ -22,11 +22,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let authority_id =
std::env::var("AZURE_TENANT_ID").expect("Set env variable AZURE_TENANT_ID first!");
let kcsb = ConnectionStringBuilder::new_with_aad_application_key_authentication(
&service_url,
&authority_id,
&client_id,
&client_secret,
let kcsb = ConnectionString::from_application_auth(
service_url,
client_id,
client_secret,
authority_id,
);
let client = KustoClient::try_from(kcsb).expect("Failed to create Kusto client");

Просмотреть файл

@ -35,11 +35,11 @@ struct Args {
async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let kcsb = ConnectionStringBuilder::new_with_aad_application_key_authentication(
&args.endpoint,
&args.tenant_id,
&args.application_id,
&args.application_key,
let kcsb = ConnectionString::from_application_auth(
args.endpoint,
args.application_id,
args.tenant_id,
args.application_key,
);
let client = KustoClient::try_from(kcsb).unwrap();

Просмотреть файл

@ -3,7 +3,7 @@ use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request};
use std::sync::Arc;
#[derive(Clone)]
pub struct AuthorizationPolicy {
pub(crate) struct AuthorizationPolicy {
credential: Arc<dyn TokenCredential>,
resource: String,
}

Просмотреть файл

@ -1,14 +1,10 @@
use crate::authorization_policy::AuthorizationPolicy;
use crate::connection_string::{ConnectionString, ConnectionStringBuilder};
use crate::connection_string::ConnectionString;
use crate::error::Result;
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::auth::TokenCredential;
use azure_core::{ClientOptions, Context, Pipeline};
use azure_identity::{
AzureCliCredential, ClientSecretCredential, DefaultAzureCredential,
ImdsManagedIdentityCredential, TokenCredentialOptions,
};
use crate::request_options::RequestOptions;
use std::convert::TryFrom;
@ -68,26 +64,34 @@ pub struct KustoClient {
management_url: Arc<String>,
}
/// Denotes what kind of query is being executed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
/// A Management query. The returned type is [`KustoResponse::V1`](crate::query::KustoResponse::V1)
Management,
/// A KQL query. The returned type is [`KustoResponse::V2`](crate::query::KustoResponse::V2)
Query,
}
impl KustoClient {
pub fn new_with_options<T>(
url: T,
credential: Arc<dyn TokenCredential>,
options: KustoClientOptions,
) -> Result<Self>
where
T: Into<String>,
{
let service_url: String = url.into();
let service_url = service_url.trim_end_matches('/');
/// Create a new Kusto client.
/// This method accepts a connection string, that includes the Kusto cluster and the authentication information for the cluster.
/// # Example
/// ```rust
/// use azure_kusto_data::prelude::*;
///
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default());
///
/// assert!(client.is_ok());
/// ```
pub fn new(connection_string: ConnectionString, options: KustoClientOptions) -> Result<Self> {
let (data_source, credentials) = connection_string.into_data_source_and_credentials();
let service_url = data_source.trim_end_matches('/');
let query_url = format!("{}/v2/rest/query", service_url);
let management_url = format!("{}/v1/rest/mgmt", service_url);
let pipeline = new_pipeline_from_options(credential, service_url, options);
let pipeline = new_pipeline_from_options(credentials, service_url, options);
Ok(Self {
pipeline: pipeline.into(),
@ -104,137 +108,165 @@ impl KustoClient {
&self.management_url
}
pub fn execute_with_options<DB, Q>(
pub(crate) fn pipeline(&self) -> &Pipeline {
&self.pipeline
}
/// Execute a query against the Kusto cluster.
/// The [kind] parameter determines whether the request is a query (retrieves data from the tables) or a management query (commands to monitor and manage the cluster).
/// This method should only be used if the query kind is not known at compile time, otherwise use [execute](#method.execute) or [execute_command](#method.execute_command).
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
/// # #[tokio::main] async fn main() -> Result<(), Error> {
///
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default())?;
///
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
/// let result = client.execute_with_options("some_database".into(), ".show version".into(), QueryKind::Management, None).into_future().await?;
///
/// assert!(matches!(result, KustoResponse::V1(..)));
/// # Ok(())}
/// ```
pub fn execute_with_options(
&self,
database: DB,
query: Q,
database: String,
query: String,
kind: QueryKind,
options: Option<RequestOptions>,
) -> QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
) -> QueryRunner {
QueryRunnerBuilder::default()
.with_kind(kind)
.with_client(self.clone())
.with_database(database.into())
.with_query(query.into())
.with_database(database)
.with_query(query)
.with_context(Context::new())
.with_options(options)
.build()
.expect("Unexpected error when building query runner - please report this issue to the Kusto team")
}
pub fn execute<DB, Q>(&self, database: DB, query: Q, kind: QueryKind) -> QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
self.execute_with_options(database, query, kind, None)
/// Execute a KQL query with additional request options.
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
///
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
/// # #[tokio::main] async fn main() -> Result<(), Error> {
/// use azure_kusto_data::client::QueryKind;
/// use azure_kusto_data::request_options::RequestOptions;
///
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default())?;
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
/// let result = client.execute_query_with_options(
/// "some_database".into(),
/// "MyTable | take 10".into(),
/// Some(RequestOptionsBuilder::default().with_request_app_name("app name".to_string()).build().unwrap()))
/// .into_future().await?;
///
/// for table in result.into_primary_results() {
/// println!("{}", table.table_name);
/// }
/// # Ok(())}
/// ```
///
pub fn execute_query_with_options(
&self,
database: String,
query: String,
options: Option<RequestOptions>,
) -> V2QueryRunner {
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, options))
}
/// Execute a KQL query.
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
///
/// # Arguments
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
///
/// * `database` - Name of the database in scope that is the target of the query
/// * `query` - Text of the query to execute
pub fn execute_query_with_options<DB, Q>(
&self,
database: DB,
query: Q,
options: Option<RequestOptions>,
) -> V2QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, options))
}
pub fn execute_query<DB, Q>(&self, database: DB, query: Q) -> V2QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
/// # #[tokio::main] async fn main() -> Result<(), Error> {
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default())?;
///
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
/// let result = client.execute_query("some_database".into(), "MyTable | take 10".into()).into_future().await?;
///
/// for table in result.into_primary_results() {
/// println!("{}", table.table_name);
/// }
/// # Ok(())}
/// ```
pub fn execute_query(&self, database: String, query: String) -> V2QueryRunner {
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, None))
}
pub fn execute_command_with_options<DB, Q>(
/// Execute a management command with additional options.
/// To learn more about see [commands](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/)
///
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
/// # #[tokio::main] async fn main() -> Result<(), Error> {
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default())?;
///
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
/// let result = client.execute_command_with_options("some_database".into(), ".show version".into(),
/// Some(RequestOptionsBuilder::default().with_request_app_name("app name".to_string()).build().unwrap()))
/// .into_future().await?;
///
/// for table in result.tables {
/// println!("{}", table.table_name);
/// }
/// # Ok(())}
/// ```
pub fn execute_command_with_options(
&self,
database: DB,
query: Q,
database: String,
query: String,
options: Option<RequestOptions>,
) -> V1QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
) -> V1QueryRunner {
V1QueryRunner(self.execute_with_options(database, query, QueryKind::Management, options))
}
pub fn execute_command<DB, Q>(&self, database: DB, query: Q) -> V1QueryRunner
where
DB: Into<String>,
Q: Into<String>,
{
/// Execute a management command.
/// To learn more about see [commands](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/)
///
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
///
/// # #[tokio::main] async fn main() -> Result<(), Error> {
///
/// let client = KustoClient::new(
/// ConnectionString::from_default_auth("https://mycluster.region.kusto.windows.net/".to_string()),
/// KustoClientOptions::default())?;
///
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
/// let result = client.execute_command("some_database".into(), ".show version".into()).into_future().await?;
///
/// for table in result.tables {
/// println!("{}", table.table_name);
/// }
/// # Ok(())}
/// ```
pub fn execute_command(&self, database: String, query: String) -> V1QueryRunner {
V1QueryRunner(self.execute_with_options(database, query, QueryKind::Management, None))
}
pub(crate) fn pipeline(&self) -> &Pipeline {
&self.pipeline
}
}
impl<'a> TryFrom<ConnectionString<'a>> for KustoClient {
impl TryFrom<ConnectionString> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: ConnectionString) -> Result<Self> {
let service_url = value
.data_source
.expect("A data source / service url must always be specified");
let credential: Arc<dyn TokenCredential> = match value {
ConnectionString {
application_client_id: Some(client_id),
application_key: Some(client_secret),
authority_id: Some(tenant_id),
..
} => Arc::new(ClientSecretCredential::new(
tenant_id.to_string(),
client_id.to_string(),
client_secret.to_string(),
TokenCredentialOptions::default(),
)),
ConnectionString {
msi_auth: Some(true),
..
} => Arc::new(ImdsManagedIdentityCredential::default()),
ConnectionString {
az_cli: Some(true), ..
} => Arc::new(AzureCliCredential {}),
_ => Arc::new(DefaultAzureCredential::default()),
};
Self::new_with_options(service_url, credential, KustoClientOptions::new())
}
}
impl TryFrom<String> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: String) -> Result<Self> {
let connection_string = ConnectionString::new(value.as_str())?;
Self::try_from(connection_string)
}
}
impl<'a> TryFrom<ConnectionStringBuilder<'a>> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: ConnectionStringBuilder) -> Result<Self> {
let connection_string = value.build();
Self::try_from(connection_string)
Self::new(value, KustoClientOptions::new())
}
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -1,14 +1,16 @@
//! Defines `KustoRsError` for representing failures in various operations.
//! Defines [Error] for representing failures in various operations.
use std::fmt::Debug;
use std::num::TryFromIntError;
use thiserror;
/// Error type for kusto operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Raised when failing to convert a kusto response to the expected type.
#[error("Error converting Kusto response for {0}")]
ConversionError(String),
/// Error in external crate
/// Error in an external crate
#[error("Error in external crate {0}")]
ExternalError(String),
@ -37,20 +39,27 @@ pub enum Error {
UnsupportedOperation(String),
}
/// Errors raised when an invalid argument or option is provided.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum InvalidArgumentError {
/// Error raised when a string denoting a duration is not valid.
#[error("{0} is not a valid duration")]
InvalidDuration(String),
/// Error raised when failing to convert a number to u32.
#[error("{0} is too large to fit in a u32")]
PayloadTooLarge(#[from] TryFromIntError),
}
/// Errors raised when parsing connection strings.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStringError {
/// Raised when a connection string is missing a required key.
#[error("Missing value for key '{}'", key)]
MissingValue { key: String },
/// Raised when a connection string has an unexpected key.
#[error("Unexpected key '{}'", key)]
UnexpectedKey { key: String },
/// Raised when a connection string has an invalid value.
#[error("Parsing error: {}", msg)]
ParsingError { msg: String },
}

Просмотреть файл

@ -1,6 +1,12 @@
#![warn(missing_docs)]
//! # Azure Data Explorer Client Library
//! Query and explore data from Azure Data Explorer (Kusto).
//! Learn more about Azure Data Explorer at [https://docs.microsoft.com/en-us/azure/data-explorer/](https://docs.microsoft.com/en-us/azure/data-explorer/).
#[cfg(feature = "arrow")]
mod arrow;
pub mod authorization_policy;
mod authorization_policy;
pub mod client;
pub mod connection_string;
pub mod error;

Просмотреть файл

@ -11,10 +11,12 @@
//! use azure_kusto_data::prelude::*;
//! ```
pub use crate::client::{KustoClient, KustoClientOptions};
pub use crate::connection_string::ConnectionStringBuilder;
pub use crate::models::V2QueryResult;
pub use crate::client::{KustoClient, KustoClientOptions, QueryKind};
pub use crate::connection_string::{ConnectionString, ConnectionStringAuth};
pub use crate::error::Error;
pub use crate::models::{DataTable, V2QueryResult};
pub use crate::operations::query::{KustoResponse, KustoResponseDataSetV1, KustoResponseDataSetV2};
pub use crate::request_options::RequestOptions;
// Token credentials are re-exported for user convenience
pub use azure_identity::{

Просмотреть файл

@ -10,6 +10,7 @@ use time::{Duration, OffsetDateTime};
use crate::error::{Error, InvalidArgumentError};
use time::format_description::well_known::Rfc3339;
/// Represents a datetime field for kusto, for serialization and deserialization.
#[derive(PartialEq, Eq, Copy, Clone, DeserializeFromStr, SerializeDisplay)]
pub struct KustoDateTime(pub OffsetDateTime);
@ -54,6 +55,7 @@ impl Deref for KustoDateTime {
}
}
/// Represent a timespan for kusto, for serialization and deserialization.
#[derive(PartialEq, Eq, Copy, Clone, DeserializeFromStr, SerializeDisplay)]
pub struct KustoDuration(pub Duration);