This commit is contained in:
Robert Pack 2022-04-23 17:40:48 +02:00
Родитель 8b0e4314c1
Коммит b5b678947d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: AF0F16B1EB819B7D
7 изменённых файлов: 312 добавлений и 184 удалений

Просмотреть файл

@ -12,12 +12,12 @@ keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]
[dependencies]
arrow = { version = "9", optional = true }
azure_core = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" , features = [
arrow = { version = "12", optional = true }
azure_core = { git = "https://github.com/roeap/azure-sdk-for-rust", rev = "f4d7cf5899b87e59b84015044e12d4a19da40af6", features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" }
azure_identity = { git = "https://github.com/roeap/azure-sdk-for-rust", rev = "f4d7cf5899b87e59b84015044e12d4a19da40af6" }
async-trait = "0.1"
async-convert = "1"
bytes = "1"
@ -25,9 +25,18 @@ futures = "0.3"
http = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = { version = "1.12.0", features = ["json"] }
thiserror = "1"
lazy_static = "1.4.0"
hashbrown = "0.12.0"
regex = "1.5.5"
time = { version = "0.3.9", features = [
"serde",
"parsing",
"formatting",
"macros",
"serde-well-known",
] }
[dev-dependencies]
env_logger = "0.9"

Просмотреть файл

@ -1,184 +1,147 @@
use crate::operations::query::*;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::Arc;
use arrow::array::TimestampNanosecondArray;
use arrow::{
array::{
ArrayRef, BooleanArray, DurationNanosecondArray, Float64Array, Int32Array, Int64Array,
StringArray,
},
compute::cast,
datatypes::{DataType, Field, Schema, TimeUnit},
record_batch::RecordBatch,
};
use std::sync::Arc;
use azure_core::error::{ErrorKind, ResultExt};
const SECOND_TO_NANOSECONDS: i64 = 1000000000;
const MINUTES_TO_SECONDS: i64 = 60;
const HOURS_TO_SECONDS: i64 = 60 * MINUTES_TO_SECONDS;
const DAYS_TO_SECONDS: i64 = 24 * HOURS_TO_SECONDS;
const TICK_TO_NANOSECONDS: i64 = 100;
use crate::error::Result;
use crate::models::ColumnType;
use crate::operations::query::*;
use crate::types::{KustoDateTime, KustoDuration};
#[inline]
fn to_nanoseconds(days: i64, hours: i64, minutes: i64, seconds: i64, ticks: i64) -> i64 {
let d_secs = days * DAYS_TO_SECONDS;
let h_secs = hours * HOURS_TO_SECONDS;
let m_secs = minutes * MINUTES_TO_SECONDS;
let total_secs = d_secs + h_secs + m_secs + seconds;
let rest_in_ns = ticks * TICK_TO_NANOSECONDS;
total_secs * SECOND_TO_NANOSECONDS + rest_in_ns
}
fn parse_segment(seg: &str) -> i64 {
let trimmed = seg.trim_start_matches('0');
if !trimmed.is_empty() {
trimmed.parse::<i64>().unwrap()
} else {
0
}
}
fn destructure_time(dur: &str) -> (i64, i64, i64) {
let parts = dur.split(':').collect::<Vec<_>>();
match parts.as_slice() {
[hours, minutes, seconds] => (
parse_segment(hours),
parse_segment(minutes),
parse_segment(seconds),
),
_ => (0, 0, 0),
}
}
/// The timespan format Kusto returns is 'd.hh:mm:ss.ssssss' or 'hh:mm:ss.ssssss' or 'hh:mm:ss'
/// Kusto also stores fractions in ticks: 1 tick = 100 ns
pub fn string_to_duration_i64(dur: Option<&str>) -> Option<i64> {
let dur = dur?;
let factor = if dur.starts_with('-') { -1 } else { 1 };
let parts: Vec<&str> = dur.trim_start_matches('-').split('.').collect();
let ns = match parts.as_slice() {
[days, hours, ticks] => {
let days_ = parse_segment(days);
let ticks_ = parse_segment(ticks);
let (hours, minutes, seconds) = destructure_time(hours);
to_nanoseconds(days_, hours, minutes, seconds, ticks_)
}
[first, ticks] => {
let ticks_ = parse_segment(ticks);
let (hours, minutes, seconds) = destructure_time(first);
to_nanoseconds(0, hours, minutes, seconds, ticks_)
}
[one] => {
let (hours, minutes, seconds) = destructure_time(one);
to_nanoseconds(0, hours, minutes, seconds, 0)
}
_ => 0,
};
Some(factor * ns)
}
fn convert_array_string(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
fn convert_array_string(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<Option<String>> = serde_json::from_value(serde_json::Value::Array(values))?;
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
Arc::new(StringArray::from(strings))
Ok(Arc::new(StringArray::from(strings)))
}
// TODO provide a safe variant for datetime conversions (chrono panics)
fn convert_array_datetime_unsafe(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
let string_array: ArrayRef = Arc::new(StringArray::from(strings));
cast(
&string_array,
&DataType::Timestamp(TimeUnit::Nanosecond, None),
)
.unwrap()
fn convert_array_datetime(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let dates: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
let timestamps = dates
.into_iter()
.map(|d| {
KustoDateTime::from_str(&d)
.ok()
.map(|d| d.unix_timestamp_nanos())
.and_then(|n| n.try_into().ok())
})
.collect::<Vec<Option<i64>>>();
let dates_array = Arc::new(TimestampNanosecondArray::from(timestamps));
Ok(dates_array)
}
fn safe_map_f64(value: serde_json::Value) -> Option<f64> {
fn safe_map_f64(value: serde_json::Value) -> Result<Option<f64>> {
match value {
serde_json::Value::String(val) if val == "NaN" => None,
serde_json::Value::String(val) if val == "Infinity" => Some(f64::INFINITY),
serde_json::Value::String(val) if val == "-Infinity" => Some(-f64::INFINITY),
_ => serde_json::from_value(value).unwrap(),
serde_json::Value::String(val) if val == "NaN" => Ok(None),
serde_json::Value::String(val) if val == "Infinity" => Ok(Some(f64::INFINITY)),
serde_json::Value::String(val) if val == "-Infinity" => Ok(Some(-f64::INFINITY)),
_ => Ok(serde_json::from_value(value)?),
}
}
fn convert_array_float(values: Vec<serde_json::Value>) -> ArrayRef {
let reals: Vec<Option<f64>> = values.into_iter().map(safe_map_f64).collect();
Arc::new(Float64Array::from(reals))
fn convert_array_float(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let reals: Vec<Option<f64>> = values
.into_iter()
.map(safe_map_f64)
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(Float64Array::from(reals)))
}
fn convert_array_timespan(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
fn convert_array_timespan(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let strings: Vec<String> = serde_json::from_value(serde_json::Value::Array(values))?;
let durations: Vec<Option<i64>> = strings
.iter()
.map(|opt| opt.as_deref())
.map(string_to_duration_i64)
.map(|s| {
KustoDuration::from_str(s)
.ok()
.and_then(|d| i64::try_from(d.whole_nanoseconds()).ok())
})
.collect();
Arc::new(DurationNanosecondArray::from(durations))
Ok(Arc::new(DurationNanosecondArray::from(durations)))
}
fn convert_array_bool(values: Vec<serde_json::Value>) -> ArrayRef {
let bools: Vec<Option<bool>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(BooleanArray::from(bools))
fn convert_array_bool(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let bools: Vec<Option<bool>> = serde_json::from_value(serde_json::Value::Array(values))?;
Ok(Arc::new(BooleanArray::from(bools)))
}
fn convert_array_i32(values: Vec<serde_json::Value>) -> ArrayRef {
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(Int32Array::from(ints))
fn convert_array_i32(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values))?;
Ok(Arc::new(Int32Array::from(ints)))
}
fn convert_array_i64(values: Vec<serde_json::Value>) -> ArrayRef {
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(Int64Array::from(ints))
fn convert_array_i64(values: Vec<serde_json::Value>) -> Result<ArrayRef> {
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values))?;
Ok(Arc::new(Int64Array::from(ints)))
}
pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> (Field, ArrayRef) {
pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> Result<(Field, ArrayRef)> {
match column.column_type {
ColumnType::String => (
Field::new(column.column_name.as_str(), DataType::Utf8, true),
convert_array_string(data),
),
ColumnType::Bool | ColumnType::Boolean => (
Field::new(column.column_name.as_str(), DataType::Boolean, true),
convert_array_bool(data),
),
ColumnType::Int => (
Field::new(column.column_name.as_str(), DataType::Int32, true),
convert_array_i32(data),
),
ColumnType::Long => (
Field::new(column.column_name.as_str(), DataType::Int64, true),
convert_array_i64(data),
),
ColumnType::Real => (
Field::new(column.column_name.as_str(), DataType::Float64, true),
convert_array_float(data),
),
ColumnType::Datetime => (
Field::new(
column.column_name.as_str(),
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
convert_array_datetime_unsafe(data),
),
ColumnType::Timespan => (
Field::new(
column.column_name.as_str(),
DataType::Duration(TimeUnit::Nanosecond),
true,
),
convert_array_timespan(data),
),
ColumnType::String => convert_array_string(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Utf8, true),
data,
)
}),
ColumnType::Bool | ColumnType::Boolean => convert_array_bool(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Boolean, true),
data,
)
}),
ColumnType::Int => convert_array_i32(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int32, true),
data,
)
}),
ColumnType::Long => convert_array_i64(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Int64, true),
data,
)
}),
ColumnType::Real => convert_array_float(data).map(|data| {
(
Field::new(column.column_name.as_str(), DataType::Float64, true),
data,
)
}),
ColumnType::Datetime => convert_array_datetime(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
data,
)
}),
ColumnType::Timespan => convert_array_timespan(data).map(|data| {
(
Field::new(
column.column_name.as_str(),
DataType::Duration(TimeUnit::Nanosecond),
true,
),
data,
)
}),
_ => todo!(),
}
}
pub fn convert_table(table: DataTable) -> RecordBatch {
pub fn convert_table(table: DataTable) -> Result<RecordBatch> {
let mut buffer: Vec<Vec<serde_json::Value>> = Vec::with_capacity(table.columns.len());
let mut fields: Vec<Field> = Vec::with_capacity(table.columns.len());
let mut columns: Vec<ArrayRef> = Vec::with_capacity(table.columns.len());
@ -196,12 +159,15 @@ pub fn convert_table(table: DataTable) -> RecordBatch {
.into_iter()
.zip(table.columns.into_iter())
.map(|(data, column)| convert_column(data, column))
.for_each(|(field, array)| {
.try_for_each::<_, Result<()>>(|result| {
let (field, data) = result?;
fields.push(field);
columns.push(array);
});
columns.push(data);
Ok(())
})?;
RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
Ok(RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
.context(ErrorKind::DataConversion, "Failed to create record batch")?)
}
#[cfg(test)]
@ -252,21 +218,4 @@ mod tests {
};
assert_eq!(t, ref_tbl)
}
#[test]
fn string_conversion() {
let refs: Vec<(&str, i64)> = vec![
("1.00:00:00.0000000", 86400000000000),
("01:00:00.0000000", 3600000000000),
("01:00:00", 3600000000000),
("00:05:00.0000000", 300000000000),
("00:00:00.0000001", 100),
("-01:00:00", -3600000000000),
("-1.00:00:00.0000000", -86400000000000),
];
for (from, to) in refs {
assert_eq!(string_to_duration_i64(Some(from)), Some(to));
}
}
}

