This commit is contained in:
AsafMah 2022-03-28 17:25:12 +03:00
Родитель b09a75103c
Коммит 2eec8ef53a
6 изменённых файлов: 40 добавлений и 7 удалений

Просмотреть файл

@ -12,6 +12,7 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]
[dependencies]
arrow = { version = "9", optional = true }
azure_core = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" , features = [
"enable_reqwest",
"enable_reqwest_gzip",
@ -33,6 +34,7 @@ env_logger = "0.9"
tokio = { version = "1", features = ["macros"] }
[features]
default = ["arrow"]
mock_transport_framework = ["azure_core/mock_transport_framework"]
#into_future = [] TODO - properly turn it on
test_e2e = []

Просмотреть файл

@ -37,7 +37,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
.await
.unwrap();
for table in response.tables {
for table in &response.tables {
match table {
ResultTable::DataSetHeader(header) => println!("header: {:?}", header),
ResultTable::DataTable(table) => println!("table: {:?}", table),
@ -45,5 +45,8 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
}
let primary_results = response.into_primary_results().collect::<Vec<_>>();
println!("primary results: {:?}", primary_results);
Ok(())
}

Просмотреть файл

@ -39,3 +39,9 @@ impl From<azure_core::error::Error> for Error {
Self::AzureError(err.into())
}
}
impl From<azure_core::StreamError> for Error {
fn from(error: azure_core::StreamError) -> Self {
Self::AzureError(azure_core::Error::Stream(error))
}
}

Просмотреть файл

@ -1,3 +1,5 @@
#[cfg(feature = "arrow")]
mod arrow;
pub mod authorization_policy;
pub mod client;
pub mod connection_string;

Просмотреть файл

@ -1,5 +1,9 @@
#[cfg(feature = "arrow")]
use crate::arrow::convert_table;
use crate::client::KustoClient;
use async_convert::{async_trait, TryFrom};
#[cfg(feature = "arrow")]
use arrow::record_batch::RecordBatch;
use async_convert::TryFrom;
use azure_core::prelude::*;
use azure_core::setters;
use azure_core::{collect_pinned_stream, Response as HttpResponse};
@ -105,14 +109,14 @@ pub struct KustoResponseDataSetV2 {
pub tables: Vec<ResultTable>,
}
#[async_trait]
impl TryFrom<HttpResponse> for KustoResponseDataSetV2 {
#[async_convert::async_trait]
impl async_convert::TryFrom<HttpResponse> for KustoResponseDataSetV2 {
type Error = crate::error::Error;
async fn try_from(response: HttpResponse) -> Result<Self, crate::error::Error> {
let (_status_code, _header_map, pinned_stream) = response.deconstruct();
let data = collect_pinned_stream(pinned_stream).await.unwrap();
let tables: Vec<ResultTable> = serde_json::from_slice(&data.to_vec()).unwrap();
let data = collect_pinned_stream(pinned_stream).await?;
let tables: Vec<ResultTable> = serde_json::from_slice(&data.to_vec())?;
Ok(Self { tables })
}
}
@ -121,6 +125,22 @@ impl KustoResponseDataSetV2 {
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
pub fn into_primary_results(self) -> impl Iterator<Item = DataTable> {
self.tables
.into_iter()
.filter(|t| matches!(t, ResultTable::DataTable(tbl) if tbl.table_kind == TableKind::PrimaryResult))
.map(|t| match t {
ResultTable::DataTable(tbl) => tbl,
_ => unreachable!("All other variants are excluded by filter"),
})
}
#[cfg(feature = "arrow")]
pub fn into_record_batches(self) -> impl Iterator<Item = RecordBatch> {
self.into_primary_results().map(convert_table)
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]

Просмотреть файл

@ -13,4 +13,4 @@
pub use crate::client::{KustoClient, KustoClientOptions};
pub use crate::connection_string::ConnectionStringBuilder;
pub use crate::operations::query::ResultTable;
pub use crate::operations::query::{KustoResponseDataSetV2, ResultTable};