Merge branch 'main' into cloud_info
# Conflicts: # azure-kusto-data/Cargo.toml # azure-kusto-data/src/client.rs # azure-kusto-data/src/connection_string.rs
This commit is contained in:
Коммит
b4057ca91a
|
@ -5,6 +5,7 @@ description = "Rust wrappers around Microsoft Azure REST APIs - Azure Data Explo
|
|||
readme = "README.md"
|
||||
license = "MIT"
|
||||
edition = "2021"
|
||||
rust-version = "1.65"
|
||||
repository = "https://github.com/azure/azure-sdk-for-rust"
|
||||
homepage = "https://github.com/azure/azure-sdk-for-rust"
|
||||
documentation = "https://docs.rs/azure_kusto_data"
|
||||
|
@ -12,44 +13,45 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
|
|||
categories = ["api-bindings"]
|
||||
|
||||
[dependencies]
|
||||
arrow = { version = "20.0.0", optional = true }
|
||||
azure_core = { version = "0.4.0", features = [
|
||||
arrow = { version = "28.0.0", optional = true }
|
||||
azure_core = { version = "0.7.0", features = [
|
||||
"enable_reqwest",
|
||||
"enable_reqwest_gzip",
|
||||
] }
|
||||
azure_identity = { version = "0.5.0" }
|
||||
async-trait = "0.1.57"
|
||||
azure_identity = { version = "0.8.0" }
|
||||
async-trait = "0.1.59"
|
||||
async-convert = "1.0.0"
|
||||
bytes = "1.2.1"
|
||||
futures = "0.3.23"
|
||||
serde = { version = "1.0.143", features = ["derive"] }
|
||||
serde_json = "1.0.81"
|
||||
serde_with = { version = "2.0.0", features = ["json"] }
|
||||
thiserror = "1.0.32"
|
||||
hashbrown = "0.12.3"
|
||||
regex = "1.6.0"
|
||||
time = { version = "0.3.13", features = [
|
||||
bytes = "1.3.0"
|
||||
futures = "0.3.25"
|
||||
serde = { version = "1.0.148", features = ["derive"] }
|
||||
serde_json = "1.0.89"
|
||||
serde_with = { version = "2.1.0", features = ["json"] }
|
||||
thiserror = "1.0.37"
|
||||
hashbrown = "0.13.1"
|
||||
regex = "1.7.0"
|
||||
time = { version = "0.3.17", features = [
|
||||
"serde",
|
||||
"parsing",
|
||||
"formatting",
|
||||
"macros",
|
||||
"serde-well-known",
|
||||
] }
|
||||
derive_builder = "0.11.2"
|
||||
once_cell = "1.13.0"
|
||||
derive_builder = "0.12"
|
||||
once_cell = "1.16.0"
|
||||
|
||||
[dev-dependencies]
|
||||
arrow = { version = "20.0.0", features = ["prettyprint"] }
|
||||
arrow = { version = "28.0.0", features = ["prettyprint"] }
|
||||
dotenv = "0.15.0"
|
||||
env_logger = "0.9"
|
||||
tokio = { version = "1.20.1", features = ["macros"] }
|
||||
oauth2 = "4.2.3"
|
||||
criterion = "0.3.6"
|
||||
clap = { version = "3.2.17", features = ["derive", "env"] }
|
||||
env_logger = "0.10.0"
|
||||
tokio = { version = "1.22.0", features = ["macros"] }
|
||||
oauth2 = "4.3.0"
|
||||
criterion = "0.4.0"
|
||||
clap = { version = "4", features = ["derive", "env"] }
|
||||
decimal = "2.0.0"
|
||||
uuid = { version = "1.2.2", features = [ "serde"] }
|
||||
|
||||
[features]
|
||||
default = ["arrow"]
|
||||
#into_future = [] TODO - properly turn it on
|
||||
test_e2e = []
|
||||
|
||||
[[bench]]
|
||||
|
|
|
@ -33,7 +33,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
|||
|
||||
let response = client
|
||||
.execute_command(database, query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to execute query");
|
||||
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
use azure_kusto_data::models::V2QueryResult;
|
||||
use azure_kusto_data::prelude::*;
|
||||
use azure_kusto_data::request_options::RequestOptionsBuilder;
|
||||
use azure_kusto_data::types::{KustoDateTime, KustoDuration};
|
||||
use clap::Parser;
|
||||
use futures::{pin_mut, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::error::Error;
|
||||
|
||||
/// Simple program to greet a person
|
||||
#[derive(Parser, Debug)]
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Kusto cluster endpoint
|
||||
|
@ -36,59 +39,30 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
let args = Args::parse();
|
||||
|
||||
let kcsb = ConnectionString::with_application_auth(
|
||||
args.endpoint,
|
||||
args.application_id,
|
||||
args.application_key,
|
||||
args.tenant_id,
|
||||
args.endpoint.clone(),
|
||||
args.application_id.clone(),
|
||||
args.application_key.clone(),
|
||||
args.tenant_id.clone(),
|
||||
);
|
||||
|
||||
let client = KustoClient::try_from(kcsb).unwrap();
|
||||
|
||||
println!("Querying {} with regular client", args.query);
|
||||
non_progressive(&args, &client).await;
|
||||
|
||||
let response = client
|
||||
.execute_query_with_options(
|
||||
args.database.clone(),
|
||||
args.query.clone(),
|
||||
Some(
|
||||
RequestOptionsBuilder::default()
|
||||
.with_results_progressive_enabled(false) // change to true to enable progressive results
|
||||
.build()
|
||||
.expect("Failed to create request options"),
|
||||
),
|
||||
)
|
||||
.into_future()
|
||||
.await
|
||||
.unwrap();
|
||||
progressive(&args, &client).await?;
|
||||
|
||||
println!("All results:");
|
||||
to_struct(&args, &client).await?;
|
||||
|
||||
for table in &response.results {
|
||||
match table {
|
||||
V2QueryResult::DataSetHeader(header) => println!("header: {:#?}", header),
|
||||
V2QueryResult::DataTable(table) => println!("table: {:#?}", table),
|
||||
V2QueryResult::DataSetCompletion(completion) => {
|
||||
println!("completion: {:#?}", completion)
|
||||
}
|
||||
V2QueryResult::TableHeader(header) => println!("header: {:#?}", header),
|
||||
V2QueryResult::TableFragment(fragment) => println!("fragment: {:#?}", fragment),
|
||||
V2QueryResult::TableProgress(progress) => println!("progress: {:#?}", progress),
|
||||
V2QueryResult::TableCompletion(completion) => {
|
||||
println!("completion: {:#?}", completion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Print the primary tables
|
||||
let primary_results = response.into_primary_results().collect::<Vec<_>>();
|
||||
println!("primary results: {:#?}", primary_results);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn progressive(args: &Args, client: &KustoClient) -> Result<(), Box<dyn Error>> {
|
||||
println!("Querying {} with streaming client", args.query);
|
||||
|
||||
let stream = client
|
||||
.execute_query_with_options(
|
||||
args.database,
|
||||
args.query,
|
||||
args.database.clone(),
|
||||
args.query.clone(),
|
||||
Some(
|
||||
RequestOptionsBuilder::default()
|
||||
.with_results_progressive_enabled(true)
|
||||
|
@ -121,3 +95,87 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn non_progressive(args: &Args, client: &KustoClient) {
|
||||
println!("Querying {} with regular client", args.query);
|
||||
|
||||
let response = client
|
||||
.execute_query_with_options(
|
||||
args.database.clone(),
|
||||
args.query.clone(),
|
||||
Some(
|
||||
RequestOptionsBuilder::default()
|
||||
.with_results_progressive_enabled(false) // change to true to enable progressive results
|
||||
.build()
|
||||
.expect("Failed to create request options"),
|
||||
),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
println!("All results:");
|
||||
|
||||
for table in &response.results {
|
||||
match table {
|
||||
V2QueryResult::DataSetHeader(header) => println!("header: {:#?}", header),
|
||||
V2QueryResult::DataTable(table) => println!("table: {:#?}", table),
|
||||
V2QueryResult::DataSetCompletion(completion) => {
|
||||
println!("completion: {:#?}", completion)
|
||||
}
|
||||
V2QueryResult::TableHeader(header) => println!("header: {:#?}", header),
|
||||
V2QueryResult::TableFragment(fragment) => println!("fragment: {:#?}", fragment),
|
||||
V2QueryResult::TableProgress(progress) => println!("progress: {:#?}", progress),
|
||||
V2QueryResult::TableCompletion(completion) => {
|
||||
println!("completion: {:#?}", completion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Print the primary tables
|
||||
let primary_results = response.into_primary_results().collect::<Vec<_>>();
|
||||
println!("primary results: {:#?}", primary_results);
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
struct Item {
|
||||
vnum: i32,
|
||||
vdec: String, // optionally, you can use a decimal type here
|
||||
vdate: KustoDateTime,
|
||||
vspan: KustoDuration,
|
||||
vobj: Value,
|
||||
vb: bool,
|
||||
vreal: f64,
|
||||
vstr: String,
|
||||
vlong: i64,
|
||||
vguid: String, // optionally, you can use a guid type here
|
||||
}
|
||||
|
||||
async fn to_struct(args: &Args, client: &KustoClient) -> Result<(), Box<dyn Error>> {
|
||||
let query = r#"datatable(vnum:int, vdec:decimal, vdate:datetime, vspan:timespan, vobj:dynamic, vb:bool, vreal:real, vstr:string, vlong:long, vguid:guid)
|
||||
[
|
||||
1, decimal(2.00000000000001), datetime(2020-03-04T14:05:01.3109965Z), time(01:23:45.6789000), dynamic({
|
||||
"moshe": "value"
|
||||
}), true, 0.01, "asdf", 9223372036854775807, guid(74be27de-1e4e-49d9-b579-fe0b331d3642),
|
||||
2, decimal(5.00000000000005), datetime(2022-05-06T16:07:03.1234300Z), time(04:56:59.9120000), dynamic({
|
||||
"moshe": "value2"
|
||||
}), false, 0.05, "qwerty", 9223372036854775806, guid(f6e97f76-8b73-45c0-b9ef-f68e8f897713),
|
||||
3, decimal(9.9999999999999), datetime(2023-07-08T18:09:05.5678000Z), time(07:43:12.3456000), dynamic({
|
||||
"moshe": "value3"
|
||||
}), true, 0.99, "zxcv", 9223372036854775805, guid(d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12),
|
||||
]"#;
|
||||
|
||||
let response = client.execute_query(args.database.clone(), query).await?;
|
||||
|
||||
let results = response
|
||||
.into_primary_results()
|
||||
.next()
|
||||
.ok_or_else(|| "Expected to get a primary result, but got none".to_string())?;
|
||||
|
||||
let rows = results.rows;
|
||||
|
||||
let items = serde_json::from_value::<Vec<Item>>(Value::Array(rows))?;
|
||||
|
||||
println!("items: {:#?}", items);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -13,20 +13,21 @@ use arrow::{
|
|||
record_batch::RecordBatch,
|
||||
};
|
||||
use azure_core::error::{ErrorKind, ResultExt};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::models::ColumnType;
|
||||
use crate::models::{Column, DataTable};
|
||||
use crate::types::{KustoDateTime, KustoDuration};
|
||||
|
||||
fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_string(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<Option<String>> = serde_json::from_value(Value::Array(values))?;
|
||||
let strings: Vec<Option<&str>> = strings.iter().map(Option::as_deref).collect();
|
||||
Ok(Arc::new(StringArray::from(strings)))
|
||||
}
|
||||
|
||||
fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let dates: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_datetime(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let dates: Vec<String> = serde_json::from_value(Value::Array(values))?;
|
||||
let timestamps = dates
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
|
@ -40,16 +41,16 @@ fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(dates_array)
|
||||
}
|
||||
|
||||
fn safe_map_f64(value: serde_json::Value) -> Result<Option<f64>> {
|
||||
fn safe_map_f64(value: Value) -> Result<Option<f64>> {
|
||||
match value {
|
||||
serde_json::Value::String(val) if val == "NaN" => Ok(None),
|
||||
serde_json::Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
|
||||
serde_json::Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
|
||||
Value::String(val) if val == "NaN" => Ok(None),
|
||||
Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
|
||||
Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
|
||||
_ => Ok(serde_json::from_value(value)?),
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
fn convert_array_float(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let reals: Vec<Option<f64>> = values
|
||||
.into_iter()
|
||||
.map(safe_map_f64)
|
||||
|
@ -57,8 +58,8 @@ fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(Arc::new(Float64Array::from(reals)))
|
||||
}
|
||||
|
||||
fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_timespan(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<String> = serde_json::from_value(Value::Array(values))?;
|
||||
let durations: Vec<Option<i64>> = strings
|
||||
.iter()
|
||||
.map(|s| {
|
||||
|
@ -70,22 +71,22 @@ fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(Arc::new(DurationNanosecondArray::from(durations)))
|
||||
}
|
||||
|
||||
fn convert_array_bool(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let bools: Vec<Option<bool>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_bool(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let bools: Vec<Option<bool>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(BooleanArray::from(bools)))
|
||||
}
|
||||
|
||||
fn convert_array_i32(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_i32(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i32>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(Int32Array::from(ints)))
|
||||
}
|
||||
|
||||
fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_i64(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i64>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(Int64Array::from(ints)))
|
||||
}
|
||||
|
||||
pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(Field, ArrayRef)> {
|
||||
pub fn convert_column(data: Vec<Value>, column: &Column) -> Result<(Field, ArrayRef)> {
|
||||
let column_name = &column.column_name;
|
||||
match column.column_type {
|
||||
ColumnType::String => convert_array_string(data)
|
||||
|
@ -119,17 +120,20 @@ pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(
|
|||
}
|
||||
|
||||
pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
|
||||
let mut buffer: Vec<Vec<serde_json::Value>> = Vec::with_capacity(table.columns.len());
|
||||
let mut buffer: Vec<Vec<Value>> = Vec::with_capacity(table.columns.len());
|
||||
let mut fields: Vec<Field> = Vec::with_capacity(table.columns.len());
|
||||
let mut columns: Vec<ArrayRef> = Vec::with_capacity(table.columns.len());
|
||||
|
||||
for _ in 0..table.columns.len() {
|
||||
buffer.push(Vec::with_capacity(table.rows.len()));
|
||||
}
|
||||
table.rows.into_iter().for_each(|row| {
|
||||
row.into_iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, value)| buffer[idx].push(value))
|
||||
table.rows.into_iter().for_each(|row| match row {
|
||||
Value::Array(v) => {
|
||||
v.into_iter().enumerate().for_each(|(i, v)| {
|
||||
buffer[i].push(v);
|
||||
});
|
||||
}
|
||||
_ => unreachable!("Must be an array"),
|
||||
});
|
||||
|
||||
buffer
|
||||
|
|
|
@ -2,12 +2,13 @@
|
|||
|
||||
use crate::authorization_policy::AuthorizationPolicy;
|
||||
use crate::connection_string::{ConnectionString, ConnectionStringAuth};
|
||||
use crate::error::Result;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
|
||||
|
||||
use azure_core::{ClientOptions, Context, Pipeline};
|
||||
|
||||
use crate::request_options::RequestOptions;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
@ -89,8 +90,8 @@ impl KustoClient {
|
|||
pub fn new(connection_string: ConnectionString, options: KustoClientOptions) -> Result<Self> {
|
||||
let (data_source, credentials) = connection_string.into_data_source_and_auth();
|
||||
let service_url = Arc::new(data_source.trim_end_matches('/').to_string());
|
||||
let query_url = format!("{}/v2/rest/query", service_url);
|
||||
let management_url = format!("{}/v1/rest/mgmt", service_url);
|
||||
let query_url = format!("{service_url}/v2/rest/query");
|
||||
let management_url = format!("{service_url}/v1/rest/mgmt");
|
||||
let pipeline = new_pipeline_from_options(credentials, (*service_url).clone(), options);
|
||||
|
||||
Ok(Self {
|
||||
|
@ -125,8 +126,7 @@ impl KustoClient {
|
|||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_with_options("some_database", ".show version", QueryKind::Management, None).into_future().await?;
|
||||
/// let result = client.execute_with_options("some_database", ".show version", QueryKind::Management, None).await?;
|
||||
///
|
||||
/// assert!(matches!(result, KustoResponse::V1(..)));
|
||||
/// # Ok(())}
|
||||
|
@ -163,12 +163,11 @@ impl KustoClient {
|
|||
/// let client = KustoClient::new(
|
||||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_query_with_options(
|
||||
/// "some_database",
|
||||
/// "MyTable | take 10",
|
||||
/// Some(RequestOptionsBuilder::default().with_request_app_name("app name").build().unwrap()))
|
||||
/// .into_future().await?;
|
||||
/// .await?;
|
||||
///
|
||||
/// for table in result.into_primary_results() {
|
||||
/// println!("{}", table.table_name);
|
||||
|
@ -198,8 +197,7 @@ impl KustoClient {
|
|||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_query("some_database", "MyTable | take 10").into_future().await?;
|
||||
/// let result = client.execute_query("some_database", "MyTable | take 10").await?;
|
||||
///
|
||||
/// for table in result.into_primary_results() {
|
||||
/// println!("{}", table.table_name);
|
||||
|
@ -215,6 +213,52 @@ impl KustoClient {
|
|||
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, None))
|
||||
}
|
||||
|
||||
/// Execute a KQL query into an array of structs.
|
||||
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
|
||||
///
|
||||
/// This method is the simplest way to just convert your data into a struct.
|
||||
/// It assumes there is one primary result table.
|
||||
///
|
||||
/// Your struct should implement the [serde::DeserializeOwned](https://docs.serde.rs/serde/trait.DeserializeOwned.html) trait.
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use azure_kusto_data::prelude::*;
|
||||
/// use serde::Deserialize;
|
||||
///
|
||||
/// #[derive(serde::Deserialize, Debug)]
|
||||
/// struct MyStruct {
|
||||
/// name: String,
|
||||
/// age: u32,
|
||||
/// }
|
||||
///
|
||||
/// # #[tokio::main] async fn main() -> Result<(), Error> {
|
||||
/// let client = KustoClient::new(
|
||||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// let result: Vec<MyStruct> = client.execute_query_to_struct("some_database", "MyTable | take 10").await?;
|
||||
/// println!("{:?}", result); // prints [MyStruct { name: "foo", age: 42 }, MyStruct { name: "bar", age: 43 }]
|
||||
///
|
||||
/// # Ok(())}
|
||||
/// ```
|
||||
pub async fn execute_query_to_struct<T: DeserializeOwned>(
|
||||
&self,
|
||||
database: impl Into<String>,
|
||||
query: impl Into<String>,
|
||||
) -> Result<Vec<T>> {
|
||||
let response = self.execute_query(database, query).await?;
|
||||
|
||||
let results = response
|
||||
.into_primary_results()
|
||||
.next()
|
||||
.ok_or_else(|| Error::QueryError("No primary results found".into()))?;
|
||||
|
||||
Ok(serde_json::from_value::<Vec<T>>(serde_json::Value::Array(
|
||||
results.rows,
|
||||
))?)
|
||||
}
|
||||
|
||||
/// Execute a management command with additional options.
|
||||
/// To learn more about see [commands](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/)
|
||||
///
|
||||
|
@ -226,10 +270,9 @@ impl KustoClient {
|
|||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_command_with_options("some_database", ".show version",
|
||||
/// Some(RequestOptionsBuilder::default().with_request_app_name("app name").build().unwrap()))
|
||||
/// .into_future().await?;
|
||||
/// .await?;
|
||||
///
|
||||
/// for table in result.tables {
|
||||
/// println!("{}", table.table_name);
|
||||
|
@ -259,8 +302,7 @@ impl KustoClient {
|
|||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_command("some_database", ".show version").into_future().await?;
|
||||
/// let result = client.execute_command("some_database", ".show version").await?;
|
||||
///
|
||||
/// for table in result.tables {
|
||||
/// println!("{}", table.table_name);
|
||||
|
@ -278,7 +320,7 @@ impl KustoClient {
|
|||
}
|
||||
|
||||
impl TryFrom<ConnectionString> for KustoClient {
|
||||
type Error = crate::error::Error;
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: ConnectionString) -> Result<Self> {
|
||||
Self::new(value, KustoClientOptions::new())
|
||||
|
|
|
@ -440,10 +440,10 @@ impl Debug for ConnectionStringAuth {
|
|||
match self {
|
||||
ConnectionStringAuth::Default => write!(f, "Default"),
|
||||
ConnectionStringAuth::UserAndPassword { user_id, password } => {
|
||||
write!(f, "UserAndPassword({}, {})", user_id, password)
|
||||
write!(f, "UserAndPassword({user_id}, {password})")
|
||||
}
|
||||
ConnectionStringAuth::Token { token, .. } => {
|
||||
write!(f, "Token({})", token)
|
||||
write!(f, "Token({token})")
|
||||
}
|
||||
ConnectionStringAuth::TokenCallback { .. } => write!(f, "TokenCallback"),
|
||||
ConnectionStringAuth::Application {
|
||||
|
@ -452,8 +452,7 @@ impl Debug for ConnectionStringAuth {
|
|||
client_secret,
|
||||
} => write!(
|
||||
f,
|
||||
"Application({}, {}, {})",
|
||||
client_id, client_authority, client_secret
|
||||
"Application({client_id}, {client_authority}, {client_secret})"
|
||||
),
|
||||
ConnectionStringAuth::ApplicationCertificate {
|
||||
client_id,
|
||||
|
@ -1022,8 +1021,7 @@ fn parse_boolean(term: &str, name: &str) -> Result<bool, ConnectionStringError>
|
|||
"true" => Ok(true),
|
||||
"false" => Ok(false),
|
||||
_ => Err(ConnectionStringError::from_parsing_error(format!(
|
||||
"Unexpected value for {}: {}. Please specify either 'true' or 'false'.",
|
||||
name, term
|
||||
"Unexpected value for {name}: {term}. Please specify either 'true' or 'false'."
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,10 @@ pub enum Error {
|
|||
/// Errors raised when the operation is not supported
|
||||
#[error("Operation not supported: {0}")]
|
||||
UnsupportedOperation(String),
|
||||
|
||||
/// Errors raised when the query is invalid
|
||||
#[error("Invalid query: {0}")]
|
||||
QueryError(String),
|
||||
}
|
||||
|
||||
/// Errors raised when an invalid argument or option is provided.
|
||||
|
|
|
@ -143,7 +143,7 @@ pub struct DataTable {
|
|||
/// Columns in the table.
|
||||
pub columns: Vec<Column>,
|
||||
/// Rows in the table. Each row is a list of values, corresponding to the columns in the table.
|
||||
pub rows: Vec<Vec<serde_json::Value>>,
|
||||
pub rows: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// A header of a fragment of a table (in progressive mode).
|
||||
|
@ -180,7 +180,7 @@ pub struct TableFragment {
|
|||
/// The type of the fragment, instructs to how to use it.
|
||||
pub table_fragment_type: TableFragmentType,
|
||||
/// Rows in the table. Each row is a list of values, corresponding to the columns in the TableHeader.
|
||||
pub rows: Vec<Vec<serde_json::Value>>,
|
||||
pub rows: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Progress report for a table (in progressive mode).
|
||||
|
|
|
@ -18,6 +18,7 @@ use futures::future::BoxFuture;
|
|||
use futures::{Stream, TryFutureExt, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::future::IntoFuture;
|
||||
use std::io::ErrorKind;
|
||||
|
||||
type QueryRun = BoxFuture<'static, Result<KustoResponse>>;
|
||||
|
@ -47,29 +48,7 @@ pub struct V1QueryRunner(pub QueryRunner);
|
|||
|
||||
pub struct V2QueryRunner(pub QueryRunner);
|
||||
|
||||
impl V1QueryRunner {
|
||||
pub fn into_future(self) -> V1QueryRun {
|
||||
Box::pin(async {
|
||||
let V1QueryRunner(query_runner) = self;
|
||||
let future = query_runner.into_future().await?;
|
||||
Ok(
|
||||
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV1 - please report this issue to the Kusto team")
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl V2QueryRunner {
|
||||
pub fn into_future(self) -> V2QueryRun {
|
||||
Box::pin(async {
|
||||
let V2QueryRunner(query_runner) = self;
|
||||
let future = query_runner.into_future().await?;
|
||||
Ok(
|
||||
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV2 - please report this issue to the Kusto team")
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn into_stream(self) -> Result<impl Stream<Item = Result<V2QueryResult>>> {
|
||||
let V2QueryRunner(query_runner) = self;
|
||||
query_runner.into_stream().await
|
||||
|
@ -77,27 +56,6 @@ impl V2QueryRunner {
|
|||
}
|
||||
|
||||
impl QueryRunner {
|
||||
pub fn into_future(self) -> QueryRun {
|
||||
let this = self.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let response = self.into_response().await?;
|
||||
|
||||
Ok(match this.kind {
|
||||
QueryKind::Management => {
|
||||
<KustoResponseDataSetV1 as TryFrom<HttpResponse>>::try_from(response)
|
||||
.map_ok(KustoResponse::V1)
|
||||
.await?
|
||||
}
|
||||
QueryKind::Query => {
|
||||
<KustoResponseDataSetV2 as TryFrom<HttpResponse>>::try_from(response)
|
||||
.map_ok(KustoResponse::V2)
|
||||
.await?
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async fn into_response(self) -> Result<Response> {
|
||||
let url = match self.kind {
|
||||
QueryKind::Management => self.client.management_url(),
|
||||
|
@ -155,6 +113,62 @@ impl QueryRunner {
|
|||
}
|
||||
}
|
||||
|
||||
impl IntoFuture for V1QueryRunner {
|
||||
type IntoFuture = V1QueryRun;
|
||||
type Output = Result<KustoResponseDataSetV1>;
|
||||
|
||||
fn into_future(self) -> V1QueryRun {
|
||||
Box::pin(async {
|
||||
let V1QueryRunner(query_runner) = self;
|
||||
let future = query_runner.into_future().await?;
|
||||
Ok(
|
||||
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV1 - please report this issue to the Kusto team")
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoFuture for V2QueryRunner {
|
||||
type IntoFuture = V2QueryRun;
|
||||
type Output = Result<KustoResponseDataSetV2>;
|
||||
|
||||
fn into_future(self) -> V2QueryRun {
|
||||
Box::pin(async {
|
||||
let V2QueryRunner(query_runner) = self;
|
||||
let future = query_runner.into_future().await?;
|
||||
Ok(
|
||||
std::convert::TryInto::try_into(future).expect("Unexpected conversion error from KustoResponse to KustoResponseDataSetV2 - please report this issue to the Kusto team")
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoFuture for QueryRunner {
|
||||
type IntoFuture = QueryRun;
|
||||
type Output = Result<KustoResponse>;
|
||||
|
||||
fn into_future(self) -> QueryRun {
|
||||
let this = self.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
let response = self.into_response().await?;
|
||||
|
||||
Ok(match this.kind {
|
||||
QueryKind::Management => {
|
||||
<KustoResponseDataSetV1 as TryFrom<HttpResponse>>::try_from(response)
|
||||
.map_ok(KustoResponse::V1)
|
||||
.await?
|
||||
}
|
||||
QueryKind::Query => {
|
||||
<KustoResponseDataSetV2 as TryFrom<HttpResponse>>::try_from(response)
|
||||
.map_ok(KustoResponse::V2)
|
||||
.await?
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// A Kusto query response.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum KustoResponse {
|
||||
|
@ -410,7 +424,7 @@ impl KustoResponseDataSetV2 {
|
|||
/// table_name: "table_1".to_string(),
|
||||
/// table_kind: TableKind::PrimaryResult,
|
||||
/// columns: vec![Column{column_name: "col1".to_string(), column_type: ColumnType::Long}],
|
||||
/// rows: vec![vec![Value::from(3u64)]],
|
||||
/// rows: vec![Value::Array(vec![Value::from(3u64)])],
|
||||
/// }),
|
||||
/// V2QueryResult::TableHeader(TableHeader {
|
||||
/// table_id: 1,
|
||||
|
@ -420,7 +434,7 @@ impl KustoResponseDataSetV2 {
|
|||
/// }),
|
||||
/// V2QueryResult::TableFragment(TableFragment {
|
||||
/// table_id: 1,
|
||||
/// rows: vec![vec![Value::from("first")], vec![Value::from("second")]],
|
||||
/// rows: vec![Value::Array(vec![Value::from("first")]), Value::Array(vec![Value::from("second")])],
|
||||
/// field_count: Some(1),
|
||||
/// table_fragment_type: TableFragmentType::DataAppend,
|
||||
/// }),
|
||||
|
@ -515,16 +529,6 @@ impl TryFrom<HttpResponse> for KustoResponseDataSetV1 {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO enable once in stable
|
||||
// #[cfg(feature = "into_future")]
|
||||
// impl std::future::IntoFuture for ExecuteQueryBuilder {
|
||||
// type IntoFuture = ExecuteQuery;
|
||||
// type Output = <ExecuteQuery as std::future::Future>::Output;
|
||||
// fn into_future(self) -> Self::IntoFuture {
|
||||
// Self::into_future(self)
|
||||
// }
|
||||
// }
|
||||
|
||||
pub fn prepare_request(url: Url, http_method: Method) -> Request {
|
||||
const API_VERSION: &str = "2019-02-13";
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ impl Display for KustoDateTime {
|
|||
|
||||
impl Debug for KustoDateTime {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -141,7 +141,7 @@ impl Display for KustoDuration {
|
|||
|
||||
impl Debug for KustoDuration {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self)
|
||||
write!(f, "{self}")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,6 @@ async fn arrow_roundtrip() {
|
|||
";
|
||||
let response = client
|
||||
.execute_query(database, query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to run query");
|
||||
let batches = response
|
||||
|
@ -73,8 +72,8 @@ async fn arrow_roundtrip() {
|
|||
"+----+------------+----------+---------+------------+-----------+---------------------+",
|
||||
"| id | string_col | bool_col | int_col | bigint_col | float_col | timestamp_col |",
|
||||
"+----+------------+----------+---------+------------+-----------+---------------------+",
|
||||
"| 6 | Hello | true | 0 | 0 | 0 | 2009-04-01 00:00:00 |",
|
||||
"| 7 | World | false | 1 | 10 | 1.1 | 2009-04-01 00:01:00 |",
|
||||
"| 6 | Hello | true | 0 | 0 | 0 | 2009-04-01T00:00:00 |",
|
||||
"| 7 | World | false | 1 | 10 | 1.1 | 2009-04-01T00:01:00 |",
|
||||
"+----+------------+----------+---------+------------+-----------+---------------------+",
|
||||
];
|
||||
assert_batches_eq!(
|
||||
|
|
|
@ -1,13 +1,47 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use std::str::FromStr;
|
||||
use time::Duration;
|
||||
|
||||
use azure_kusto_data::types::{KustoDateTime, KustoDuration};
|
||||
use decimal::d128;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod setup;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
|
||||
struct Item {
|
||||
vnum: i32,
|
||||
vdec: d128,
|
||||
vdate: KustoDateTime,
|
||||
vspan: KustoDuration,
|
||||
vobj: Value,
|
||||
vb: bool,
|
||||
vreal: f64,
|
||||
vstr: String,
|
||||
vlong: i64,
|
||||
vguid: Uuid,
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn create_query_delete_table() {
|
||||
let (client, database) = setup::create_kusto_client();
|
||||
|
||||
let query = ".set KustoRsTest <| let text=\"Hello, World!\"; print str=text";
|
||||
let query = r#".set-or-replace KustoRsTest <| datatable(vnum:int, vdec:decimal, vdate:datetime, vspan:timespan, vobj:dynamic, vb:bool, vreal:real, vstr:string, vlong:long, vguid:guid)
|
||||
[
|
||||
1, decimal(2.00000000000001), datetime(2020-03-04T14:05:01.3109965Z), time(01:23:45.6789000), dynamic({
|
||||
"moshe": "value"
|
||||
}), true, 0.01, "asdf", 9223372036854775807, guid(74be27de-1e4e-49d9-b579-fe0b331d3642),
|
||||
2, decimal(5.00000000000005), datetime(2022-05-06T16:07:03.1234300Z), time(04:56:59.9120000), dynamic({
|
||||
"moshe": "value2"
|
||||
}), false, 0.05, "qwerty", 9223372036854775806, guid(f6e97f76-8b73-45c0-b9ef-f68e8f897713),
|
||||
3, decimal(9.9999999999999), datetime(2023-07-08T18:09:05.5678000Z), time(07:43:12.3456000), dynamic({
|
||||
"moshe": "value3"
|
||||
}), true, 0.99, "zxcv", 9223372036854775805, guid(d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12),
|
||||
]
|
||||
"#;
|
||||
let response = client
|
||||
.execute_command(database.clone(), query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to run query");
|
||||
|
||||
|
@ -16,26 +50,83 @@ async fn create_query_delete_table() {
|
|||
let query = ".show tables | where TableName == \"KustoRsTest\"";
|
||||
let response = client
|
||||
.execute_command(database.clone(), query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to run query");
|
||||
|
||||
assert_eq!(response.table_count(), 4);
|
||||
|
||||
let query = "KustoRsTest | take 1";
|
||||
let query = "KustoRsTest";
|
||||
let response = client
|
||||
.execute_query(database.clone(), query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to run query");
|
||||
|
||||
let results = response.into_primary_results().collect::<Vec<_>>();
|
||||
assert_eq!(results[0].rows.len(), 1);
|
||||
let results = response.into_primary_results().next().expect("No results");
|
||||
|
||||
let rows = results.rows;
|
||||
|
||||
let expected = vec![
|
||||
Item {
|
||||
vnum: 1,
|
||||
vdec: d128!(2.00000000000001),
|
||||
vdate: KustoDateTime::from_str("2020-03-04T14:05:01.3109965Z").unwrap(),
|
||||
vspan: KustoDuration::from(
|
||||
Duration::seconds(3600 + 23 * 60 + 45) + Duration::microseconds(678900),
|
||||
),
|
||||
vobj: Value::Object(serde_json::Map::from_iter(vec![(
|
||||
"moshe".to_string(),
|
||||
Value::String("value".to_string()),
|
||||
)])),
|
||||
vb: true,
|
||||
vreal: 0.01,
|
||||
vstr: "asdf".to_string(),
|
||||
vlong: 9223372036854775807,
|
||||
vguid: Uuid::parse_str("74be27de-1e4e-49d9-b579-fe0b331d3642").unwrap(),
|
||||
},
|
||||
Item {
|
||||
vnum: 2,
|
||||
vdec: d128!(5.00000000000005),
|
||||
vdate: KustoDateTime::from_str("2022-05-06T16:07:03.1234300Z").unwrap(),
|
||||
vspan: KustoDuration::from(
|
||||
Duration::seconds(4 * 3600 + 56 * 60 + 59) + Duration::microseconds(912000),
|
||||
),
|
||||
vobj: Value::Object(serde_json::Map::from_iter(vec![(
|
||||
"moshe".to_string(),
|
||||
Value::String("value2".to_string()),
|
||||
)])),
|
||||
vb: false,
|
||||
vreal: 0.05,
|
||||
vstr: "qwerty".to_string(),
|
||||
vlong: 9223372036854775806,
|
||||
vguid: Uuid::parse_str("f6e97f76-8b73-45c0-b9ef-f68e8f897713").unwrap(),
|
||||
},
|
||||
Item {
|
||||
vnum: 3,
|
||||
vdec: d128!(9.9999999999999),
|
||||
vdate: KustoDateTime::from_str("2023-07-08T18:09:05.5678000Z").unwrap(),
|
||||
vspan: KustoDuration::from(
|
||||
Duration::seconds(7 * 3600 + 43 * 60 + 12) + Duration::microseconds(345600),
|
||||
),
|
||||
vobj: Value::Object(serde_json::Map::from_iter(vec![(
|
||||
"moshe".to_string(),
|
||||
Value::String("value3".to_string()),
|
||||
)])),
|
||||
vb: true,
|
||||
vreal: 0.99,
|
||||
vstr: "zxcv".to_string(),
|
||||
vlong: 9223372036854775805,
|
||||
vguid: Uuid::parse_str("d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12").unwrap(),
|
||||
},
|
||||
];
|
||||
|
||||
let items =
|
||||
serde_json::from_value::<Vec<Item>>(Value::Array(rows)).expect("Failed to deserialize");
|
||||
|
||||
assert_eq!(items, expected);
|
||||
|
||||
let query = ".drop table KustoRsTest | where TableName == \"KustoRsTest\"";
|
||||
let response = client
|
||||
.execute_command(database.clone(), query)
|
||||
.into_future()
|
||||
.await
|
||||
.expect("Failed to run query");
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче