This commit is contained in:
asafmahlev 2024-01-04 14:31:26 +02:00
Parent 71aaff7cd3
Commit 5b3e4e0492
7 changed files with 276 additions and 50 deletions

View file

@@ -2,20 +2,25 @@
use crate::authorization_policy::AuthorizationPolicy;
use crate::connection_string::{ConnectionString, ConnectionStringAuth};
use crate::error::{Error, Result};
use crate::error::{Error, ParseError, Partial, Result};
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::{ClientOptions, Pipeline};
use azure_core::{ClientOptions, Context, CustomHeaders, Method, Pipeline, Request, Response, ResponseBody};
use crate::client_details::ClientDetails;
use crate::models::v2::Row;
use crate::prelude::ClientRequestProperties;
use crate::prelude::{ClientRequestProperties, ClientRequestPropertiesBuilder, OptionsBuilder};
use azure_core::headers::Headers;
use azure_core::prelude::{Accept, AcceptEncoding, ClientVersion, ContentType};
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
use futures::TryStreamExt;
use crate::operations;
use crate::operations::v2::{FullDataset, IterativeDataset};
use crate::query::QueryBody;
use crate::request_options::Options;
/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
@@ -285,6 +290,76 @@ impl KustoClient {
) -> V1QueryRunner {
V1QueryRunner(self.execute_with_options(database, query, QueryKind::Management, options))
}
#[must_use]
pub async fn query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Partial<FullDataset> {
let body = self.execute(QueryKind::Query, database.into(), query.into(), options.into()).await.map_err(|e| (None, e))?;
FullDataset::from_async_buf_read(body.into_stream().map_err(std::io::Error::other).into_async_read()).await
}
#[must_use]
pub async fn iterative_query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Result<Arc<IterativeDataset>> {
let iterative_options = ClientRequestPropertiesBuilder::default()
.with_options(
OptionsBuilder::default()
.with_results_v2_newlines_between_frames(true)
.with_results_v2_fragment_primary_tables(true)
.with_error_reporting_placement("end_of_table")
.build()
.expect("Failed to build options"),
)
.build()
.expect("Failed to build client request properties");
// TODO: merge the caller-supplied `options` into `iterative_options` instead of ignoring them
let body = self.execute(QueryKind::Query, database.into(), query.into(), iterative_options.into()).await?;
Ok(IterativeDataset::from_async_buf_read(body.into_stream().map_err(std::io::Error::other).into_async_read()))
}
async fn execute(&self, kind: QueryKind, database: String, query: String, options: Option<ClientRequestProperties>) -> Result<ResponseBody> {
let url = match kind {
QueryKind::Management => self.management_url(),
QueryKind::Query => self.query_url(),
};
let mut context = Context::new();
let mut request = Request::new(url.parse().map_err(ParseError::from)?, Method::Post);
let mut headers = (*self.default_headers).clone();
if let Some(client_request_properties) = &options {
if let Some(client_request_id) = &client_request_properties.client_request_id {
headers.insert("x-ms-client-request-id", client_request_id);
}
if let Some(application) = &client_request_properties.application {
headers.insert("x-ms-app", application);
}
}
context.insert(CustomHeaders::from(headers));
let body = QueryBody {
db: database,
csl: query,
properties: options,
};
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
request.set_body(bytes);
let response = self
.pipeline()
.send(&mut context, &mut request)
.await?;
let status = response.status();
if !status.is_success() {
let body = response.into_body().collect_string().await;
return Err(Error::HttpError(status, body.unwrap_or_else(|e| format!("{:?}", e))));
}
Ok(response.into_body())
}
}
impl TryFrom<ConnectionString> for KustoClient {

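A minimal sketch of how the two new entry points might be called, assuming a KustoClient already built from a ConnectionString. The Partial<T> returned by query is assumed to be a Result<T, (Option<T>, Error)> alias, as implied by the map_err(|e| (None, e)) and Partial::Err((Some(dataset), e)) call sites in this commit; the database and query strings are illustrative only:

    use crate::error::Error;
    use crate::prelude::KustoClient;

    async fn example(client: &KustoClient) -> Result<(), Error> {
        // Buffered path: the whole v2 response is parsed before returning.
        match client.query("MyDatabase", "MyTable | take 100", None).await {
            Ok(dataset) => println!("got {} result tables", dataset.results.len()),
            // A partial dataset may still be usable even when the service reported errors.
            Err((Some(partial), e)) => eprintln!("partial results ({} tables): {e}", partial.results.len()),
            Err((None, e)) => return Err(e),
        }
        // Streaming path: frames are parsed in the background and surfaced over channels.
        let _dataset = client.iterative_query("MyDatabase", "MyTable | take 100", None).await?;
        Ok(())
    }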
View file

@@ -1,6 +1,7 @@
//! Defines [Error] for representing failures in various operations.
use azure_core::StatusCode;
use std::fmt::Debug;
use oauth2::url;
use crate::models::v2::OneApiError;
use thiserror;
@@ -70,6 +71,16 @@ impl From<Vec<Error>> for Error {
}
}
impl From<Vec<OneApiError>> for Error {
fn from(errors: Vec<OneApiError>) -> Self {
if errors.len() == 1 {
errors.into_iter().next().map(Error::QueryApiError).expect("length checked to be 1")
} else {
Error::MultipleErrors(errors.into_iter().map(Error::QueryApiError).collect())
}
}
}
/// Errors raised when parsing values.
#[derive(thiserror::Error, Debug)]
pub enum ParseError {
@@ -100,6 +111,9 @@ pub enum ParseError {
/// Raised when a dynamic value is failed to be parsed.
#[error("Error parsing dynamic: {0}")]
Dynamic(#[from] serde_json::Error),
#[error("Error parsing url: {0}")]
Url(#[from] url::ParseError),
}
/// Errors raised when parsing connection strings.

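The Partial alias imported at the top of client.rs is not itself part of this diff; its shape can be inferred from usage (map_err(|e| (None, e))? in client.rs and Partial::Err((Some(dataset), e)) in operations/v2.rs). A sketch of the assumed definition, plus a hypothetical helper for callers that only want the error:

    // Assumed shape, inferred from call sites in this commit; the authoritative
    // definition lives elsewhere in error.rs.
    pub type Partial<T> = std::result::Result<T, (Option<T>, Error)>;

    // Hypothetical helper: discard the recoverable part and keep only the error.
    pub fn full_only<T>(p: Partial<T>) -> Result<T> {
        p.map_err(|(_partial, e)| e)
    }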
View file

@@ -564,7 +564,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
serde_json::Value::String("Visualization".to_string()),
serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()),
]),
Row::Error((OneApiErrors {
Row::Error(OneApiErrors {
errors: vec![OneApiError {
error_message: crate::models::v2::ErrorMessage {
code: "LimitsExceeded".to_string(),
@@ -588,7 +588,7 @@
is_permanent: false,
},
}]
})),
}),
],
}),
Frame::DataSetCompletion(DataSetCompletion {

View file

@@ -57,7 +57,7 @@ impl Into<Result<Vec<serde_json::Value>, Error>> for Row {
fn into(self) -> Result<Vec<serde_json::Value>, Error> {
match self {
Row::Values(v) => Ok(v),
Row::Error(e) => Err(e.errors.into_iter().map(Error::QueryApiError).collect::<Vec<_>>().into()),
Row::Error(e) => Err(e.errors.into()),
}
}
}

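This simplification leans on the new From<Vec<OneApiError>> impl in error.rs: a single error is surfaced directly as Error::QueryApiError, several are wrapped in Error::MultipleErrors. A crate-internal sketch of the conversion in use:

    use crate::error::Error;
    use crate::models::v2::Row;

    // Collapse a Row into its values, or the (possibly aggregated) service error.
    fn row_values(row: Row) -> Result<Vec<serde_json::Value>, Error> {
        row.into()
    }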
View file

@@ -1,2 +1,2 @@
pub mod query;
mod v2;
pub(crate) mod v2;

View file

@@ -1,14 +1,14 @@
use std::sync::Arc;
use crate::error::{Error::JsonError, Result};
use crate::error::{Error, Error::JsonError, Partial, Result};
use crate::models::v2;
use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt, pin_mut};
use futures::lock::Mutex;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};
use crate::models::v2::{DataSetCompletion, DataSetHeader, DataTable, Frame, QueryCompletionInformation, QueryProperties, Row, TableKind};
pub fn parse_frames_iterative(
reader: impl AsyncBufRead + Unpin,
) -> impl Stream<Item = Result<v2::Frame>> {
) -> impl Stream<Item=Result<Frame>> {
let buf = Vec::with_capacity(4096);
stream::unfold((reader, buf), |(mut reader, mut buf)| async move {
buf.clear();
@@ -30,9 +30,9 @@ pub fn parse_frames_iterative(
})
}
pub async fn parse_frames_full(
async fn parse_frames_full(
mut reader: (impl AsyncBufRead + Send + Unpin),
) -> Result<Vec<v2::Frame>> {
) -> Result<Vec<Frame>> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
return Ok(serde_json::from_slice(&buf)?);
@@ -44,18 +44,32 @@ type M<T> = Arc<Mutex<T>>;
/// Arc Mutex Option
type OM<T> = M<Option<T>>;
struct StreamingDataset {
header : OM<v2::DataSetHeader>,
completion : OM<v2::DataSetCompletion>,
query_properties : OM<Vec<QueryProperties>>,
query_completion_information : OM<Vec<QueryCompletionInformation>>,
results : Receiver<DataTable>
pub struct IterativeDataset {
pub header: OM<v2::DataSetHeader>,
pub completion: OM<v2::DataSetCompletion>,
pub query_properties: OM<Vec<QueryProperties>>,
pub query_completion_information: OM<Vec<QueryCompletionInformation>>,
pub results: Receiver<Result<IterativeTable>>,
}
impl StreamingDataset {
pub struct IterativeTable {
pub table_id: i32,
pub table_name: String,
pub table_kind: TableKind,
pub columns: Vec<v2::Column>,
pub rows: Receiver<Result<Vec<Row>>>,
}
impl IterativeDataset {
pub fn from_async_buf_read(stream: impl AsyncBufRead + Send + Unpin + 'static) -> Arc<Self> {
// parse_frames_iterative yields Result items; failures are forwarded downstream per frame.
Self::new(parse_frames_iterative(stream))
}
fn new(stream: impl Stream<Item=Result<Frame>> + Send + 'static) -> Arc<Self> {
// TODO: make channel size configurable
let (tx, rx) = tokio::sync::mpsc::channel(1);
let res = StreamingDataset {
let res = IterativeDataset {
header: Arc::new(Mutex::new(None)),
completion: Arc::new(Mutex::new(None)),
query_properties: Arc::new(Mutex::new(None)),
@@ -72,9 +86,106 @@ impl StreamingDataset {
res
}
async fn populate_with_stream(&self, stream: impl Stream<Item = Result<Frame>>, tx: Sender<DataTable>) {
async fn populate_with_stream(&self, stream: impl Stream<Item=Result<Frame>>, tx: Sender<Result<IterativeTable>>) {
pin_mut!(stream);
let mut rows_tx = None;
while let Some(frame) = stream.try_next().await.transpose() {
let frame = match frame {
Ok(frame) => frame,
// Forward the real parse error to the consumer instead of a generic message.
Err(e) => {
tx.send(Err(e)).await.expect("failed to send error");
continue;
}
};
match frame {
Frame::DataSetHeader(header) => {
self.header.lock().await.replace(header);
}
Frame::DataSetCompletion(completion) => {
if let Some(errs) = &completion.one_api_errors {
// TODO: surface a proper error instead of panicking if the send fails
tx.send(Err(errs.clone().into())).await.expect("failed to send error");
}
self.completion.lock().await.replace(completion);
}
// TODO: properly handle errors/missing
Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
self.query_properties.lock().await.replace(table.deserialize_values::<QueryProperties>().expect("failed to deserialize query properties"));
}
Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
self.query_completion_information.lock().await.replace(table.deserialize_values::<QueryCompletionInformation>().expect("failed to deserialize query completion information"));
}
Frame::DataTable(table) => {
let (datatable_tx, datatable_rx) = tokio::sync::mpsc::channel(1);
tx.send(Ok(IterativeTable {
table_id: table.table_id,
table_name: table.table_name,
table_kind: table.table_kind,
columns: table.columns,
rows: datatable_rx,
})).await.expect("failed to send table");
datatable_tx.send(Ok(table.rows)).await.expect("failed to send rows");
}
Frame::TableHeader(table_header) => {
let (rows_tx_, rows_rx) = tokio::sync::mpsc::channel(1);
tx.send(Ok(IterativeTable {
table_id: table_header.table_id,
table_name: table_header.table_name,
table_kind: table_header.table_kind,
columns: table_header.columns,
rows: rows_rx,
})).await.expect("failed to send table");
rows_tx = Some(rows_tx_);
}
Frame::TableFragment(table_fragment) => {
if let Some(rows_tx) = &mut rows_tx {
rows_tx.send(Ok(table_fragment.rows)).await.expect("failed to send rows");
}
}
Frame::TableCompletion(table_completion) => {
if let Some(rows_tx) = rows_tx.take() {
if let Some(errs) = &table_completion.one_api_errors {
// TODO: surface a proper error instead of panicking if the send fails
rows_tx.send(Err(errs.clone().into())).await.expect("failed to send rows");
}
}
}
Frame::TableProgress(_) => {}
}
}
}
}
pub struct FullDataset {
pub header: Option<DataSetHeader>,
pub completion: Option<DataSetCompletion>,
pub query_properties: Option<Vec<QueryProperties>>,
pub query_completion_information: Option<Vec<QueryCompletionInformation>>,
pub results: Vec<DataTable>,
}
impl FullDataset {
pub async fn from_async_buf_read(reader: impl AsyncBufRead + Send + Unpin + 'static) -> Partial<FullDataset> {
let vec = parse_frames_full(reader).await.map_err(|e| (None, e))?;
Self::from_frame_stream(stream::iter(vec)).await
}
async fn from_frame_stream(stream: impl Stream<Item=Frame> + Send + 'static) -> Partial<FullDataset> {
pin_mut!(stream);
let mut dataset = FullDataset {
header: None,
completion: None,
query_properties: None,
query_completion_information: None,
results: Vec::new(),
};
let mut current_table = Some(DataTable {
table_id: 0,
table_name: "".to_string(),
@@ -83,28 +194,47 @@ impl StreamingDataset {
rows: Vec::new(),
});
while let Some(frame) = stream.try_next().await.transpose() {
// TODO: handle errors
let frame = frame.expect("failed to read frame");
let mut errors: Vec<Error> = Vec::new();
while let Some(frame) = stream.next().await {
match frame {
v2::Frame::DataSetHeader(header) => {
self.header.lock().await.replace(header);
},
v2::Frame::DataSetCompletion(completion) => {
self.completion.lock().await.replace(completion);
},
Frame::DataSetHeader(header) => {
dataset.header = Some(header);
}
Frame::DataSetCompletion(completion) => {
if let Some(errs) = &completion.one_api_errors {
errors.push(errs.clone().into());
}
dataset.completion = Some(completion);
}
// TODO: properly handle errors/missing
v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
self.query_properties.lock().await.replace(table.deserialize_values::<QueryProperties>().expect("failed to deserialize query properties"));
},
v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
self.query_completion_information.lock().await.replace(table.deserialize_values::<QueryCompletionInformation>().expect("failed to deserialize query completion information"));
},
v2::Frame::DataTable(table) => {
tx.send(table).await.expect("failed to send table");
},
// TODO - handle errors
v2::Frame::TableHeader(table_header) => {
Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
match table.deserialize_values::<QueryProperties>() {
Ok(query_properties) => {
dataset.query_properties = Some(query_properties);
}
Err((q, e)) => {
dataset.query_properties = q;
errors.push(e);
}
}
}
Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
match table.deserialize_values::<QueryCompletionInformation>() {
Ok(query_completion_information) => {
dataset.query_completion_information = Some(query_completion_information);
}
Err((q, e)) => {
dataset.query_completion_information = q;
errors.push(e);
}
}
}
Frame::DataTable(table) => {
dataset.results.push(table);
}
Frame::TableHeader(table_header) => {
if let Some(table) = &mut current_table {
table.table_id = table_header.table_id;
table.table_name = table_header.table_name.clone();
@@ -112,21 +242,26 @@ impl StreamingDataset {
table.columns = table_header.columns.clone();
}
}
v2::Frame::TableFragment(table_fragment) => {
Frame::TableFragment(table_fragment) => {
if let Some(table) = &mut current_table {
table.rows.extend(table_fragment.rows);
}
}
v2::Frame::TableCompletion(table_completion) => {
Frame::TableCompletion(table_completion) => {
// TODO: handle table-level errors reported in TableCompletion
if let Some(table) = current_table.take() {
// TODO - handle errors
tx.send(table).await.expect("failed to send table");
dataset.results.push(table);
}
}
Frame::TableProgress(_) => {}
}
}
// Match on length (rather than a slice pattern) so the single error can be
// moved out without requiring `Error: Clone`.
match errors.len() {
0 => Partial::Ok(dataset),
1 => Partial::Err((Some(dataset), errors.pop().expect("length checked to be 1"))),
_ => Partial::Err((Some(dataset), Error::MultipleErrors(errors))),
}
}
}
@@ -140,7 +275,7 @@ mod tests {
#[tokio::test]
async fn test_parse_frames_full() {
for (contents ,frames) in v2_files_full() {
for (contents, frames) in v2_files_full() {
println!("testing: {}", contents);
let reader = Cursor::new(contents.as_bytes());
let parsed_frames = super::parse_frames_full(reader).await.unwrap();
@@ -150,7 +285,7 @@ mod tests {
#[tokio::test]
async fn test_parse_frames_iterative() {
for (contents ,frames) in v2_files_iterative() {
for (contents, frames) in v2_files_iterative() {
println!("testing: {}", contents);
let reader = Cursor::new(contents.as_bytes());
let parsed_frames = super::parse_frames_iterative(reader)

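A sketch of draining the channels an IterativeDataset exposes. The receivers need &mut access, which Arc<IterativeDataset> alone does not grant, so the sketch takes the results receiver by value; it is crate-internal (operations::v2 is pub(crate)) and assumes TableKind derives Debug:

    use tokio::sync::mpsc::Receiver;
    use crate::error::Result;
    use crate::operations::v2::IterativeTable;

    // Drain tables as they arrive; each table carries its own channel of row batches.
    async fn drain(mut tables: Receiver<Result<IterativeTable>>) {
        while let Some(table) = tables.recv().await {
            match table {
                Ok(mut table) => {
                    println!("table {} ({:?})", table.table_name, table.table_kind);
                    while let Some(batch) = table.rows.recv().await {
                        match batch {
                            Ok(rows) => println!("  batch of {} rows", rows.len()),
                            Err(e) => eprintln!("  table-level error: {e}"),
                        }
                    }
                }
                Err(e) => eprintln!("stream error: {e}"),
            }
        }
    }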
View file

@@ -235,9 +235,11 @@ pub struct Options {
pub truncation_max_size: Option<i64>,
/// Validates user's permissions to perform the query and doesn't run the query itself.
pub validate_permissions: Option<bool>,
/// If set, enables newlines between frames in the progressive query stream.
#[builder(default = "Some(true)")]
results_v2_newlines_between_frames: Option<bool>,
/// If set, primary result tables in the v2 stream are sent as fragments.
results_v2_fragment_primary_tables: Option<bool>,
/// Where errors are reported in the result stream (e.g. "end_of_table").
error_reporting_placement: Option<String>,
/// Additional options to be passed to the service.
#[serde(flatten)]
pub additional: HashMap<String, String>,
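The three new fields mirror the request properties iterative_query sets internally. A sketch of building the same configuration through the generated builders (the with_* setters and the string-accepting placement setter are exactly as used in client.rs above):

    use crate::prelude::{ClientRequestProperties, ClientRequestPropertiesBuilder, OptionsBuilder};

    // Mirrors the properties iterative_query configures for streaming results.
    fn iterative_defaults() -> ClientRequestProperties {
        ClientRequestPropertiesBuilder::default()
            .with_options(
                OptionsBuilder::default()
                    .with_results_v2_newlines_between_frames(true)
                    .with_results_v2_fragment_primary_tables(true)
                    .with_error_reporting_placement("end_of_table")
                    .build()
                    .expect("valid options"),
            )
            .build()
            .expect("valid client request properties")
    }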