Format Rust code using rustfmt

This commit is contained in:
github-actions[bot] 2024-01-04 12:32:30 +00:00 коммит произвёл GitHub
Родитель 5b3e4e0492
Коммит ecaca3890e
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 182 добавлений и 72 удалений

Просмотреть файл

@ -5,22 +5,24 @@ use crate::connection_string::{ConnectionString, ConnectionStringAuth};
use crate::error::{Error, ParseError, Partial, Result};
use crate::operations::query::{QueryRunner, QueryRunnerBuilder, V1QueryRunner, V2QueryRunner};
use azure_core::{ClientOptions, Context, CustomHeaders, Method, Pipeline, Request, Response, ResponseBody};
use azure_core::{
ClientOptions, Context, CustomHeaders, Method, Pipeline, Request, Response, ResponseBody,
};
use crate::client_details::ClientDetails;
use crate::models::v2::Row;
use crate::operations;
use crate::operations::v2::{FullDataset, IterativeDataset};
use crate::prelude::{ClientRequestProperties, ClientRequestPropertiesBuilder, OptionsBuilder};
use crate::query::QueryBody;
use crate::request_options::Options;
use azure_core::headers::Headers;
use azure_core::prelude::{Accept, AcceptEncoding, ClientVersion, ContentType};
use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
use futures::TryStreamExt;
use crate::operations;
use crate::operations::v2::{FullDataset, IterativeDataset};
use crate::query::QueryBody;
use crate::request_options::Options;
/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
@ -255,7 +257,11 @@ impl KustoClient {
.rows
.into_iter()
.map(Row::into_result)
.map(|r| r.and_then(|v| serde_json::from_value::<T>(serde_json::Value::Array(v)).map_err(Error::from)))
.map(|r| {
r.and_then(|v| {
serde_json::from_value::<T>(serde_json::Value::Array(v)).map_err(Error::from)
})
})
.collect::<Result<Vec<T>>>()?;
Ok(results)
@ -292,29 +298,74 @@ impl KustoClient {
}
#[must_use]
pub async fn query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Partial<FullDataset> {
let body = self.execute(QueryKind::Query, database.into(), query.into(), options.into()).await.map_err(|e| (None, e))?;
pub async fn query(
&self,
database: impl Into<String>,
query: impl Into<String>,
options: impl Into<Option<ClientRequestProperties>>,
) -> Partial<FullDataset> {
let body = self
.execute(
QueryKind::Query,
database.into(),
query.into(),
options.into(),
)
.await
.map_err(|e| (None, e))?;
FullDataset::from_async_buf_read(body.into_stream().map_err(|e| std::io::Error::other(e)).into_async_read()).await
FullDataset::from_async_buf_read(
body.into_stream()
.map_err(|e| std::io::Error::other(e))
.into_async_read(),
)
.await
}
#[must_use]
pub async fn iterative_query(&self, database: impl Into<String>, query: impl Into<String>, options: impl Into<Option<ClientRequestProperties>>) -> Result<Arc<IterativeDataset>> {
let iterative_options = ClientRequestPropertiesBuilder::default().with_options(
OptionsBuilder::default()
.with_results_v2_newlines_between_frames(true)
.with_results_v2_fragment_primary_tables(true)
.with_error_reporting_placement("end_of_table").build().expect("Failed to build options"))
pub async fn iterative_query(
&self,
database: impl Into<String>,
query: impl Into<String>,
options: impl Into<Option<ClientRequestProperties>>,
) -> Result<Arc<IterativeDataset>> {
let iterative_options = ClientRequestPropertiesBuilder::default()
.with_options(
OptionsBuilder::default()
.with_results_v2_newlines_between_frames(true)
.with_results_v2_fragment_primary_tables(true)
.with_error_reporting_placement("end_of_table")
.build()
.expect("Failed to build options"),
)
.build()
.expect("Failed to build options");
//TODO merge options
let body = self.execute(QueryKind::Query, database.into(), query.into(), iterative_options.into()).await?;
Ok(IterativeDataset::from_async_buf_read(body.into_stream().map_err(|e| std::io::Error::other(e)).into_async_read()).await)
let body = self
.execute(
QueryKind::Query,
database.into(),
query.into(),
iterative_options.into(),
)
.await?;
Ok(IterativeDataset::from_async_buf_read(
body.into_stream()
.map_err(|e| std::io::Error::other(e))
.into_async_read(),
)
.await)
}
async fn execute(&self, kind: QueryKind, database: String, query: String, options: Option<ClientRequestProperties>) -> Result<ResponseBody> {
async fn execute(
&self,
kind: QueryKind,
database: String,
query: String,
options: Option<ClientRequestProperties>,
) -> Result<ResponseBody> {
let url = match kind {
QueryKind::Management => self.management_url(),
QueryKind::Query => self.query_url(),
@ -325,7 +376,6 @@ impl KustoClient {
let mut headers = (*self.default_headers).clone();
if let Some(client_request_properties) = &options {
if let Some(client_request_id) = &client_request_properties.client_request_id {
headers.insert("x-ms-client-request-id", client_request_id);
@ -346,16 +396,16 @@ impl KustoClient {
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
request.set_body(bytes);
let response = self
.pipeline()
.send(&mut context, &mut request)
.await?;
let response = self.pipeline().send(&mut context, &mut request).await?;
let status = response.status();
if !status.is_success() {
let body = response.into_body().collect_string().await;
return Err(Error::HttpError(status, body.unwrap_or_else(|e| format!("{:?}", e))));
return Err(Error::HttpError(
status,
body.unwrap_or_else(|e| format!("{:?}", e)),
));
}
Ok(response.into_body())

Просмотреть файл

@ -1,7 +1,7 @@
//! Defines [Error] for representing failures in various operations.
use azure_core::StatusCode;
use std::fmt::Debug;
use oauth2::url;
use std::fmt::Debug;
use crate::models::v2::OneApiError;
use thiserror;
@ -74,7 +74,13 @@ impl From<Vec<Error>> for Error {
impl From<Vec<OneApiError>> for Error {
fn from(errors: Vec<OneApiError>) -> Self {
if errors.len() == 1 {
Error::from(errors.into_iter().next().map(Error::QueryApiError).expect("Should be one"))
Error::from(
errors
.into_iter()
.next()
.map(Error::QueryApiError)
.expect("Should be one"),
)
} else {
Error::MultipleErrors(errors.into_iter().map(Error::QueryApiError).collect())
}

Просмотреть файл

@ -1,12 +1,26 @@
use crate::models::ColumnType;
use crate::models::v2::{Column, DataSetCompletion, DataSetHeader, DataTable, Frame, OneApiError, OneApiErrors, Row, TableCompletion, TableFragment, TableFragmentType, TableHeader, TableKind};
use crate::models::v2::ErrorReportingPlacement::EndOfTable;
use crate::models::v2::{
Column, DataSetCompletion, DataSetHeader, DataTable, Frame, OneApiError, OneApiErrors, Row,
TableCompletion, TableFragment, TableFragmentType, TableHeader, TableKind,
};
use crate::models::ColumnType;
const V2_VALID_FRAMES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/validFrames.json"));
const V2_TWO_TABLES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/twoTables.json"));
const V2_PARTIAL_ERROR: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialError.json"));
const V2_PARTIAL_ERROR_FULL_DATASET: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialErrorFullDataset.json"));
const V2_VALID_FRAMES: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/validFrames.json"
));
const V2_TWO_TABLES: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/twoTables.json"
));
const V2_PARTIAL_ERROR: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/partialError.json"
));
const V2_PARTIAL_ERROR_FULL_DATASET: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/partialErrorFullDataset.json"
));
fn expected_v2_valid_frames() -> Vec<Frame> {
vec![
@ -205,7 +219,6 @@ fn expected_v2_valid_frames() -> Vec<Frame> {
]
}
fn expected_v2_two_tables() -> Vec<Frame> {
vec![
Frame::DataSetHeader(DataSetHeader {
@ -623,17 +636,18 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
]
}
pub fn v2_files_full() -> Vec<(&'static str, Vec<Frame>)> {
vec![
(V2_VALID_FRAMES, expected_v2_valid_frames()),
(V2_TWO_TABLES, expected_v2_two_tables()),
(V2_PARTIAL_ERROR, expected_v2_partial_error()),
(V2_PARTIAL_ERROR_FULL_DATASET, expected_v2_partial_error_full_dataset()),
(
V2_PARTIAL_ERROR_FULL_DATASET,
expected_v2_partial_error_full_dataset(),
),
]
}
pub fn v2_files_iterative() -> Vec<(&'static str, Vec<Frame>)> {
vec![
(V2_VALID_FRAMES, expected_v2_valid_frames()),

Просмотреть файл

@ -1,5 +1,5 @@
use crate::{KustoDateTime, KustoDynamic, KustoGuid, KustoInt, KustoString};
use serde::{Deserialize, Serialize};
use crate::{KustoInt, KustoString, KustoDateTime, KustoDynamic, KustoGuid};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]

Просмотреть файл

@ -6,11 +6,11 @@ mod errors;
mod frames;
mod known_tables;
use crate::error::{Error, Partial};
pub use consts::*;
pub use errors::*;
pub use frames::*;
pub use known_tables::*;
use crate::error::{Error, Partial};
/// A result of a V2 query.
/// Could be a table, a part of a table, or metadata about the dataset.
@ -78,7 +78,7 @@ impl DataTable {
Err(e) => match e {
Error::MultipleErrors(e) => errors.extend(e),
_ => errors.push(e),
}
},
}
}
match (values.len(), errors.len()) {
@ -100,7 +100,7 @@ impl DataTable {
Err(e) => match e {
Error::MultipleErrors(e) => errors.extend(e),
_ => errors.push(e),
}
},
}
}

Просмотреть файл

@ -1,14 +1,19 @@
use std::sync::Arc;
use crate::error::{Error, Error::JsonError, Partial, Result};
use crate::models::v2;
use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt, pin_mut};
use crate::models::v2::{
DataSetCompletion, DataSetHeader, DataTable, Frame, QueryCompletionInformation,
QueryProperties, Row, TableKind,
};
use futures::lock::Mutex;
use futures::{
pin_mut, stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt,
};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::models::v2::{DataSetCompletion, DataSetHeader, DataTable, Frame, QueryCompletionInformation, QueryProperties, Row, TableKind};
pub fn parse_frames_iterative(
reader: impl AsyncBufRead + Unpin,
) -> impl Stream<Item=Result<Frame>> {
) -> impl Stream<Item = Result<Frame>> {
let buf = Vec::with_capacity(4096);
stream::unfold((reader, buf), |(mut reader, mut buf)| async move {
buf.clear();
@ -30,15 +35,12 @@ pub fn parse_frames_iterative(
})
}
async fn parse_frames_full(
mut reader: (impl AsyncBufRead + Send + Unpin),
) -> Result<Vec<Frame>> {
async fn parse_frames_full(mut reader: (impl AsyncBufRead + Send + Unpin)) -> Result<Vec<Frame>> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
return Ok(serde_json::from_slice(&buf)?);
}
/// Arc Mutex
type M<T> = Arc<Mutex<T>>;
/// Arc Mutex Option
@ -66,7 +68,7 @@ impl IterativeDataset {
Self::new(stream)
}
fn new(stream: impl Stream<Item=Result<Frame>> + Send + 'static) -> Arc<Self> {
fn new(stream: impl Stream<Item = Result<Frame>> + Send + 'static) -> Arc<Self> {
// TODO: make channel size configurable
let (tx, rx) = tokio::sync::mpsc::channel(1);
let res = IterativeDataset {
@ -86,7 +88,11 @@ impl IterativeDataset {
res
}
async fn populate_with_stream(&self, stream: impl Stream<Item=Result<Frame>>, tx: Sender<Result<IterativeTable>>) {
async fn populate_with_stream(
&self,
stream: impl Stream<Item = Result<Frame>>,
tx: Sender<Result<IterativeTable>>,
) {
pin_mut!(stream);
let mut rows_tx = None;
@ -94,7 +100,11 @@ impl IterativeDataset {
while let Some(frame) = stream.try_next().await.transpose() {
// TODO: handle errors
let Ok(frame) = frame else {
tx.send(Err(Error::ExternalError("Failed to parse frames".to_string()))).await.expect("failed to send error");
tx.send(Err(Error::ExternalError(
"Failed to parse frames".to_string(),
)))
.await
.expect("failed to send error");
continue;
};
@ -105,16 +115,28 @@ impl IterativeDataset {
Frame::DataSetCompletion(completion) => {
if let Some(errs) = &completion.one_api_errors {
// todo - better error than crashing when failing to send
tx.send(Err(errs.clone().into())).await.expect("failed to send error");
tx.send(Err(errs.clone().into()))
.await
.expect("failed to send error");
}
self.completion.lock().await.replace(completion);
}
// TODO: properly handle errors/missing
Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
self.query_properties.lock().await.replace(table.deserialize_values::<QueryProperties>().expect("failed to deserialize query properties"));
self.query_properties.lock().await.replace(
table
.deserialize_values::<QueryProperties>()
.expect("failed to deserialize query properties"),
);
}
Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
self.query_completion_information.lock().await.replace(table.deserialize_values::<QueryCompletionInformation>().expect("failed to deserialize query completion information"));
Frame::DataTable(table)
if table.table_kind == TableKind::QueryCompletionInformation =>
{
self.query_completion_information.lock().await.replace(
table
.deserialize_values::<QueryCompletionInformation>()
.expect("failed to deserialize query completion information"),
);
}
Frame::DataTable(table) => {
let (datatable_tx, datatable_rx) = tokio::sync::mpsc::channel(1);
@ -125,9 +147,14 @@ impl IterativeDataset {
table_kind: table.table_kind,
columns: table.columns,
rows: datatable_rx,
})).await.expect("failed to send table");
}))
.await
.expect("failed to send table");
datatable_tx.send(Ok(table.rows)).await.expect("failed to send rows");
datatable_tx
.send(Ok(table.rows))
.await
.expect("failed to send rows");
}
Frame::TableHeader(table_header) => {
let (rows_tx_, rows_rx) = tokio::sync::mpsc::channel(1);
@ -138,20 +165,28 @@ impl IterativeDataset {
table_kind: table_header.table_kind,
columns: table_header.columns,
rows: rows_rx,
})).await.expect("failed to send table");
}))
.await
.expect("failed to send table");
rows_tx = Some(rows_tx_);
}
Frame::TableFragment(table_fragment) => {
if let Some(rows_tx) = &mut rows_tx {
rows_tx.send(Ok(table_fragment.rows)).await.expect("failed to send rows");
rows_tx
.send(Ok(table_fragment.rows))
.await
.expect("failed to send rows");
}
}
Frame::TableCompletion(table_completion) => {
if let Some(rows_tx) = rows_tx.take() {
if let Some(errs) = &table_completion.one_api_errors {
// todo - better error than crashing when failing to send
rows_tx.send(Err(errs.clone().into())).await.expect("failed to send rows");
rows_tx
.send(Err(errs.clone().into()))
.await
.expect("failed to send rows");
}
}
}
@ -170,12 +205,16 @@ pub struct FullDataset {
}
impl FullDataset {
pub async fn from_async_buf_read(reader: impl AsyncBufRead + Send + Unpin + 'static) -> Partial<FullDataset> {
pub async fn from_async_buf_read(
reader: impl AsyncBufRead + Send + Unpin + 'static,
) -> Partial<FullDataset> {
let vec = parse_frames_full(reader).await.map_err(|e| (None, e))?;
Self::from_frame_stream(stream::iter(vec.into_iter())).await
}
async fn from_frame_stream(stream: impl Stream<Item=Frame> + Send + 'static) -> Partial<FullDataset> {
async fn from_frame_stream(
stream: impl Stream<Item = Frame> + Send + 'static,
) -> Partial<FullDataset> {
pin_mut!(stream);
let mut dataset = FullDataset {
@ -196,7 +235,6 @@ impl FullDataset {
let mut errors: Vec<Error> = Vec::new();
while let Some(frame) = stream.next().await {
match frame {
Frame::DataSetHeader(header) => {
@ -220,10 +258,13 @@ impl FullDataset {
}
}
}
Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
Frame::DataTable(table)
if table.table_kind == TableKind::QueryCompletionInformation =>
{
match table.deserialize_values::<QueryCompletionInformation>() {
Ok(query_completion_information) => {
dataset.query_completion_information = Some(query_completion_information);
dataset.query_completion_information =
Some(query_completion_information);
}
Err((q, e)) => {
dataset.query_completion_information = q;
@ -269,9 +310,9 @@ impl FullDataset {
#[cfg(test)]
mod tests {
use crate::models::test_helpers::{v2_files_full, v2_files_iterative};
use futures::io::Cursor;
use futures::StreamExt;
use crate::models::test_helpers::{v2_files_full, v2_files_iterative};
#[tokio::test]
async fn test_parse_frames_full() {
@ -290,9 +331,9 @@ mod tests {
let reader = Cursor::new(contents.as_bytes());
let parsed_frames = super::parse_frames_iterative(reader)
.map(|f| f.expect("failed to parse frame"))
.collect::<Vec<_>>().await;
.collect::<Vec<_>>()
.await;
assert_eq!(parsed_frames, frames);
}
}
}

Просмотреть файл

@ -237,7 +237,7 @@ pub struct Options {
pub validate_permissions: Option<bool>,
///
results_v2_newlines_between_frames: Option<bool>,
///
///
results_v2_fragment_primary_tables: Option<bool>,
error_reporting_placement: Option<String>,
/// Additional options to be passed to the service.

Просмотреть файл

@ -103,7 +103,6 @@ enum KustoValue {
Dynamic(KustoDynamic),
}
impl FromStr for KustoString {
type Err = Infallible;