Merge pull request #22 from Azure/convert-to-struct

This commit is contained in:
AsafMah 2022-12-07 10:11:37 +02:00 коммит произвёл GitHub
Родитель f93092aeae 049eb514e3
Коммит f49892169f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 298 добавлений и 97 удалений

Просмотреть файл

@ -5,7 +5,7 @@ description = "Rust wrappers around Microsoft Azure REST APIs - Azure Data Explo
readme = "README.md"
license = "MIT"
edition = "2021"
rust-version = "1.64"
rust-version = "1.65"
repository = "https://github.com/azure/azure-sdk-for-rust"
homepage = "https://github.com/azure/azure-sdk-for-rust"
documentation = "https://docs.rs/azure_kusto_data"
@ -13,24 +13,24 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]
[dependencies]
arrow = { version = "26.0.0", optional = true }
arrow = { version = "28.0.0", optional = true }
azure_core = { version = "0.7.0", features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = { version = "0.8.0" }
async-trait = "0.1.57"
async-trait = "0.1.59"
async-convert = "1.0.0"
bytes = "1.2.1"
futures = "0.3.23"
bytes = "1.3.0"
futures = "0.3.25"
http = "0.2.8"
serde = { version = "1.0.143", features = ["derive"] }
serde_json = "1.0.81"
serde_with = { version = "2.0.0", features = ["json"] }
thiserror = "1.0.32"
hashbrown = "0.13"
regex = "1.6.0"
time = { version = "0.3.13", features = [
serde = { version = "1.0.148", features = ["derive"] }
serde_json = "1.0.89"
serde_with = { version = "2.1.0", features = ["json"] }
thiserror = "1.0.37"
hashbrown = "0.13.1"
regex = "1.7.0"
time = { version = "0.3.17", features = [
"serde",
"parsing",
"formatting",
@ -38,16 +38,18 @@ time = { version = "0.3.13", features = [
"serde-well-known",
] }
derive_builder = "0.12"
once_cell = "1.13.0"
once_cell = "1.16.0"
[dev-dependencies]
arrow = { version = "26.0.0", features = ["prettyprint"] }
arrow = { version = "28.0.0", features = ["prettyprint"] }
dotenv = "0.15.0"
env_logger = "0.10"
tokio = { version = "1.20.1", features = ["macros"] }
oauth2 = "4.2.3"
criterion = "0.4"
env_logger = "0.10.0"
tokio = { version = "1.22.0", features = ["macros"] }
oauth2 = "4.3.0"
criterion = "0.4.0"
clap = { version = "4", features = ["derive", "env"] }
decimal = "2.0.0"
uuid = { version = "1.2.2", features = [ "serde"] }
[features]
default = ["arrow"]

Просмотреть файл

@ -1,12 +1,15 @@
use azure_kusto_data::models::V2QueryResult;
use azure_kusto_data::prelude::*;
use azure_kusto_data::request_options::RequestOptionsBuilder;
use azure_kusto_data::types::{KustoDateTime, KustoDuration};
use clap::Parser;
use futures::{pin_mut, TryStreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::error::Error;
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
#[clap(author, version, about, long_about = None)]
struct Args {
/// Kusto cluster endpoint
@ -36,14 +39,64 @@ async fn main() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let kcsb = ConnectionString::with_application_auth(
args.endpoint,
args.application_id,
args.application_key,
args.tenant_id,
args.endpoint.clone(),
args.application_id.clone(),
args.application_key.clone(),
args.tenant_id.clone(),
);
let client = KustoClient::try_from(kcsb).unwrap();
non_progressive(&args, &client).await;
progressive(&args, &client).await?;
to_struct(&args, &client).await?;
Ok(())
}
/// Runs the user-supplied query with progressive results enabled and prints
/// every frame of the resulting stream as it arrives.
///
/// # Errors
///
/// Returns an error if the stream cannot be opened or if reading a frame
/// from it fails.
async fn progressive(args: &Args, client: &KustoClient) -> Result<(), Box<dyn Error>> {
    println!("Querying {} with streaming client", args.query);

    // Turn on progressive results through the request options.
    let options = RequestOptionsBuilder::default()
        .with_results_progressive_enabled(true)
        .build()
        .expect("Failed to create request options");

    let frames = client
        .execute_query_with_options(args.database.clone(), args.query.clone(), Some(options))
        .into_stream()
        .await?;

    println!("Printing all streaming results");

    // Pin the stream in place so `try_next` can be called on it.
    pin_mut!(frames);

    while let Some(frame) = frames.try_next().await? {
        // One arm per frame kind; each is dumped with its pretty `Debug` output.
        match frame {
            V2QueryResult::DataSetHeader(dataset_header) => {
                println!("header: {dataset_header:#?}")
            }
            V2QueryResult::DataTable(data_table) => println!("table: {data_table:#?}"),
            V2QueryResult::DataSetCompletion(dataset_completion) => {
                println!("completion: {dataset_completion:#?}")
            }
            V2QueryResult::TableHeader(table_header) => println!("header: {table_header:#?}"),
            V2QueryResult::TableFragment(table_fragment) => {
                println!("fragment: {table_fragment:#?}")
            }
            V2QueryResult::TableProgress(table_progress) => {
                println!("progress: {table_progress:#?}")
            }
            V2QueryResult::TableCompletion(table_completion) => {
                println!("completion: {table_completion:#?}")
            }
        }
    }

    Ok(())
}
async fn non_progressive(args: &Args, client: &KustoClient) {
println!("Querying {} with regular client", args.query);
let response = client
@ -81,42 +134,48 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Print the primary tables
let primary_results = response.into_primary_results().collect::<Vec<_>>();
println!("primary results: {:#?}", primary_results);
}
println!("Querying {} with streaming client", args.query);
/// One row of the `datatable` queried by `to_struct`.
///
/// The fields are declared in the same order as the columns of that
/// `datatable`. NOTE(review): the rows are deserialized from JSON values
/// (presumably positional arrays), so keep the field order in sync with the
/// column order — confirm before reordering.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Item {
/// `vnum:int` column.
vnum: i32,
/// `vdec:decimal` column, kept as a string in this example.
vdec: String, // optionally, you can use a decimal type here
/// `vdate:datetime` column.
vdate: KustoDateTime,
/// `vspan:timespan` column.
vspan: KustoDuration,
/// `vobj:dynamic` column, kept as raw JSON.
vobj: Value,
/// `vb:bool` column.
vb: bool,
/// `vreal:real` column.
vreal: f64,
/// `vstr:string` column.
vstr: String,
/// `vlong:long` column.
vlong: i64,
/// `vguid:guid` column, kept as a string in this example.
vguid: String, // optionally, you can use a guid type here
}
let stream = client
.execute_query_with_options(
args.database,
args.query,
Some(
RequestOptionsBuilder::default()
.with_results_progressive_enabled(true)
.build()
.expect("Failed to create request options"),
),
)
.into_stream()
.await?;
/// Runs a fixed `datatable` query against `args.database`, takes the first
/// primary result table and deserializes its rows into a `Vec<Item>`, which
/// is then printed.
///
/// Returns an error if the query fails, if there is no primary result, or if
/// the rows cannot be deserialized into `Item`s.
async fn to_struct(args: &Args, client: &KustoClient) -> Result<(), Box<dyn Error>> {
let query = r#"datatable(vnum:int, vdec:decimal, vdate:datetime, vspan:timespan, vobj:dynamic, vb:bool, vreal:real, vstr:string, vlong:long, vguid:guid)
[
1, decimal(2.00000000000001), datetime(2020-03-04T14:05:01.3109965Z), time(01:23:45.6789000), dynamic({
"moshe": "value"
}), true, 0.01, "asdf", 9223372036854775807, guid(74be27de-1e4e-49d9-b579-fe0b331d3642),
2, decimal(5.00000000000005), datetime(2022-05-06T16:07:03.1234300Z), time(04:56:59.9120000), dynamic({
"moshe": "value2"
}), false, 0.05, "qwerty", 9223372036854775806, guid(f6e97f76-8b73-45c0-b9ef-f68e8f897713),
3, decimal(9.9999999999999), datetime(2023-07-08T18:09:05.5678000Z), time(07:43:12.3456000), dynamic({
"moshe": "value3"
}), true, 0.99, "zxcv", 9223372036854775805, guid(d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12),
]"#;
println!("Printing all streaming results");
let response = client.execute_query(args.database.clone(), query).await?;
pin_mut!(stream);
let results = response
.into_primary_results()
.next()
.ok_or_else(|| "Expected to get a primary result, but got none".to_string())?;
while let Some(table) = stream.try_next().await? {
match table {
V2QueryResult::DataSetHeader(header) => println!("header: {:#?}", header),
V2QueryResult::DataTable(table) => println!("table: {:#?}", table),
V2QueryResult::DataSetCompletion(completion) => {
println!("completion: {:#?}", completion)
}
V2QueryResult::TableHeader(header) => println!("header: {:#?}", header),
V2QueryResult::TableFragment(fragment) => println!("fragment: {:#?}", fragment),
V2QueryResult::TableProgress(progress) => println!("progress: {:#?}", progress),
V2QueryResult::TableCompletion(completion) => {
println!("completion: {:#?}", completion)
}
}
}
// Wrap all rows in a single JSON array so the whole table is deserialized
// into `Vec<Item>` in one call.
let rows = results.rows;
let items = serde_json::from_value::<Vec<Item>>(Value::Array(rows))?;
println!("items: {:#?}", items);
Ok(())
}

Просмотреть файл

@ -13,20 +13,21 @@ use arrow::{
record_batch::RecordBatch,
};
use azure_core::error::{ErrorKind, ResultExt};
use serde_json::Value;
use crate::error::Result;
use crate::models::ColumnType;
use crate::models::{Column, DataTable};
use crate::types::{KustoDateTime, KustoDuration};
/// Converts a column of JSON values into an Arrow `StringArray`.
///
/// The values are deserialized as `Option<String>`; an error is returned if
/// that deserialization fails.
fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_string(values: Vec<Value>) -> Result<ArrayRef> {
let strings: Vec<Option<String>> = serde_json::from_value(Value::Array(values))?;
// Borrow each owned string as `&str` before handing the values to `StringArray::from`.
let strings: Vec<Option<&str>> = strings.iter().map(Option::as_deref).collect();
Ok(Arc::new(StringArray::from(strings)))
}
fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let dates: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_datetime(values: Vec<Value>) -> Result<ArrayRef> {
let dates: Vec<String> = serde_json::from_value(Value::Array(values))?;
let timestamps = dates
.into_iter()
.map(|d| {
@ -40,16 +41,16 @@ fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
Ok(dates_array)
}
/// Maps a single JSON value to an optional `f64`, handling the special
/// string spellings before falling back to regular deserialization.
///
/// * `"NaN"` becomes `None`.
/// * `"Infinity"` becomes `Some(f64::INFINITY)`.
/// * `"-Infinity"` becomes `Some(-f64::INFINITY)`.
/// * Anything else is deserialized as `Option<f64>`; a failure there is
///   returned as an error.
fn safe_map_f64(value: serde_json::Value) -> Result<Option<f64>> {
fn safe_map_f64(value: Value) -> Result<Option<f64>> {
match value {
serde_json::Value::String(val) if val == "NaN" => Ok(None),
serde_json::Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
serde_json::Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
Value::String(val) if val == "NaN" => Ok(None),
Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
_ => Ok(serde_json::from_value(value)?),
}
}
fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
fn convert_array_float(values: Vec<Value>) -> Result<ArrayRef> {
let reals: Vec<Option<f64>> = values
.into_iter()
.map(safe_map_f64)
@ -57,8 +58,8 @@ fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
Ok(Arc::new(Float64Array::from(reals)))
}
fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_timespan(values: Vec<Value>) -> Result<ArrayRef> {
let strings: Vec<String> = serde_json::from_value(Value::Array(values))?;
let durations: Vec<Option<i64>> = strings
.iter()
.map(|s| {
@ -70,22 +71,22 @@ fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
Ok(Arc::new(DurationNanosecondArray::from(durations)))
}
/// Converts a column of JSON values into an Arrow `BooleanArray`.
///
/// The values are deserialized as `Option<bool>`; an error is returned if
/// that deserialization fails.
fn convert_array_bool(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let bools: Vec<Option<bool>> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_bool(values: Vec<Value>) -> Result<ArrayRef> {
let bools: Vec<Option<bool>> = serde_json::from_value(Value::Array(values))?;
Ok(Arc::new(BooleanArray::from(bools)))
}
/// Converts a column of JSON values into an Arrow `Int32Array`.
///
/// The values are deserialized as `Option<i32>`; an error is returned if
/// that deserialization fails.
fn convert_array_i32(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_i32(values: Vec<Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i32>> = serde_json::from_value(Value::Array(values))?;
Ok(Arc::new(Int32Array::from(ints)))
}
/// Converts a column of JSON values into an Arrow `Int64Array`.
///
/// The values are deserialized as `Option<i64>`; an error is returned if
/// that deserialization fails.
fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values))?;
fn convert_array_i64(values: Vec<Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i64>> = serde_json::from_value(Value::Array(values))?;
Ok(Arc::new(Int64Array::from(ints)))
}
pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(Field, ArrayRef)> {
pub fn convert_column(data: Vec<Value>, column: &Column) -> Result<(Field, ArrayRef)> {
let column_name = &column.column_name;
match column.column_type {
ColumnType::String => convert_array_string(data)
@ -119,17 +120,20 @@ pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(
}
pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
let mut buffer: Vec<Vec<serde_json::Value>> = Vec::with_capacity(table.columns.len());
let mut buffer: Vec<Vec<Value>> = Vec::with_capacity(table.columns.len());
let mut fields: Vec<Field> = Vec::with_capacity(table.columns.len());
let mut columns: Vec<ArrayRef> = Vec::with_capacity(table.columns.len());
for _ in 0..table.columns.len() {
buffer.push(Vec::with_capacity(table.rows.len()));
}
table.rows.into_iter().for_each(|row| {
row.into_iter()
.enumerate()
.for_each(|(idx, value)| buffer[idx].push(value))
table.rows.into_iter().for_each(|row| match row {
Value::Array(v) => {
v.into_iter().enumerate().for_each(|(i, v)| {
buffer[i].push(v);
});
}
_ => unreachable!("Must be an array"),
});
buffer

Просмотреть файл

@ -2,13 +2,14 @@
use crate::authorization_policy::AuthorizationPolicy;
use crate::connection_string::ConnectionString;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::auth::TokenCredential;
use azure_core::{ClientOptions, Context, Pipeline};
use crate::request_options::RequestOptions;
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
@ -211,6 +212,52 @@ impl KustoClient {
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, None))
}
/// Execute a KQL query and deserialize the rows of the first primary result
/// table into a `Vec<T>`.
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
///
/// This is the shortest path from a query to typed data. Only the first
/// primary result table is used; any further tables are ignored.
///
/// `T` must implement [serde::de::DeserializeOwned](https://docs.rs/serde/latest/serde/de/trait.DeserializeOwned.html).
///
/// # Errors
///
/// Returns an error if the query itself fails, if the response contains no
/// primary result table (`Error::QueryError`), or if the rows cannot be
/// deserialized into `T`.
///
/// # Example
/// ```no_run
/// use azure_kusto_data::prelude::*;
/// use serde::Deserialize;
///
/// #[derive(Deserialize, Debug)]
/// struct MyStruct {
///     name: String,
///     age: u32,
/// }
///
/// # #[tokio::main] async fn main() -> Result<(), Error> {
/// let client = KustoClient::new(
///    ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
///    KustoClientOptions::default())?;
///
/// let result: Vec<MyStruct> = client.execute_query_to_struct("some_database", "MyTable | take 10").await?;
/// println!("{:?}", result); // e.g. [MyStruct { name: "foo", age: 42 }, MyStruct { name: "bar", age: 43 }]
///
/// # Ok(())}
/// ```
pub async fn execute_query_to_struct<T: DeserializeOwned>(
    &self,
    database: impl Into<String>,
    query: impl Into<String>,
) -> Result<Vec<T>> {
    let primary_table = self
        .execute_query(database, query)
        .await?
        .into_primary_results()
        .next()
        .ok_or_else(|| Error::QueryError("No primary results found".into()))?;

    // Wrap all rows in one JSON array so the table is deserialized in a single call.
    let all_rows = serde_json::Value::Array(primary_table.rows);
    let items: Vec<T> = serde_json::from_value(all_rows)?;
    Ok(items)
}
/// Execute a management command with additional options.
/// To learn more about see [commands](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/)
///
@ -272,7 +319,7 @@ impl KustoClient {
}
impl TryFrom<ConnectionString> for KustoClient {
type Error = crate::error::Error;
type Error = Error;
fn try_from(value: ConnectionString) -> Result<Self> {
Self::new(value, KustoClientOptions::new())

Просмотреть файл

@ -38,6 +38,10 @@ pub enum Error {
/// Errors raised when the operation is not supported
#[error("Operation not supported: {0}")]
UnsupportedOperation(String),
/// Errors raised when the query is invalid
#[error("Invalid query: {0}")]
QueryError(String),
}
/// Errors raised when an invalid argument or option is provided.

Просмотреть файл

@ -143,7 +143,7 @@ pub struct DataTable {
/// Columns in the table.
pub columns: Vec<Column>,
/// Rows in the table. Each row is a JSON value, expected to be an array of values corresponding to the columns in the table.
pub rows: Vec<Vec<serde_json::Value>>,
pub rows: Vec<serde_json::Value>,
}
/// A header of a fragment of a table (in progressive mode).
@ -180,7 +180,7 @@ pub struct TableFragment {
/// The type of the fragment, instructs to how to use it.
pub table_fragment_type: TableFragmentType,
/// Rows in the table. Each row is a JSON value, expected to be an array of values corresponding to the columns in the TableHeader.
pub rows: Vec<Vec<serde_json::Value>>,
pub rows: Vec<serde_json::Value>,
}
/// Progress report for a table (in progressive mode).

Просмотреть файл

@ -425,7 +425,7 @@ impl KustoResponseDataSetV2 {
/// table_name: "table_1".to_string(),
/// table_kind: TableKind::PrimaryResult,
/// columns: vec![Column{column_name: "col1".to_string(), column_type: ColumnType::Long}],
/// rows: vec![vec![Value::from(3u64)]],
/// rows: vec![Value::Array(vec![Value::from(3u64)])],
/// }),
/// V2QueryResult::TableHeader(TableHeader {
/// table_id: 1,
@ -435,7 +435,7 @@ impl KustoResponseDataSetV2 {
/// }),
/// V2QueryResult::TableFragment(TableFragment {
/// table_id: 1,
/// rows: vec![vec![Value::from("first")], vec![Value::from("second")]],
/// rows: vec![Value::Array(vec![Value::from("first")]), Value::Array(vec![Value::from("second")])],
/// field_count: Some(1),
/// table_fragment_type: TableFragmentType::DataAppend,
/// }),
@ -530,16 +530,6 @@ impl TryFrom<HttpResponse> for KustoResponseDataSetV1 {
}
}
// TODO enable once in stable
// #[cfg(feature = "into_future")]
// impl std::future::IntoFuture for ExecuteQueryBuilder {
// type IntoFuture = ExecuteQuery;
// type Output = <ExecuteQuery as std::future::Future>::Output;
// fn into_future(self) -> Self::IntoFuture {
// Self::into_future(self)
// }
// }
pub fn prepare_request(url: Url, http_method: Method) -> Request {
const API_VERSION: &str = "2019-02-13";

Просмотреть файл

@ -1,10 +1,45 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::str::FromStr;
use time::Duration;
use azure_kusto_data::types::{KustoDateTime, KustoDuration};
use decimal::d128;
use uuid::Uuid;
mod setup;
/// One row of the `KustoRsTest` table created by the test below.
///
/// The fields are declared in the same order as the columns of the
/// `datatable` used to populate the table. NOTE(review): the rows are
/// deserialized from JSON values (presumably positional arrays), so keep the
/// field order in sync with the column order — confirm before reordering.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Item {
/// `vnum:int` column.
vnum: i32,
/// `vdec:decimal` column.
vdec: d128,
/// `vdate:datetime` column.
vdate: KustoDateTime,
/// `vspan:timespan` column.
vspan: KustoDuration,
/// `vobj:dynamic` column, kept as raw JSON.
vobj: Value,
/// `vb:bool` column.
vb: bool,
/// `vreal:real` column.
vreal: f64,
/// `vstr:string` column.
vstr: String,
/// `vlong:long` column.
vlong: i64,
/// `vguid:guid` column.
vguid: Uuid,
}
#[tokio::test]
async fn create_query_delete_table() {
let (client, database) = setup::create_kusto_client();
let query = ".set KustoRsTest <| let text=\"Hello, World!\"; print str=text";
let query = r#".set-or-replace KustoRsTest <| datatable(vnum:int, vdec:decimal, vdate:datetime, vspan:timespan, vobj:dynamic, vb:bool, vreal:real, vstr:string, vlong:long, vguid:guid)
[
1, decimal(2.00000000000001), datetime(2020-03-04T14:05:01.3109965Z), time(01:23:45.6789000), dynamic({
"moshe": "value"
}), true, 0.01, "asdf", 9223372036854775807, guid(74be27de-1e4e-49d9-b579-fe0b331d3642),
2, decimal(5.00000000000005), datetime(2022-05-06T16:07:03.1234300Z), time(04:56:59.9120000), dynamic({
"moshe": "value2"
}), false, 0.05, "qwerty", 9223372036854775806, guid(f6e97f76-8b73-45c0-b9ef-f68e8f897713),
3, decimal(9.9999999999999), datetime(2023-07-08T18:09:05.5678000Z), time(07:43:12.3456000), dynamic({
"moshe": "value3"
}), true, 0.99, "zxcv", 9223372036854775805, guid(d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12),
]
"#;
let response = client
.execute_command(database.clone(), query)
.await
@ -20,14 +55,74 @@ async fn create_query_delete_table() {
assert_eq!(response.table_count(), 4);
let query = "KustoRsTest | take 1";
let query = "KustoRsTest";
let response = client
.execute_query(database.clone(), query)
.await
.expect("Failed to run query");
let results = response.into_primary_results().collect::<Vec<_>>();
assert_eq!(results[0].rows.len(), 1);
let results = response.into_primary_results().next().expect("No results");
let rows = results.rows;
let expected = vec![
Item {
vnum: 1,
vdec: d128!(2.00000000000001),
vdate: KustoDateTime::from_str("2020-03-04T14:05:01.3109965Z").unwrap(),
vspan: KustoDuration::from(
Duration::seconds(3600 + 23 * 60 + 45) + Duration::microseconds(678900),
),
vobj: Value::Object(serde_json::Map::from_iter(vec![(
"moshe".to_string(),
Value::String("value".to_string()),
)])),
vb: true,
vreal: 0.01,
vstr: "asdf".to_string(),
vlong: 9223372036854775807,
vguid: Uuid::parse_str("74be27de-1e4e-49d9-b579-fe0b331d3642").unwrap(),
},
Item {
vnum: 2,
vdec: d128!(5.00000000000005),
vdate: KustoDateTime::from_str("2022-05-06T16:07:03.1234300Z").unwrap(),
vspan: KustoDuration::from(
Duration::seconds(4 * 3600 + 56 * 60 + 59) + Duration::microseconds(912000),
),
vobj: Value::Object(serde_json::Map::from_iter(vec![(
"moshe".to_string(),
Value::String("value2".to_string()),
)])),
vb: false,
vreal: 0.05,
vstr: "qwerty".to_string(),
vlong: 9223372036854775806,
vguid: Uuid::parse_str("f6e97f76-8b73-45c0-b9ef-f68e8f897713").unwrap(),
},
Item {
vnum: 3,
vdec: d128!(9.9999999999999),
vdate: KustoDateTime::from_str("2023-07-08T18:09:05.5678000Z").unwrap(),
vspan: KustoDuration::from(
Duration::seconds(7 * 3600 + 43 * 60 + 12) + Duration::microseconds(345600),
),
vobj: Value::Object(serde_json::Map::from_iter(vec![(
"moshe".to_string(),
Value::String("value3".to_string()),
)])),
vb: true,
vreal: 0.99,
vstr: "zxcv".to_string(),
vlong: 9223372036854775805,
vguid: Uuid::parse_str("d8e3575c-a7a0-47b3-8c73-9a7a6aaabc12").unwrap(),
},
];
let items =
serde_json::from_value::<Vec<Item>>(Value::Array(rows)).expect("Failed to deserialize");
assert_eq!(items, expected);
let query = ".drop table KustoRsTest | where TableName == \"KustoRsTest\"";
let response = client