Просмотреть файл

@ -9,6 +9,7 @@ use azure_identity::token_credentials::{
AzureCliCredential, ClientSecretCredential, DefaultAzureCredential,
ImdsManagedIdentityCredential, TokenCredentialOptions,
};
use http::Uri;
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
@ -112,8 +113,8 @@ impl KustoClient {
ExecuteQueryBuilder::new(self.clone(), database.into(), query.into(), Context::new())
}
pub(crate) fn prepare_request(&self, uri: &str, http_method: http::Method) -> Request {
let mut request = Request::new(uri.parse().unwrap(), http_method);
pub(crate) fn prepare_request(&self, uri: Uri, http_method: http::Method) -> Request {
let mut request = Request::new(uri, http_method);
request.insert_headers(&Version::from(API_VERSION));
request.insert_headers(&Accept::from("application/json"));
request.insert_headers(&ContentType::new("application/json; charset=utf-8"));

Просмотреть файл

@ -1,4 +1,5 @@
//! Defines `KustoRsError` for representing failures in various operations.
use http::uri::InvalidUri;
use std::fmt::Debug;
use thiserror;
@ -12,8 +13,11 @@ pub enum Error {
ExternalError(String),
/// Error raised when an invalid argument / option is provided.
#[error("Type conversion not available")]
InvalidArgumentError(String),
#[error("Invalid argument {source}")]
InvalidArgumentError {
#[source]
source: Box<dyn std::error::Error>,
},
/// Error raised when specific functionality is not (yet) implemented
#[error("Feature not implemented")]
@ -40,8 +44,10 @@ impl From<azure_core::error::Error> for Error {
}
}
impl From<azure_core::StreamError> for Error {
fn from(error: azure_core::StreamError) -> Self {
Self::AzureError(azure_core::Error::Stream(error))
impl From<InvalidUri> for Error {
fn from(error: InvalidUri) -> Self {
Self::InvalidArgumentError {
source: Box::new(error),
}
}
}

Просмотреть файл

@ -4,5 +4,7 @@ pub mod authorization_policy;
pub mod client;
pub mod connection_string;
pub mod error;
pub mod models;
mod operations;
pub mod prelude;
pub mod types;

Просмотреть файл

@ -64,7 +64,9 @@ impl ExecuteQueryBuilder {
Box::pin(async move {
let url = this.client.query_url();
let mut request = this.client.prepare_request(url, http::Method::POST);
let mut request = this
.client
.prepare_request(url.parse()?, http::Method::POST);
if let Some(request_id) = &this.client_request_id {
request.insert_headers(request_id);
@ -129,17 +131,16 @@ impl KustoResponseDataSetV2 {
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
pub fn into_primary_results(self) -> impl Iterator<Item = DataTable> {
self.tables
.into_iter()
.filter(|t| matches!(t, ResultTable::DataTable(tbl) if tbl.table_kind == TableKind::PrimaryResult))
.map(|t| match t {
ResultTable::DataTable(tbl) => tbl,
_ => unreachable!("All other variants are excluded by filter"),
})
self.tables.into_iter().filter_map(|table| match table {
ResultTable::DataTable(table) if table.table_kind == TableKind::PrimaryResult => {
Some(table)
}
_ => None,
})
}
#[cfg(feature = "arrow")]
pub fn into_record_batches(self) -> impl Iterator<Item = RecordBatch> {
pub fn into_record_batches(self) -> impl Iterator<Item = crate::error::Result<RecordBatch>> {
self.into_primary_results().map(convert_table)
}
}

Просмотреть файл

@ -0,0 +1,160 @@
use azure_core::error::{ErrorKind, ResultExt};
use lazy_static::lazy_static;
use regex::{Captures, Regex};
use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
use time::{Duration, OffsetDateTime};
use crate::error::Error;
use time::format_description::well_known::Rfc3339;
#[derive(PartialEq, Copy, Clone, DeserializeFromStr, SerializeDisplay)]
pub struct KustoDateTime(pub OffsetDateTime);
impl FromStr for KustoDateTime {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(OffsetDateTime::parse(s, &Rfc3339)
.map(KustoDateTime)
.context(ErrorKind::DataConversion, "Failed to parse KustoDateTime")?)
}
}
impl Display for KustoDateTime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
self.0.format(&Rfc3339).unwrap_or_else(|_| "".into())
)?;
Ok(())
}
}
impl Debug for KustoDateTime {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl From<OffsetDateTime> for KustoDateTime {
fn from(time: OffsetDateTime) -> Self {
KustoDateTime(time)
}
}
impl Deref for KustoDateTime {
type Target = OffsetDateTime;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(PartialEq, Copy, Clone, DeserializeFromStr, SerializeDisplay)]
pub struct KustoDuration(pub time::Duration);
impl From<time::Duration> for KustoDuration {
fn from(duration: Duration) -> Self {
KustoDuration(duration)
}
}
impl Deref for KustoDuration {
type Target = time::Duration;
fn deref(&self) -> &Self::Target {
&self.0
}
}
fn parse_regex_segment(captures: &Captures, name: &str) -> i64 {
captures
.name(name)
.map(|m| m.as_str().parse::<i64>().unwrap())
.unwrap_or(0)
}
impl FromStr for KustoDuration {
type Err = crate::error::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
lazy_static! {
static ref RE: Regex = Regex::new(r"^(?P<neg>\-)?((?P<days>\d+)\.)?(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)(\.(?P<nanos>\d+))?$").unwrap();
}
if let Some(captures) = RE.captures(s) {
let neg = match captures.name("neg") {
None => 1,
Some(_) => -1,
};
let days = parse_regex_segment(&captures, "days");
let hours = parse_regex_segment(&captures, "hours");
let minutes = parse_regex_segment(&captures, "minutes");
let seconds = parse_regex_segment(&captures, "seconds");
let nanos = parse_regex_segment(&captures, "nanos");
let duration = neg
* (time::Duration::days(days)
+ time::Duration::hours(hours)
+ time::Duration::minutes(minutes)
+ time::Duration::seconds(seconds)
+ time::Duration::nanoseconds(nanos * 100)); // Ticks
Ok(KustoDuration(duration))
} else {
Err(Error::InvalidArgumentError {
source: format!("{} is not a valid duration", s).into(),
})
}
}
}
impl Display for KustoDuration {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if self.is_negative() {
write!(f, "-")?;
}
if self.whole_days() > 0 {
write!(f, "{}.", self.whole_days())?;
}
write!(
f,
"{:02}:{:02}:{:02}",
self.whole_hours(),
self.whole_minutes(),
self.whole_seconds()
)?;
if self.whole_nanoseconds() > 0 {
write!(f, ".{:07}", self.whole_nanoseconds())?;
}
Ok(())
}
}
impl Debug for KustoDuration {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
#[test]
fn string_conversion() {
let refs: Vec<(&str, i64)> = vec![
("1.00:00:00.0000000", 86400000000000),
("01:00:00.0000000", 3600000000000),
("01:00:00", 3600000000000),
("00:05:00.0000000", 300000000000),
("00:00:00.0000001", 100),
("-01:00:00", -3600000000000),
("-1.00:00:00.0000000", -86400000000000),
];
for (from, to) in refs {
assert_eq!(
KustoDuration::from_str(from).unwrap().whole_nanoseconds(),
to as i128
);
}
}