Made data model easier to parse, and added a convenience method.
This commit is contained in:
Родитель
b5667bcc7b
Коммит
60b951544a
|
@ -172,11 +172,7 @@ async fn to_struct(args: &Args, client: &KustoClient) -> Result<(), Box<dyn Erro
|
|||
|
||||
let results = response.into_primary_results().next()?;
|
||||
|
||||
let rows = results
|
||||
.rows
|
||||
.into_iter()
|
||||
.map(Value::Array)
|
||||
.collect::<Vec<Value>>();
|
||||
let rows = results.rows;
|
||||
|
||||
let items = serde_json::from_value::<Vec<Item>>(Value::Array(rows))?;
|
||||
|
||||
|
|
|
@ -13,20 +13,21 @@ use arrow::{
|
|||
record_batch::RecordBatch,
|
||||
};
|
||||
use azure_core::error::{ErrorKind, ResultExt};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::models::ColumnType;
|
||||
use crate::models::{Column, DataTable};
|
||||
use crate::types::{KustoDateTime, KustoDuration};
|
||||
|
||||
fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_string(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<Option<String>> = serde_json::from_value(Value::Array(values))?;
|
||||
let strings: Vec<Option<&str>> = strings.iter().map(Option::as_deref).collect();
|
||||
Ok(Arc::new(StringArray::from(strings)))
|
||||
}
|
||||
|
||||
fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let dates: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_datetime(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let dates: Vec<String> = serde_json::from_value(Value::Array(values))?;
|
||||
let timestamps = dates
|
||||
.into_iter()
|
||||
.map(|d| {
|
||||
|
@ -40,16 +41,16 @@ fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(dates_array)
|
||||
}
|
||||
|
||||
fn safe_map_f64(value: serde_json::Value) -> Result<Option<f64>> {
|
||||
fn safe_map_f64(value: Value) -> Result<Option<f64>> {
|
||||
match value {
|
||||
serde_json::Value::String(val) if val == "NaN" => Ok(None),
|
||||
serde_json::Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
|
||||
serde_json::Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
|
||||
Value::String(val) if val == "NaN" => Ok(None),
|
||||
Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
|
||||
Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
|
||||
_ => Ok(serde_json::from_value(value)?),
|
||||
}
|
||||
}
|
||||
|
||||
fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
fn convert_array_float(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let reals: Vec<Option<f64>> = values
|
||||
.into_iter()
|
||||
.map(safe_map_f64)
|
||||
|
@ -57,8 +58,8 @@ fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(Arc::new(Float64Array::from(reals)))
|
||||
}
|
||||
|
||||
fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_timespan(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let strings: Vec<String> = serde_json::from_value(Value::Array(values))?;
|
||||
let durations: Vec<Option<i64>> = strings
|
||||
.iter()
|
||||
.map(|s| {
|
||||
|
@ -70,22 +71,22 @@ fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
|||
Ok(Arc::new(DurationNanosecondArray::from(durations)))
|
||||
}
|
||||
|
||||
fn convert_array_bool(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let bools: Vec<Option<bool>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_bool(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let bools: Vec<Option<bool>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(BooleanArray::from(bools)))
|
||||
}
|
||||
|
||||
fn convert_array_i32(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_i32(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i32>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(Int32Array::from(ints)))
|
||||
}
|
||||
|
||||
fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values))?;
|
||||
fn convert_array_i64(values: Vec<Value>) -> Result<ArrayRef> {
|
||||
let ints: Vec<Option<i64>> = serde_json::from_value(Value::Array(values))?;
|
||||
Ok(Arc::new(Int64Array::from(ints)))
|
||||
}
|
||||
|
||||
pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(Field, ArrayRef)> {
|
||||
pub fn convert_column(data: Vec<Value>, column: &Column) -> Result<(Field, ArrayRef)> {
|
||||
let column_name = &column.column_name;
|
||||
match column.column_type {
|
||||
ColumnType::String => convert_array_string(data)
|
||||
|
@ -119,17 +120,20 @@ pub fn convert_column(data: Vec<serde_json::Value>, column: &Column) -> Result<(
|
|||
}
|
||||
|
||||
pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
|
||||
let mut buffer: Vec<Vec<serde_json::Value>> = Vec::with_capacity(table.columns.len());
|
||||
let mut buffer: Vec<Vec<Value>> = Vec::with_capacity(table.columns.len());
|
||||
let mut fields: Vec<Field> = Vec::with_capacity(table.columns.len());
|
||||
let mut columns: Vec<ArrayRef> = Vec::with_capacity(table.columns.len());
|
||||
|
||||
for _ in 0..table.columns.len() {
|
||||
buffer.push(Vec::with_capacity(table.rows.len()));
|
||||
}
|
||||
table.rows.into_iter().for_each(|row| {
|
||||
row.into_iter()
|
||||
.enumerate()
|
||||
.for_each(|(idx, value)| buffer[idx].push(value))
|
||||
table.rows.into_iter().for_each(|row| match row {
|
||||
Value::Array(v) => {
|
||||
v.into_iter().enumerate().for_each(|(i, v)| {
|
||||
buffer[i].push(v);
|
||||
});
|
||||
}
|
||||
_ => panic!("Must be an array"),
|
||||
});
|
||||
|
||||
buffer
|
||||
|
|
|
@ -2,13 +2,14 @@
|
|||
|
||||
use crate::authorization_policy::AuthorizationPolicy;
|
||||
use crate::connection_string::ConnectionString;
|
||||
use crate::error::Result;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
|
||||
use azure_core::auth::TokenCredential;
|
||||
|
||||
use azure_core::{ClientOptions, Context, Pipeline};
|
||||
|
||||
use crate::request_options::RequestOptions;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
@ -214,6 +215,53 @@ impl KustoClient {
|
|||
V2QueryRunner(self.execute_with_options(database, query, QueryKind::Query, None))
|
||||
}
|
||||
|
||||
/// Execute a KQL query into an array of structs.
|
||||
/// To learn more about KQL go to [https://docs.microsoft.com/en-us/azure/kusto/query/](https://docs.microsoft.com/en-us/azure/kusto/query)
|
||||
///
|
||||
/// This method is the simplest way to just convert your data into a struct.
|
||||
/// It assumes there is one primary result table.
|
||||
///
|
||||
/// Your struct should implement the [serde::DeserializeOwned](https://docs.serde.rs/serde/trait.DeserializeOwned.html) trait.
|
||||
///
|
||||
/// # Example
|
||||
/// ```no_run
|
||||
/// use azure_kusto_data::prelude::*;
|
||||
/// use serde::Deserialize;
|
||||
///
|
||||
/// #[derive(serde::Deserialize, Debug)]
|
||||
/// struct MyStruct {
|
||||
/// name: String,
|
||||
/// age: u32,
|
||||
/// }
|
||||
///
|
||||
/// # #[tokio::main] async fn main() -> Result<(), Error> {
|
||||
/// let client = KustoClient::new(
|
||||
/// ConnectionString::with_default_auth("https://mycluster.region.kusto.windows.net/"),
|
||||
/// KustoClientOptions::default())?;
|
||||
///
|
||||
/// // Once the [IntoFuture] trait is stabilized, we can drop the call the `into_future()` here
|
||||
/// let result = client.execute_query_to_struct("some_database", "MyTable | take 10").await?;
|
||||
/// println!("{:?}", result); // prints [MyStruct { name: "foo", age: 42 }, MyStruct { name: "bar", age: 43 }]
|
||||
///
|
||||
/// # Ok(())}
|
||||
/// ```
|
||||
pub async fn execute_query_to_struct<T: DeserializeOwned>(
|
||||
&self,
|
||||
database: impl Into<String>,
|
||||
query: impl Into<String>,
|
||||
) -> Result<Vec<T>> {
|
||||
let response = self.execute_query(database, query).into_future().await?;
|
||||
|
||||
let results = response
|
||||
.into_primary_results()
|
||||
.next()
|
||||
.ok_or_else(|| Error::QueryError("No primary results found".into()))?;
|
||||
|
||||
Ok(serde_json::from_value::<Vec<T>>(serde_json::Value::Array(
|
||||
results.rows,
|
||||
))?)
|
||||
}
|
||||
|
||||
/// Execute a management command with additional options.
|
||||
/// To learn more about see [commands](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/management/)
|
||||
///
|
||||
|
@ -277,7 +325,7 @@ impl KustoClient {
|
|||
}
|
||||
|
||||
impl TryFrom<ConnectionString> for KustoClient {
|
||||
type Error = crate::error::Error;
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: ConnectionString) -> Result<Self> {
|
||||
Self::new(value, KustoClientOptions::new())
|
||||
|
|
|
@ -38,6 +38,10 @@ pub enum Error {
|
|||
/// Errors raised when the operation is not supported
|
||||
#[error("Operation not supported: {0}")]
|
||||
UnsupportedOperation(String),
|
||||
|
||||
/// Errors raised when the query is invalid
|
||||
#[error("Invalid query: {0}")]
|
||||
QueryError(String),
|
||||
}
|
||||
|
||||
/// Errors raised when an invalid argument or option is provided.
|
||||
|
|
|
@ -143,7 +143,7 @@ pub struct DataTable {
|
|||
/// Columns in the table.
|
||||
pub columns: Vec<Column>,
|
||||
/// Rows in the table. Each row is a list of values, corresponding to the columns in the table.
|
||||
pub rows: Vec<Vec<serde_json::Value>>,
|
||||
pub rows: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// A header of a fragment of a table (in progressive mode).
|
||||
|
@ -180,7 +180,7 @@ pub struct TableFragment {
|
|||
/// The type of the fragment, instructs to how to use it.
|
||||
pub table_fragment_type: TableFragmentType,
|
||||
/// Rows in the table. Each row is a list of values, corresponding to the columns in the TableHeader.
|
||||
pub rows: Vec<Vec<serde_json::Value>>,
|
||||
pub rows: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
/// Progress report for a table (in progressive mode).
|
||||
|
|
|
@ -66,11 +66,7 @@ async fn create_query_delete_table() {
|
|||
|
||||
let results = response.into_primary_results().next().expect("No results");
|
||||
|
||||
let rows = results
|
||||
.rows
|
||||
.into_iter()
|
||||
.map(Value::Array)
|
||||
.collect::<Vec<Value>>();
|
||||
let rows = results.rows;
|
||||
|
||||
let expected = vec![
|
||||
Item {
|
||||
|
|
Загрузка…
Ссылка в новой задаче