Added back consuming
This commit is contained in:
Родитель
4f44a535a9
Коммит
7c0fa47a5a
|
@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
}
|
||||
|
||||
// Print the primary tables
|
||||
let primary_results = response.primary_results().collect::<Vec<_>>();
|
||||
let primary_results = response.into_primary_results().collect::<Vec<_>>();
|
||||
println!("primary results: {:#?}", primary_results);
|
||||
|
||||
println!("Querying {} with streaming client", args.query);
|
||||
|
|
|
@ -190,12 +190,12 @@ impl std::convert::TryFrom<KustoResponse> for KustoResponseDataSetV1 {
|
|||
}
|
||||
}
|
||||
|
||||
struct KustoResponseDataSetV2TableIterator<'a, T: Iterator<Item = &'a V2QueryResult>> {
|
||||
struct KustoResponseDataSetV2TableIterator<T: Iterator<Item = V2QueryResult>> {
|
||||
tables: T,
|
||||
finished: bool,
|
||||
}
|
||||
|
||||
impl<'a, T: Iterator<Item = &'a V2QueryResult>> KustoResponseDataSetV2TableIterator<'a, T> {
|
||||
impl<T: Iterator<Item = V2QueryResult>> KustoResponseDataSetV2TableIterator<T> {
|
||||
fn new(tables: T) -> Self {
|
||||
Self {
|
||||
tables,
|
||||
|
@ -204,9 +204,7 @@ impl<'a, T: Iterator<Item = &'a V2QueryResult>> KustoResponseDataSetV2TableItera
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Iterator<Item = &'a V2QueryResult>> Iterator
|
||||
for KustoResponseDataSetV2TableIterator<'a, T>
|
||||
{
|
||||
impl<T: Iterator<Item = V2QueryResult>> Iterator for KustoResponseDataSetV2TableIterator<T> {
|
||||
type Item = DataTable;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
|
@ -219,7 +217,7 @@ impl<'a, T: Iterator<Item = &'a V2QueryResult>> Iterator
|
|||
});
|
||||
|
||||
if let Some(V2QueryResult::DataTable(t)) = next_table {
|
||||
return Some(t.clone());
|
||||
return Some(t);
|
||||
}
|
||||
|
||||
let mut table = DataTable {
|
||||
|
@ -232,9 +230,9 @@ impl<'a, T: Iterator<Item = &'a V2QueryResult>> Iterator
|
|||
|
||||
if let Some(V2QueryResult::TableHeader(header)) = next_table {
|
||||
table.table_id = header.table_id;
|
||||
table.table_kind = header.table_kind.clone();
|
||||
table.table_name = header.table_name.clone();
|
||||
table.columns = header.columns.clone();
|
||||
table.table_kind = header.table_kind;
|
||||
table.table_name = header.table_name;
|
||||
table.columns = header.columns;
|
||||
} else {
|
||||
self.finished = true;
|
||||
return None;
|
||||
|
@ -247,8 +245,8 @@ impl<'a, T: Iterator<Item = &'a V2QueryResult>> Iterator
|
|||
V2QueryResult::TableFragment(fragment) => {
|
||||
assert_eq!(fragment.table_id, table.table_id);
|
||||
match fragment.table_fragment_type {
|
||||
TableFragmentType::DataAppend => table.rows.extend(fragment.rows.clone()),
|
||||
TableFragmentType::DataReplace => table.rows = fragment.rows.clone(),
|
||||
TableFragmentType::DataAppend => table.rows.extend(fragment.rows),
|
||||
TableFragmentType::DataReplace => table.rows = fragment.rows,
|
||||
};
|
||||
}
|
||||
V2QueryResult::TableProgress(progress) => {
|
||||
|
@ -256,7 +254,10 @@ impl<'a, T: Iterator<Item = &'a V2QueryResult>> Iterator
|
|||
}
|
||||
V2QueryResult::TableCompletion(completion) => {
|
||||
assert_eq!(completion.table_id, table.table_id);
|
||||
assert_eq!(completion.row_count, table.rows.len() as i32);
|
||||
assert_eq!(
|
||||
completion.row_count,
|
||||
TryInto::<i32>::try_into(table.rows.len()).expect("Row count overflow")
|
||||
);
|
||||
finished_table = true;
|
||||
break;
|
||||
}
|
||||
|
@ -278,20 +279,35 @@ impl KustoResponseDataSetV2 {
|
|||
self.tables.len()
|
||||
}
|
||||
|
||||
pub fn parsed_data_tables<'a>(&'a self) -> impl Iterator<Item = DataTable> + 'a {
|
||||
KustoResponseDataSetV2TableIterator::new(self.tables.iter())
|
||||
pub fn parsed_data_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
|
||||
KustoResponseDataSetV2TableIterator::new(self.tables.iter().cloned())
|
||||
}
|
||||
|
||||
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
|
||||
pub fn primary_results<'a>(&'a self) -> impl Iterator<Item = DataTable> + 'a {
|
||||
pub fn primary_results(&self) -> impl Iterator<Item = DataTable> + '_ {
|
||||
self.parsed_data_tables()
|
||||
.filter(|t| t.table_kind == TableKind::PrimaryResult)
|
||||
}
|
||||
|
||||
#[cfg(feature = "arrow")]
|
||||
pub fn record_batches<'a>(&'a self) -> impl Iterator<Item = Result<RecordBatch>> + 'a {
|
||||
pub fn record_batches(&self) -> impl Iterator<Item = Result<RecordBatch>> + '_ {
|
||||
self.primary_results().map(convert_table)
|
||||
}
|
||||
|
||||
pub fn into_parsed_data_tables(self) -> impl Iterator<Item = DataTable> {
|
||||
KustoResponseDataSetV2TableIterator::new(self.tables.into_iter())
|
||||
}
|
||||
|
||||
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
|
||||
pub fn into_primary_results(self) -> impl Iterator<Item = DataTable> {
|
||||
self.into_parsed_data_tables()
|
||||
.filter(|t| t.table_kind == TableKind::PrimaryResult)
|
||||
}
|
||||
|
||||
#[cfg(feature = "arrow")]
|
||||
pub fn into_record_batches(self) -> impl Iterator<Item = Result<RecordBatch>> {
|
||||
self.into_primary_results().map(convert_table)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
|
||||
|
|
Загрузка…
Ссылка в новой задаче