streaming query draft
This commit is contained in:
Родитель
8eaaaa6d50
Коммит
6331fb1698
|
@ -31,10 +31,12 @@ lazy_static = "1.4.0"
|
|||
hashbrown = "0.12.0"
|
||||
regex = "1.5.5"
|
||||
time = { version = "0.3.9", features = ["serde", "parsing", "formatting", "macros", "serde-well-known"] }
|
||||
tokio-serde = "0.8.0"
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = "0.9"
|
||||
tokio = { version = "1", features = ["macros"] }
|
||||
clap = { version = "3.1.8", features = ["derive", "env"] }
|
||||
|
||||
[features]
|
||||
default = ["arrow"]
|
||||
|
|
|
@ -1,52 +1,78 @@
|
|||
use azure_kusto_data::prelude::*;
|
||||
use clap::Parser;
|
||||
use futures::{pin_mut, TryStreamExt};
|
||||
use std::error::Error;
|
||||
|
||||
/// Simple program to greet a person
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Kusto cluster endpoint
|
||||
#[clap(env, long)]
|
||||
endpoint: String,
|
||||
|
||||
/// Name of the database
|
||||
#[clap(env, long)]
|
||||
database: String,
|
||||
|
||||
/// Query to execute
|
||||
#[clap(env, long)]
|
||||
query: String,
|
||||
|
||||
#[clap(env, long)]
|
||||
application_id: String,
|
||||
|
||||
#[clap(env, long)]
|
||||
application_key: String,
|
||||
|
||||
#[clap(env, long)]
|
||||
tenant_id: String,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
|
||||
let service_url = std::env::args()
|
||||
.nth(1)
|
||||
.expect("please specify service url name as first command line parameter");
|
||||
|
||||
let database = std::env::args()
|
||||
.nth(2)
|
||||
.expect("please specify database name as second command line parameter");
|
||||
|
||||
let query = std::env::args()
|
||||
.nth(3)
|
||||
.expect("please specify query as third command line parameter");
|
||||
|
||||
let client_id =
|
||||
std::env::var("AZURE_CLIENT_ID").expect("Set env variable AZURE_CLIENT_ID first!");
|
||||
let client_secret =
|
||||
std::env::var("AZURE_CLIENT_SECRET").expect("Set env variable AZURE_CLIENT_SECRET first!");
|
||||
let authority_id =
|
||||
std::env::var("AZURE_TENANT_ID").expect("Set env variable AZURE_TENANT_ID first!");
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let args = Args::parse();
|
||||
|
||||
let kcsb = ConnectionStringBuilder::new_with_aad_application_key_authentication(
|
||||
&service_url,
|
||||
&authority_id,
|
||||
&client_id,
|
||||
&client_secret,
|
||||
&args.endpoint,
|
||||
&args.tenant_id,
|
||||
&args.application_id,
|
||||
&args.application_key,
|
||||
);
|
||||
|
||||
let client = KustoClient::try_from(kcsb).unwrap();
|
||||
|
||||
let response = client
|
||||
.execute_query(database, query)
|
||||
.execute_query(args.database.clone(), args.query.clone())
|
||||
.into_future()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for table in &response.tables {
|
||||
match table {
|
||||
ResultTable::DataSetHeader(header) => println!("header: {:?}", header),
|
||||
ResultTable::DataTable(table) => println!("table: {:?}", table),
|
||||
ResultTable::DataSetCompletion(completion) => println!("completion: {:?}", completion),
|
||||
ResultTable::DataSetHeader(header) => println!("header: {:#?}", header),
|
||||
ResultTable::DataTable(table) => println!("table: {:#?}", table),
|
||||
ResultTable::DataSetCompletion(completion) => println!("completion: {:#?}", completion),
|
||||
}
|
||||
}
|
||||
|
||||
let primary_results = response.into_primary_results().collect::<Vec<_>>();
|
||||
println!("primary results: {:?}", primary_results);
|
||||
println!("primary results: {:#?}", primary_results);
|
||||
|
||||
let mut stream = client
|
||||
.execute_query(args.database, args.query)
|
||||
.into_stream()
|
||||
.await?;
|
||||
|
||||
pin_mut!(stream);
|
||||
|
||||
while let Some(table) = stream.try_next().await? {
|
||||
match table {
|
||||
ResultTable::DataSetHeader(header) => println!("header: {:#?}", header),
|
||||
ResultTable::DataTable(table) => println!("table: {:#?}", table),
|
||||
ResultTable::DataSetCompletion(completion) => println!("completion: {:#?}", completion),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
use futures::io::BufReader;
|
||||
use futures::{pin_mut, stream, AsyncRead, AsyncReadExt, Stream, TryStreamExt};
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
|
||||
// TODO: Find a crate that does this better / move this into another crate
|
||||
|
||||
async fn read_skipping_ws(reader: impl AsyncRead) -> io::Result<u8> {
|
||||
pin_mut!(reader);
|
||||
loop {
|
||||
let mut byte = 0u8;
|
||||
reader.read_exact(std::slice::from_mut(&mut byte)).await?;
|
||||
if !byte.is_ascii_whitespace() {
|
||||
return Ok(byte);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps `msg` in an `io::Error` of kind `InvalidData`.
fn invalid_data(msg: &str) -> io::Error {
    let kind = io::ErrorKind::InvalidData;
    io::Error::new(kind, msg)
}
|
||||
|
||||
async fn deserialize_single<T: DeserializeOwned, R: AsyncRead>(
|
||||
reader: R,
|
||||
) -> io::Result<(T, Vec<u8>)> {
|
||||
let mut balance = 0;
|
||||
let mut vec = Vec::with_capacity(4096);
|
||||
let mut buf = [0; 4096];
|
||||
let mut leftover = Vec::with_capacity(4096);
|
||||
pin_mut!(reader);
|
||||
loop {
|
||||
let size = reader.read(&mut buf).await?;
|
||||
let mut result = None;
|
||||
for (i, byte) in buf[..size].iter().copied().enumerate() {
|
||||
match byte {
|
||||
b'{' => balance += 1,
|
||||
b'}' => balance -= 1,
|
||||
b',' | b']' if balance == 0 => {
|
||||
result = Some(i);
|
||||
break;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
if let Some(i) = result {
|
||||
vec.extend_from_slice(&buf[..i]);
|
||||
leftover.extend_from_slice(&buf[i..size]);
|
||||
break;
|
||||
} else {
|
||||
vec.extend_from_slice(&buf[..size]);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((serde_json::from_slice(vec.as_slice())?, leftover))
|
||||
}
|
||||
|
||||
async fn yield_next_obj<T: DeserializeOwned, R: AsyncRead>(
|
||||
reader: R,
|
||||
first_time: bool,
|
||||
) -> io::Result<Option<(T, Vec<u8>)>> {
|
||||
pin_mut!(reader);
|
||||
|
||||
match read_skipping_ws(&mut reader).await? {
|
||||
b'[' if first_time => {
|
||||
let (result, leftover) = deserialize_single(&mut reader).await?;
|
||||
Ok(Some((result, leftover)))
|
||||
}
|
||||
b',' if !first_time => {
|
||||
let (result, leftover) = deserialize_single(&mut reader).await?;
|
||||
Ok(Some((result, leftover)))
|
||||
}
|
||||
b']' if !first_time => Ok(None),
|
||||
c => Err(invalid_data(&format!("Unexpected char {}", c as char))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter_results<T: DeserializeOwned, R: AsyncRead + 'static>(
|
||||
reader: R,
|
||||
) -> impl Stream<Item = Result<T, io::Error>> {
|
||||
stream::try_unfold(
|
||||
(
|
||||
Box::pin(BufReader::new(reader)) as Pin<Box<dyn AsyncRead>>,
|
||||
true,
|
||||
),
|
||||
|(mut reader, first_time)| async move {
|
||||
let result = yield_next_obj::<T, _>(reader.as_mut(), first_time).await?;
|
||||
Ok(result.map(|(result, leftover)| {
|
||||
(
|
||||
result,
|
||||
(
|
||||
Box::pin(
|
||||
stream::iter(vec![Ok(leftover.into_iter())])
|
||||
.into_async_read()
|
||||
.chain(reader),
|
||||
) as Pin<Box<dyn AsyncRead>>,
|
||||
false,
|
||||
),
|
||||
)
|
||||
}))
|
||||
},
|
||||
)
|
||||
}
|
|
@ -1,2 +1,3 @@
|
|||
mod async_deserializer;
|
||||
pub mod query;
|
||||
pub mod types;
|
||||
|
|
|
@ -3,13 +3,18 @@ use crate::arrow::convert_table;
|
|||
use crate::client::KustoClient;
|
||||
#[cfg(feature = "arrow")]
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use async_convert::TryFrom;
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use azure_core::prelude::*;
|
||||
use azure_core::setters;
|
||||
use azure_core::{collect_pinned_stream, Response as HttpResponse};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::io::BufReader;
|
||||
use futures::{io, pin_mut, stream, AsyncRead, AsyncReadExt, Stream, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
type ExecuteQuery = BoxFuture<'static, crate::error::Result<KustoResponseDataSetV2>>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
@ -58,45 +63,66 @@ impl ExecuteQueryBuilder {
|
|||
context: Context => context,
|
||||
}
|
||||
|
||||
pub async fn into_response(self) -> crate::error::Result<HttpResponse> {
|
||||
let url = self.client.query_url();
|
||||
let mut request = self
|
||||
.client
|
||||
.prepare_request(url.parse()?, http::Method::POST);
|
||||
|
||||
if let Some(request_id) = &self.client_request_id {
|
||||
request.insert_headers(request_id);
|
||||
};
|
||||
if let Some(app) = &self.app {
|
||||
request.insert_headers(app);
|
||||
};
|
||||
if let Some(user) = &self.user {
|
||||
request.insert_headers(user);
|
||||
};
|
||||
|
||||
let body = QueryBody {
|
||||
db: self.database,
|
||||
csl: self.query,
|
||||
};
|
||||
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
|
||||
request.insert_headers(&ContentLength::new(bytes.len() as i32));
|
||||
request.set_body(bytes.into());
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.pipeline()
|
||||
.send(&mut self.context.clone(), &mut request)
|
||||
.await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub async fn into_stream(
|
||||
self,
|
||||
) -> crate::error::Result<impl Stream<Item = Result<ResultTable, io::Error>>> {
|
||||
let response = self.into_response().await?;
|
||||
let (_status_code, _header_map, pinned_stream) = response.deconstruct();
|
||||
let reader = pinned_stream
|
||||
.map_err(|e| std::io::Error::new(ErrorKind::Other, e))
|
||||
.into_async_read();
|
||||
|
||||
Ok(async_deserializer::iter_results::<ResultTable, _>(reader))
|
||||
}
|
||||
|
||||
pub fn into_future(self) -> ExecuteQuery {
|
||||
let this = self.clone();
|
||||
let ctx = self.context.clone();
|
||||
let this = self;
|
||||
|
||||
Box::pin(async move {
|
||||
let url = this.client.query_url();
|
||||
let mut request = this
|
||||
.client
|
||||
.prepare_request(url.parse()?, http::Method::POST);
|
||||
|
||||
if let Some(request_id) = &this.client_request_id {
|
||||
request.insert_headers(request_id);
|
||||
};
|
||||
if let Some(app) = &this.app {
|
||||
request.insert_headers(app);
|
||||
};
|
||||
if let Some(user) = &this.user {
|
||||
request.insert_headers(user);
|
||||
};
|
||||
|
||||
let body = QueryBody {
|
||||
db: this.database,
|
||||
csl: this.query,
|
||||
};
|
||||
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
|
||||
request.insert_headers(&ContentLength::new(bytes.len() as i32));
|
||||
request.set_body(bytes.into());
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.pipeline()
|
||||
.send(&mut ctx.clone(), &mut request)
|
||||
.await?;
|
||||
|
||||
<KustoResponseDataSetV2 as TryFrom<HttpResponse>>::try_from(response).await
|
||||
let response = this.into_response().await?;
|
||||
<KustoResponseDataSetV2 as async_convert::TryFrom<HttpResponse>>::try_from(response)
|
||||
.await
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
use crate::operations::async_deserializer;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::{self};
|
||||
|
||||
// TODO enable once in stable
|
||||
// #[cfg(feature = "into_future")]
|
||||
// impl std::future::IntoFuture for ExecuteQueryBuilder {
|
||||
|
|
Загрузка…
Ссылка в новой задаче