Signed-off-by: AsafMah <asafmahlev@microsoft.com>
This commit is contained in:
AsafMah 2022-04-14 09:44:43 +03:00
Родитель c4d3a5c061
Коммит 537dcd242f
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: AD0D1680EEE7A4FF
16 изменённых файлов: 1805 добавлений и 14 удалений

Просмотреть файл

@ -1,6 +1,40 @@
[package]
name = "azure-kusto-data"
version = "0.1.0"
description = "Rust wrappers around Microsoft Azure REST APIs - Azure Data Explorer"
readme = "README.md"
license = "MIT"
edition = "2021"
repository = "https://github.com/azure/azure-sdk-for-rust"
homepage = "https://github.com/azure/azure-sdk-for-rust"
documentation = "https://docs.rs/azure_kusto_data"
keywords = ["sdk", "azure", "kusto", "azure-data-explorer"]
categories = ["api-bindings"]
[dependencies]
arrow = { version = "9", optional = true }
azure_core = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" , features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = { git = "https://github.com/roeap/azure-sdk-for-rust", branch="kusto" }
async-trait = "0.1"
async-convert = "1"
bytes = "1"
futures = "0.3"
http = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
thiserror = "1"
lazy_static = "1.4.0"
hashbrown = "0.12.0"
[dev-dependencies]
env_logger = "0.9"
tokio = { version = "1", features = ["macros"] }
[features]
default = ["arrow"]
mock_transport_framework = ["azure_core/mock_transport_framework"]
#into_future = [] TODO - properly turn it on
test_e2e = []

Просмотреть файл

@ -0,0 +1,8 @@
# Azure SDK for Rust - Azure Kusto crate
## The Kusto crate.
`azure-data-kusto` offers functionality needed to interact with Azure Data Explorer (Kusto) from Rust.
As an abstraction over the [Azure Data Explorer REST API](https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/rest/)
For usage have a look at the [examples](https://github.com/Azure/azure-sdk-for-rust/tree/main/sdk/data_kusto/examples).

Просмотреть файл

@ -0,0 +1,52 @@
use azure_kusto_data::prelude::*;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let service_url = std::env::args()
.nth(1)
.expect("please specify service url name as first command line parameter");
let database = std::env::args()
.nth(2)
.expect("please specify database name as second command line parameter");
let query = std::env::args()
.nth(3)
.expect("please specify query as third command line parameter");
let client_id =
std::env::var("AZURE_CLIENT_ID").expect("Set env variable AZURE_CLIENT_ID first!");
let client_secret =
std::env::var("AZURE_CLIENT_SECRET").expect("Set env variable AZURE_CLIENT_SECRET first!");
let authority_id =
std::env::var("AZURE_TENANT_ID").expect("Set env variable AZURE_TENANT_ID first!");
let kcsb = ConnectionStringBuilder::new_with_aad_application_key_authentication(
&service_url,
&authority_id,
&client_id,
&client_secret,
);
let client = KustoClient::try_from(kcsb).unwrap();
let response = client
.execute_query(database, query)
.into_future()
.await
.unwrap();
for table in &response.tables {
match table {
ResultTable::DataSetHeader(header) => println!("header: {:?}", header),
ResultTable::DataTable(table) => println!("table: {:?}", table),
ResultTable::DataSetCompletion(completion) => println!("completion: {:?}", completion),
}
}
let primary_results = response.into_primary_results().collect::<Vec<_>>();
println!("primary results: {:?}", primary_results);
Ok(())
}

Просмотреть файл

@ -0,0 +1,272 @@
use crate::operations::query::*;
use arrow::{
array::{
ArrayRef, BooleanArray, DurationNanosecondArray, Float64Array, Int32Array, Int64Array,
StringArray,
},
compute::cast,
datatypes::{DataType, Field, Schema, TimeUnit},
record_batch::RecordBatch,
};
use std::sync::Arc;
const SECOND_TO_NANOSECONDS: i64 = 1000000000;
const MINUTES_TO_SECONDS: i64 = 60;
const HOURS_TO_SECONDS: i64 = 60 * MINUTES_TO_SECONDS;
const DAYS_TO_SECONDS: i64 = 24 * HOURS_TO_SECONDS;
const TICK_TO_NANOSECONDS: i64 = 100;
#[inline]
fn to_nanoseconds(days: i64, hours: i64, minutes: i64, seconds: i64, ticks: i64) -> i64 {
let d_secs = days * DAYS_TO_SECONDS;
let h_secs = hours * HOURS_TO_SECONDS;
let m_secs = minutes * MINUTES_TO_SECONDS;
let total_secs = d_secs + h_secs + m_secs + seconds;
let rest_in_ns = ticks * TICK_TO_NANOSECONDS;
total_secs * SECOND_TO_NANOSECONDS + rest_in_ns
}
fn parse_segment(seg: &str) -> i64 {
let trimmed = seg.trim_start_matches('0');
if !trimmed.is_empty() {
trimmed.parse::<i64>().unwrap()
} else {
0
}
}
fn destructure_time(dur: &str) -> (i64, i64, i64) {
let parts = dur.split(':').collect::<Vec<_>>();
match parts.as_slice() {
[hours, minutes, seconds] => (
parse_segment(hours),
parse_segment(minutes),
parse_segment(seconds),
),
_ => (0, 0, 0),
}
}
/// The timespan format Kusto returns is 'd.hh:mm:ss.ssssss' or 'hh:mm:ss.ssssss' or 'hh:mm:ss'
/// Kusto also stores fractions in ticks: 1 tick = 100 ns
pub fn string_to_duration_i64(dur: Option<&str>) -> Option<i64> {
let dur = dur?;
let factor = if dur.starts_with('-') { -1 } else { 1 };
let parts: Vec<&str> = dur.trim_start_matches('-').split('.').collect();
let ns = match parts.as_slice() {
[days, hours, ticks] => {
let days_ = parse_segment(days);
let ticks_ = parse_segment(ticks);
let (hours, minutes, seconds) = destructure_time(hours);
to_nanoseconds(days_, hours, minutes, seconds, ticks_)
}
[first, ticks] => {
let ticks_ = parse_segment(ticks);
let (hours, minutes, seconds) = destructure_time(first);
to_nanoseconds(0, hours, minutes, seconds, ticks_)
}
[one] => {
let (hours, minutes, seconds) = destructure_time(one);
to_nanoseconds(0, hours, minutes, seconds, 0)
}
_ => 0,
};
Some(factor * ns)
}
fn convert_array_string(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
Arc::new(StringArray::from(strings))
}
// TODO provide a safe variant for datetime conversions (chrono panics)
fn convert_array_datetime_unsafe(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
let strings: Vec<Option<&str>> = strings.iter().map(|opt| opt.as_deref()).collect();
let string_array: ArrayRef = Arc::new(StringArray::from(strings));
cast(
&string_array,
&DataType::Timestamp(TimeUnit::Nanosecond, None),
)
.unwrap()
}
fn safe_map_f64(value: serde_json::Value) -> Option<f64> {
match value {
serde_json::Value::String(val) if val == "NaN" => None,
serde_json::Value::String(val) if val == "Infinity" => Some(f64::INFINITY),
serde_json::Value::String(val) if val == "-Infinity" => Some(-f64::INFINITY),
_ => serde_json::from_value(value).unwrap(),
}
}
fn convert_array_float(values: Vec<serde_json::Value>) -> ArrayRef {
let reals: Vec<Option<f64>> = values.into_iter().map(safe_map_f64).collect();
Arc::new(Float64Array::from(reals))
}
fn convert_array_timespan(values: Vec<serde_json::Value>) -> ArrayRef {
let strings: Vec<Option<String>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
let durations: Vec<Option<i64>> = strings
.iter()
.map(|opt| opt.as_deref())
.map(string_to_duration_i64)
.collect();
Arc::new(DurationNanosecondArray::from(durations))
}
fn convert_array_bool(values: Vec<serde_json::Value>) -> ArrayRef {
let bools: Vec<Option<bool>> =
serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(BooleanArray::from(bools))
}
fn convert_array_i32(values: Vec<serde_json::Value>) -> ArrayRef {
let ints: Vec<Option<i32>> = serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(Int32Array::from(ints))
}
fn convert_array_i64(values: Vec<serde_json::Value>) -> ArrayRef {
let ints: Vec<Option<i64>> = serde_json::from_value(serde_json::Value::Array(values)).unwrap();
Arc::new(Int64Array::from(ints))
}
pub fn convert_column(data: Vec<serde_json::Value>, column: Column) -> (Field, ArrayRef) {
match column.column_type {
ColumnType::String => (
Field::new(column.column_name.as_str(), DataType::Utf8, true),
convert_array_string(data),
),
ColumnType::Bool | ColumnType::Boolean => (
Field::new(column.column_name.as_str(), DataType::Boolean, true),
convert_array_bool(data),
),
ColumnType::Int => (
Field::new(column.column_name.as_str(), DataType::Int32, true),
convert_array_i32(data),
),
ColumnType::Long => (
Field::new(column.column_name.as_str(), DataType::Int64, true),
convert_array_i64(data),
),
ColumnType::Real => (
Field::new(column.column_name.as_str(), DataType::Float64, true),
convert_array_float(data),
),
ColumnType::Datetime => (
Field::new(
column.column_name.as_str(),
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
convert_array_datetime_unsafe(data),
),
ColumnType::Timespan => (
Field::new(
column.column_name.as_str(),
DataType::Duration(TimeUnit::Nanosecond),
true,
),
convert_array_timespan(data),
),
_ => todo!(),
}
}
pub fn convert_table(table: DataTable) -> RecordBatch {
let mut buffer: Vec<Vec<serde_json::Value>> = Vec::with_capacity(table.columns.len());
let mut fields: Vec<Field> = Vec::with_capacity(table.columns.len());
let mut columns: Vec<ArrayRef> = Vec::with_capacity(table.columns.len());
for _ in 0..table.columns.len() {
buffer.push(Vec::with_capacity(table.rows.len()));
}
table.rows.into_iter().for_each(|row| {
row.into_iter()
.enumerate()
.for_each(|(idx, value)| buffer[idx].push(value))
});
buffer
.into_iter()
.zip(table.columns.into_iter())
.map(|(data, column)| convert_column(data, column))
.for_each(|(field, array)| {
fields.push(field);
columns.push(array);
});
RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deserialize_column() {
let data = r#" {
"ColumnName": "int_col",
"ColumnType": "int"
} "#;
let c: Column = serde_json::from_str(data).expect("deserialize error");
let ref_col = Column {
column_name: "int_col".to_string(),
column_type: ColumnType::Int,
};
assert_eq!(c, ref_col)
}
#[test]
fn deserialize_table() {
let data = r#" {
"FrameType": "DataTable",
"TableId": 1,
"TableName": "Deft",
"TableKind": "PrimaryResult",
"Columns": [
{
"ColumnName": "int_col",
"ColumnType": "int"
}
],
"Rows": []
} "#;
let t: DataTable = serde_json::from_str(data).expect("deserialize error");
let ref_tbl = DataTable {
table_id: 1,
table_name: "Deft".to_string(),
table_kind: TableKind::PrimaryResult,
columns: vec![Column {
column_name: "int_col".to_string(),
column_type: ColumnType::Int,
}],
rows: vec![],
};
assert_eq!(t, ref_tbl)
}
#[test]
fn string_conversion() {
let refs: Vec<(&str, i64)> = vec![
("1.00:00:00.0000000", 86400000000000),
("01:00:00.0000000", 3600000000000),
("01:00:00", 3600000000000),
("00:05:00.0000000", 300000000000),
("00:00:00.0000001", 100),
("-01:00:00", -3600000000000),
("-1.00:00:00.0000000", -86400000000000),
];
for (from, to) in refs {
assert_eq!(string_to_duration_i64(Some(from)), Some(to));
}
}
}

Просмотреть файл

@ -0,0 +1,55 @@
use azure_core::{auth::TokenCredential, Context, Policy, PolicyResult, Request};
use http::header::AUTHORIZATION;
use http::HeaderValue;
use std::sync::Arc;
#[derive(Clone)]
pub struct AuthorizationPolicy {
credential: Arc<dyn TokenCredential>,
resource: String,
}
impl std::fmt::Debug for AuthorizationPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("AuthorizationPolicy")
.field("credential", &"TokenCredential")
.field("resource", &self.resource)
.finish()
}
}
impl AuthorizationPolicy {
pub(crate) fn new<T>(credential: Arc<dyn TokenCredential>, resource: T) -> Self
where
T: Into<String>,
{
Self {
credential,
resource: resource.into(),
}
}
}
#[async_trait::async_trait]
impl Policy for AuthorizationPolicy {
async fn send(
&self,
ctx: &Context,
request: &mut Request,
next: &[Arc<dyn Policy>],
) -> PolicyResult {
assert!(
!next.is_empty(),
"Authorization policies cannot be the last policy of a pipeline"
);
let token = self.credential.get_token(&self.resource).await?;
let auth_header_value = format!("Bearer {}", token.token.secret().clone());
request
.headers_mut()
.insert(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?);
next[0].send(ctx, request, &next[1..]).await
}
}

Просмотреть файл

@ -0,0 +1,182 @@
use crate::authorization_policy::AuthorizationPolicy;
use crate::connection_string::{ConnectionString, ConnectionStringBuilder};
use crate::error::Result;
use crate::operations::query::ExecuteQueryBuilder;
use azure_core::auth::TokenCredential;
use azure_core::prelude::*;
use azure_core::{ClientOptions, Context, Pipeline, Request};
use azure_identity::token_credentials::{
AzureCliCredential, ClientSecretCredential, DefaultAzureCredential,
ImdsManagedIdentityCredential, TokenCredentialOptions,
};
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
const API_VERSION: &str = "2019-02-13";
/// Options for specifying how a Kusto client will behave
#[derive(Clone, Default)]
pub struct KustoClientOptions {
options: ClientOptions,
}
impl KustoClientOptions {
/// Create new options
pub fn new() -> Self {
Self::default()
}
#[cfg(feature = "mock_transport_framework")]
/// Create new options with a given transaction name
pub fn new_with_transaction_name<T: Into<String>>(name: T) -> Self {
Self {
options: ClientOptions::new_with_transaction_name(name.into()),
}
}
}
fn new_pipeline_from_options(
credential: Arc<dyn TokenCredential>,
resource: &str,
options: KustoClientOptions,
) -> Pipeline {
let auth_policy = Arc::new(AuthorizationPolicy::new(credential, resource));
// take care of adding the AuthorizationPolicy as **last** retry policy.
let per_retry_policies: Vec<Arc<(dyn azure_core::Policy + 'static)>> = vec![auth_policy];
Pipeline::new(
option_env!("CARGO_PKG_NAME"),
option_env!("CARGO_PKG_VERSION"),
options.options,
Vec::new(),
per_retry_policies,
)
}
/// Kusto client for Rust.
/// The client is a wrapper around the Kusto REST API.
/// To read more about it, go to https://docs.microsoft.com/en-us/azure/kusto/api/rest/
///
/// The primary methods are:
/// `execute_query`: executes a KQL query against the Kusto service.
#[derive(Clone, Debug)]
pub struct KustoClient {
pipeline: Pipeline,
query_url: String,
management_url: String,
}
impl KustoClient {
pub fn new_with_options<T>(
url: T,
credential: Arc<dyn TokenCredential>,
options: KustoClientOptions,
) -> Result<Self>
where
T: Into<String>,
{
let service_url: String = url.into();
let service_url = service_url.trim_end_matches('/');
let query_url = format!("{}/v2/rest/query", service_url);
let management_url = format!("{}/v1/rest/mgmt", service_url);
let pipeline = new_pipeline_from_options(credential, service_url, options);
Ok(Self {
pipeline,
query_url,
management_url,
})
}
pub(crate) fn query_url(&self) -> &str {
&self.query_url
}
pub fn management_url(&self) -> &str {
&self.management_url
}
/// Execute a KQL query.
/// To learn more about KQL go to https://docs.microsoft.com/en-us/azure/kusto/query/
///
/// # Arguments
///
/// * `database` - Name of the database in scope that is the target of the query
/// * `query` - Text of the query to execute
pub fn execute_query<DB, Q>(&self, database: DB, query: Q) -> ExecuteQueryBuilder
where
DB: Into<String>,
Q: Into<String>,
{
ExecuteQueryBuilder::new(self.clone(), database.into(), query.into(), Context::new())
}
pub(crate) fn prepare_request(&self, uri: &str, http_method: http::Method) -> Request {
let mut request = Request::new(uri.parse().unwrap(), http_method);
request.insert_headers(&Version::from(API_VERSION));
request.insert_headers(&Accept::from("application/json"));
request.insert_headers(&ContentType::new("application/json; charset=utf-8"));
request.insert_headers(&AcceptEncoding::from("gzip"));
request.insert_headers(&ClientVersion::from(format!(
"Kusto.Rust.Client:{}",
env!("CARGO_PKG_VERSION"),
)));
request
}
pub(crate) fn pipeline(&self) -> &Pipeline {
&self.pipeline
}
}
impl<'a> TryFrom<ConnectionString<'a>> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: ConnectionString) -> Result<Self> {
let service_url = value
.data_source
.expect("A data source / service url must always be specified");
let credential: Arc<dyn TokenCredential> = match value {
ConnectionString {
application_client_id: Some(client_id),
application_key: Some(client_secret),
authority_id: Some(tenant_id),
..
} => Arc::new(ClientSecretCredential::new(
tenant_id.to_string(),
client_id.to_string(),
client_secret.to_string(),
TokenCredentialOptions::default(),
)),
ConnectionString {
msi_auth: Some(true),
..
} => Arc::new(ImdsManagedIdentityCredential {}),
ConnectionString {
az_cli: Some(true), ..
} => Arc::new(AzureCliCredential {}),
_ => Arc::new(DefaultAzureCredential::default()),
};
Self::new_with_options(service_url, credential, KustoClientOptions::new())
}
}
impl TryFrom<String> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: String) -> Result<Self> {
let connection_string = ConnectionString::new(value.as_str())?;
Self::try_from(connection_string)
}
}
impl<'a> TryFrom<ConnectionStringBuilder<'a>> for KustoClient {
type Error = crate::error::Error;
fn try_from(value: ConnectionStringBuilder) -> Result<Self> {
let connection_string = value.build();
Self::try_from(connection_string)
}
}

Просмотреть файл

@ -0,0 +1,480 @@
// Set of properties that can be use in a connection string provided to KustoConnectionStringBuilder.
// For a complete list of properties go to https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto
use hashbrown::HashMap;
use lazy_static::lazy_static;
enum ConnectionStringKey {
DataSource,
FederatedSecurity,
UserId,
Password,
ApplicationClientId,
ApplicationKey,
ApplicationCertificate,
ApplicationCertificateThumbprint,
AuthorityId,
ApplicationToken,
UserToken,
MsiAuth,
MsiParams,
AzCli,
}
impl ConnectionStringKey {
fn to_str(&self) -> &'static str {
match self {
ConnectionStringKey::DataSource => "Data Source",
ConnectionStringKey::FederatedSecurity => "AAD Federated Security",
ConnectionStringKey::UserId => "AAD User ID",
ConnectionStringKey::Password => "Password",
ConnectionStringKey::ApplicationClientId => "Application Client Id",
ConnectionStringKey::ApplicationKey => "Application Key",
ConnectionStringKey::ApplicationCertificate => "ApplicationCertificate",
ConnectionStringKey::ApplicationCertificateThumbprint => {
"Application Certificate Thumbprint"
}
ConnectionStringKey::AuthorityId => "Authority Id",
ConnectionStringKey::ApplicationToken => "ApplicationToken",
ConnectionStringKey::UserToken => "UserToken",
ConnectionStringKey::MsiAuth => "MSI Authentication",
ConnectionStringKey::MsiParams => "MSI Params",
ConnectionStringKey::AzCli => "AZ CLI",
}
}
}
lazy_static! {
static ref ALIAS_MAP: HashMap<&'static str, ConnectionStringKey> = {
let mut m = HashMap::new();
m.insert("data source", ConnectionStringKey::DataSource);
m.insert("addr", ConnectionStringKey::DataSource);
m.insert("address", ConnectionStringKey::DataSource);
m.insert("network address", ConnectionStringKey::DataSource);
m.insert("server", ConnectionStringKey::DataSource);
m.insert(
"aad federated security",
ConnectionStringKey::FederatedSecurity,
);
m.insert("federated security", ConnectionStringKey::FederatedSecurity);
m.insert("federated", ConnectionStringKey::FederatedSecurity);
m.insert("fed", ConnectionStringKey::FederatedSecurity);
m.insert("aadfed", ConnectionStringKey::FederatedSecurity);
m.insert("aad user id", ConnectionStringKey::UserId);
m.insert("user id", ConnectionStringKey::UserId);
m.insert("uid", ConnectionStringKey::UserId);
m.insert("user", ConnectionStringKey::UserId);
m.insert("password", ConnectionStringKey::Password);
m.insert("pwd", ConnectionStringKey::Password);
m.insert(
"application client id",
ConnectionStringKey::ApplicationClientId,
);
m.insert("appclientid", ConnectionStringKey::ApplicationClientId);
m.insert("application key", ConnectionStringKey::ApplicationKey);
m.insert("appkey", ConnectionStringKey::ApplicationKey);
m.insert(
"application certificate",
ConnectionStringKey::ApplicationCertificate,
);
m.insert(
"application certificate thumbprint",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert(
"appcert",
ConnectionStringKey::ApplicationCertificateThumbprint,
);
m.insert("authority id", ConnectionStringKey::AuthorityId);
m.insert("authorityid", ConnectionStringKey::AuthorityId);
m.insert("authority", ConnectionStringKey::AuthorityId);
m.insert("tenantid", ConnectionStringKey::AuthorityId);
m.insert("tenant", ConnectionStringKey::AuthorityId);
m.insert("tid", ConnectionStringKey::AuthorityId);
m.insert("application token", ConnectionStringKey::ApplicationToken);
m.insert("apptoken", ConnectionStringKey::ApplicationToken);
m.insert("user token", ConnectionStringKey::UserToken);
m.insert("usertoken", ConnectionStringKey::UserToken);
m.insert("msi auth", ConnectionStringKey::MsiAuth);
m.insert("msi_auth", ConnectionStringKey::MsiAuth);
m.insert("msi", ConnectionStringKey::MsiAuth);
m.insert("msi params", ConnectionStringKey::MsiParams);
m.insert("msi_params", ConnectionStringKey::MsiParams);
m.insert("msi_type", ConnectionStringKey::MsiParams);
m.insert("az cli", ConnectionStringKey::AzCli);
m
};
}
// TODO: when available
// pub const PUBLIC_APPLICATION_CERTIFICATE_NAME: &str = "Public Application Certificate";
// pub const INTERACTIVE_LOGIN_NAME: &str = "Interactive Login";
// pub const LOGIN_HINT_NAME: &str = "Login Hint";
// pub const DOMAIN_HINT_NAME: &str = "Domain Hint";
/*
m.insert("application certificate private key", ConnectionStringKey::ApplicationCertificatePrivateKey);
m.insert("application certificate x5c", ConnectionStringKey::ApplicationCertificateX5C);
m.insert("application certificate send public certificate", ConnectionStringKey::ApplicationCertificateX5C);
m.insert("application certificate sendx5c", ConnectionStringKey::ApplicationCertificateX5C);
m.insert("sendx5c", ConnectionStringKey::ApplicationCertificateX5C);
ConnectionStringKey::ApplicationCertificatePrivateKey => "Application Certificate PrivateKey",
ConnectionStringKey::ApplicationCertificateX5C => "Application Certificate x5c",
*/
#[derive(Debug, thiserror::Error)]
pub enum ConnectionStringError {
#[error("Missing value for key '{}'", key)]
MissingValue { key: String },
#[error("Unexpected key '{}'", key)]
UnexpectedKey { key: String },
#[error("Parsing error: {}", msg)]
ParsingError { msg: String },
}
/// Build a connection string to connect to a Kusto service instance.
///
/// For more information on Kusto connection strings visit:
/// https://docs.microsoft.com/en-us/azure/data-explorer/kusto/api/connection-strings/kusto
#[derive(Default)]
pub struct ConnectionStringBuilder<'a>(ConnectionString<'a>);
impl<'a> ConnectionStringBuilder<'a> {
/// Creates a ConnectionStringBuilder with no configuration options set
pub fn new() -> Self {
Self(ConnectionString::default())
}
/// Creates a ConnectionStringBuilder that will authenticate with AAD application and key.
///
/// # Arguments
///
/// * `service_url` - Kusto service url should be of the format: https://<clusterName>.kusto.windows.net
/// * `authority_id` - Authority id (aka Tenant id) must be provided
/// * `client_id` - AAD application ID.
/// * `client_secret` - Corresponding key of the AAD application.
pub fn new_with_aad_application_key_authentication(
service_url: &'a str,
authority_id: &'a str,
client_id: &'a str,
client_secret: &'a str,
) -> Self {
Self(ConnectionString {
data_source: Some(service_url),
federated_security: Some(true),
application_client_id: Some(client_id),
application_key: Some(client_secret),
authority_id: Some(authority_id),
..ConnectionString::default()
})
}
pub fn build(&self) -> String {
let mut kv_pairs = Vec::new();
if let Some(data_source) = self.0.data_source {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::DataSource.to_str(),
data_source
));
}
if let Some(user_id) = self.0.user_id {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::UserId.to_str(),
user_id
));
}
if let Some(application_client_id) = self.0.application_client_id {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::ApplicationClientId.to_str(),
application_client_id
));
}
if let Some(application_key) = self.0.application_key {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::ApplicationKey.to_str(),
application_key
));
}
if let Some(application_token) = self.0.application_token {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::ApplicationToken.to_str(),
application_token
));
}
if let Some(authority_id) = self.0.authority_id {
kv_pairs.push(format!(
"{}={}",
ConnectionStringKey::AuthorityId.to_str(),
authority_id
));
}
kv_pairs.join(";")
}
pub fn data_source(&'a mut self, data_source: &'a str) -> &'a mut Self {
self.0.data_source = Some(data_source);
self
}
pub fn user_id(&'a mut self, user_id: &'a str) -> &'a mut Self {
self.0.user_id = Some(user_id);
self
}
pub fn application_client_id(&'a mut self, application_client_id: &'a str) -> &'a mut Self {
self.0.application_client_id = Some(application_client_id);
self
}
pub fn application_token(&'a mut self, application_token: &'a str) -> &'a mut Self {
self.0.application_token = Some(application_token);
self
}
pub fn application_key(&'a mut self, application_key: &'a str) -> &'a mut Self {
self.0.application_key = Some(application_key);
self
}
pub fn authority_id(&'a mut self, authority_id: &'a str) -> &'a mut Self {
self.0.authority_id = Some(authority_id);
self
}
}
/// A Kusto service connection string.
///
/// For more information on Kusto connection strings visit:
/// https://docs.microsoft.com/en-us/azure/kusto/api/connection-strings/kusto
#[derive(Debug, Default)]
pub struct ConnectionString<'a> {
/// The URI specifying the Kusto service endpoint.
/// For example, https://mycluster.kusto.windows.net or net.tcp://localhost
pub data_source: Option<&'a str>,
/// A Boolean value that instructs the client to perform Azure Active Directory login.
pub federated_security: Option<bool>,
/// A String value that instructs the client to perform user authentication with the indicated user name.
pub user_id: Option<&'a str>,
pub user_token: Option<&'a str>,
/// ...
pub password: Option<&'a str>,
/// A String value that provides the application client ID to use when authenticating.
pub application_client_id: Option<&'a str>,
/// A String value that provides the application key to use when authenticating using an application secret flow.
pub application_key: Option<&'a str>,
/// A String value that instructs the client to perform application authenticating with the specified bearer token.
pub application_token: Option<&'a str>,
/// ...
pub application_certificate: Option<&'a str>,
/// A String value that provides the thumbprint of the client
/// certificate to use when using an application client certificate authenticating flow.
pub application_certificate_thumbprint: Option<&'a str>,
/// A String value that provides the name or ID of the tenant in which the application is registered.
pub authority_id: Option<&'a str>,
/// Denotes if MSI authorization should be used
pub msi_auth: Option<bool>,
pub msi_params: Option<&'a str>,
pub az_cli: Option<bool>,
}
impl<'a> PartialEq for ConnectionString<'a> {
fn eq(&self, other: &Self) -> bool {
self.data_source == other.data_source
&& self.federated_security == other.federated_security
&& self.user_id == other.user_id
&& self.user_token == other.user_token
&& self.password == other.password
&& self.application_client_id == other.application_client_id
&& self.application_key == other.application_key
&& self.application_token == other.application_token
&& self.application_certificate == other.application_certificate
&& self.application_certificate_thumbprint == other.application_certificate_thumbprint
&& self.authority_id == other.authority_id
&& self.msi_auth == other.msi_auth
&& self.msi_params == other.msi_params
&& self.az_cli == other.az_cli
}
}
impl<'a> ConnectionString<'a> {
pub fn new(connection_string: &'a str) -> Result<Self, ConnectionStringError> {
let mut data_source = None;
let mut federated_security = None;
let mut user_id = None;
let mut user_token = None;
let mut password = None;
let mut application_client_id = None;
let mut application_token = None;
let mut application_key = None;
let mut application_certificate = None;
let mut application_certificate_thumbprint = None;
let mut authority_id = None;
let mut msi_auth = None;
let mut msi_params = None;
let mut az_cli = None;
let kv_str_pairs = connection_string
.split(';')
.filter(|s| !s.chars().all(char::is_whitespace));
for kv_pair_str in kv_str_pairs {
let mut kv = kv_pair_str.trim().split('=');
let k = match kv.next().filter(|k| !k.chars().all(char::is_whitespace)) {
None => {
return Err(ConnectionStringError::ParsingError {
msg: "No key found".to_owned(),
});
}
Some(k) => k,
};
let v = match kv.next().filter(|k| !k.chars().all(char::is_whitespace)) {
None => return Err(ConnectionStringError::MissingValue { key: k.to_owned() }),
Some(v) => v,
};
if let Some(key) = ALIAS_MAP.get(&*k.to_ascii_lowercase()) {
match key {
ConnectionStringKey::DataSource => data_source = Some(v),
e @ ConnectionStringKey::FederatedSecurity => {
federated_security = Some(parse_boolean(v, e.to_str())?)
}
ConnectionStringKey::UserId => user_id = Some(v),
ConnectionStringKey::UserToken => user_token = Some(v),
ConnectionStringKey::Password => password = Some(v),
ConnectionStringKey::ApplicationClientId => application_client_id = Some(v),
ConnectionStringKey::ApplicationToken => application_token = Some(v),
ConnectionStringKey::ApplicationKey => application_key = Some(v),
ConnectionStringKey::ApplicationCertificate => {
application_certificate = Some(v)
}
ConnectionStringKey::ApplicationCertificateThumbprint => {
application_certificate_thumbprint = Some(v)
}
ConnectionStringKey::AuthorityId => authority_id = Some(v),
e @ ConnectionStringKey::MsiAuth => {
msi_auth = Some(parse_boolean(v, e.to_str())?)
}
ConnectionStringKey::MsiParams => msi_params = Some(v),
e @ ConnectionStringKey::AzCli => az_cli = Some(parse_boolean(v, e.to_str())?),
}
} else {
return Err(ConnectionStringError::UnexpectedKey { key: k.to_owned() });
}
}
Ok(Self {
data_source,
federated_security,
user_id,
user_token,
password,
application_client_id,
application_key,
application_token,
application_certificate,
application_certificate_thumbprint,
authority_id,
msi_auth,
msi_params,
az_cli,
})
}
}
fn parse_boolean(term: &str, name: &str) -> Result<bool, ConnectionStringError> {
match term.to_lowercase().as_str() {
"true" => Ok(true),
"false" => Ok(false),
_ => Err(ConnectionStringError::ParsingError {
msg: format!(
"Unexpected value for {}: {}. Please specify either 'true' or 'false'.",
name, term
),
}),
}
}
#[cfg(test)]
mod tests {
#[allow(unused_imports)]
use super::*;
#[test]
fn it_parses_empty_connection_string() {
assert_eq!(
ConnectionString::new("").unwrap(),
ConnectionString::default()
);
}
#[test]
fn it_returns_expected_errors() {
assert!(matches!(
ConnectionString::new("Data Source="),
Err(ConnectionStringError::MissingValue { key }) if key == "Data Source"
));
assert!(matches!(
ConnectionString::new("="),
Err(ConnectionStringError::ParsingError { msg: _ })
));
assert!(matches!(
ConnectionString::new("x=123;"),
Err(ConnectionStringError::UnexpectedKey { key }) if key == "x"
));
}
#[test]
fn it_parses_basic_cases() {
assert!(matches!(
ConnectionString::new("Data Source=ds"),
Ok(ConnectionString {
data_source: Some("ds"),
..
})
));
assert!(matches!(
ConnectionString::new("addr=ds"),
Ok(ConnectionString {
data_source: Some("ds"),
..
})
));
assert!(matches!(
ConnectionString::new("Application Client Id=cid;Application Key=key"),
Ok(ConnectionString {
application_client_id: Some("cid"),
application_key: Some("key"),
..
})
));
assert!(matches!(
ConnectionString::new("Federated=True;AppToken=token"),
Ok(ConnectionString {
federated_security: Some(true),
application_token: Some("token"),
..
})
));
}
}

Просмотреть файл

@ -0,0 +1,47 @@
//! Defines `KustoRsError` for representing failures in various operations.
use std::fmt::Debug;
use thiserror;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Error converting Kusto response for {0}")]
ConversionError(String),
/// Error in external crate
#[error("Error in external crate {0}")]
ExternalError(String),
/// Error raised when an invalid argument / option is provided.
#[error("Type conversion not available")]
InvalidArgumentError(String),
/// Error raised when specific functionality is not (yet) implemented
#[error("Feature not implemented")]
NotImplemented(String),
/// Error relating to (de-)serialization of JSON data
#[error(transparent)]
JsonError(#[from] serde_json::Error),
/// Error occurring within core azure crates
#[error(transparent)]
AzureError(#[from] azure_core::Error),
/// Errors raised when parsing connection information
#[error("Configuration error: {0}")]
ConfigurationError(#[from] crate::connection_string::ConnectionStringError),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<azure_core::error::Error> for Error {
fn from(err: azure_core::error::Error) -> Self {
Self::AzureError(err.into())
}
}
impl From<azure_core::StreamError> for Error {
fn from(error: azure_core::StreamError) -> Self {
Self::AzureError(azure_core::Error::Stream(error))
}
}

Просмотреть файл

@ -1,14 +1,8 @@
fn nothing() -> u32 {
return 42;
}
#[cfg(test)]
mod tests {
use crate::nothing;
#[test]
fn it_works() {
let result = 2 + nothing();
assert_eq!(result, 44);
}
}
#[cfg(feature = "arrow")]
mod arrow;
pub mod authorization_policy;
pub mod client;
pub mod connection_string;
pub mod error;
mod operations;
pub mod prelude;

Просмотреть файл

@ -0,0 +1 @@
pub mod query;

Просмотреть файл

@ -0,0 +1,216 @@
#[cfg(feature = "arrow")]
use crate::arrow::convert_table;
use crate::client::KustoClient;
#[cfg(feature = "arrow")]
use arrow::record_batch::RecordBatch;
use async_convert::TryFrom;
use azure_core::prelude::*;
use azure_core::setters;
use azure_core::{collect_pinned_stream, Response as HttpResponse};
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
type ExecuteQuery = BoxFuture<'static, crate::error::Result<KustoResponseDataSetV2>>;
#[derive(Debug, Serialize, Deserialize)]
struct QueryBody {
/// Name of the database in scope that is the target of the query or control command
db: String,
/// Text of the query or control command to execute
csl: String,
}
#[derive(Debug, Clone)]
pub struct ExecuteQueryBuilder {
client: KustoClient,
database: String,
query: String,
client_request_id: Option<ClientRequestId>,
app: Option<App>,
user: Option<User>,
context: Context,
}
impl ExecuteQueryBuilder {
pub(crate) fn new(
client: KustoClient,
database: String,
query: String,
context: Context,
) -> Self {
Self {
client,
database,
query: query.trim().into(),
client_request_id: None,
app: None,
user: None,
context,
}
}
setters! {
client_request_id: ClientRequestId => Some(client_request_id),
app: App => Some(app),
user: User => Some(user),
query: String => query,
database: String => database,
context: Context => context,
}
pub fn into_future(self) -> ExecuteQuery {
let this = self.clone();
let ctx = self.context.clone();
Box::pin(async move {
let url = this.client.query_url();
let mut request = this.client.prepare_request(url, http::Method::POST);
if let Some(request_id) = &this.client_request_id {
request.insert_headers(request_id);
};
if let Some(app) = &this.app {
request.insert_headers(app);
};
if let Some(user) = &this.user {
request.insert_headers(user);
};
let body = QueryBody {
db: this.database,
csl: this.query,
};
let bytes = bytes::Bytes::from(serde_json::to_string(&body)?);
request.insert_headers(&ContentLength::new(bytes.len() as i32));
request.set_body(bytes.into());
let response = self
.client
.pipeline()
.send(&mut ctx.clone(), &mut request)
.await?;
<KustoResponseDataSetV2 as TryFrom<HttpResponse>>::try_from(response).await
})
}
}
// TODO enable once in stable
// #[cfg(feature = "into_future")]
// impl std::future::IntoFuture for ExecuteQueryBuilder {
// type IntoFuture = ExecuteQuery;
// type Output = <ExecuteQuery as std::future::Future>::Output;
// fn into_future(self) -> Self::IntoFuture {
// Self::into_future(self)
// }
// }
#[derive(Debug, Clone)]
pub struct KustoResponseDataSetV2 {
pub tables: Vec<ResultTable>,
}
#[async_convert::async_trait]
impl async_convert::TryFrom<HttpResponse> for KustoResponseDataSetV2 {
type Error = crate::error::Error;
async fn try_from(response: HttpResponse) -> Result<Self, crate::error::Error> {
let (_status_code, _header_map, pinned_stream) = response.deconstruct();
let data = collect_pinned_stream(pinned_stream).await?;
let tables: Vec<ResultTable> = serde_json::from_slice(&data.to_vec())?;
Ok(Self { tables })
}
}
impl KustoResponseDataSetV2 {
pub fn table_count(&self) -> usize {
self.tables.len()
}
/// Consumes the response into an iterator over all PrimaryResult tables within the response dataset
pub fn into_primary_results(self) -> impl Iterator<Item = DataTable> {
self.tables
.into_iter()
.filter(|t| matches!(t, ResultTable::DataTable(tbl) if tbl.table_kind == TableKind::PrimaryResult))
.map(|t| match t {
ResultTable::DataTable(tbl) => tbl,
_ => unreachable!("All other variants are excluded by filter"),
})
}
#[cfg(feature = "arrow")]
pub fn into_record_batches(self) -> impl Iterator<Item = RecordBatch> {
self.into_primary_results().map(convert_table)
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "PascalCase", tag = "FrameType")]
#[allow(clippy::enum_variant_names)]
pub enum ResultTable {
DataSetHeader(DataSetHeader),
DataTable(DataTable),
DataSetCompletion(DataSetCompletion),
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct DataSetHeader {
pub is_progressive: bool,
pub version: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct DataTable {
pub table_id: i32,
pub table_name: String,
pub table_kind: TableKind,
pub columns: Vec<Column>,
pub rows: Vec<Vec<serde_json::Value>>,
}
/// Categorizes data tables according to the role they play in the data set that a Kusto query returns.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub enum TableKind {
PrimaryResult,
QueryCompletionInformation,
QueryTraceLog,
QueryPerfLog,
TableOfContents,
QueryProperties,
QueryPlan,
Unknown,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct Column {
pub column_name: String,
pub column_type: ColumnType,
}
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ColumnType {
Bool,
Boolean,
Datetime,
Date,
Dynamic,
Guid,
Int,
Long,
Real,
String,
Timespan,
Time,
Decimal,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(rename_all = "PascalCase")]
pub struct DataSetCompletion {
pub has_errors: bool,
pub cancelled: bool,
}

Просмотреть файл

@ -0,0 +1,16 @@
//! The kusto prelude.
//!
//! The prelude re-exports most commonly used items from this crate.
//!
//! # Examples
//!
//! Import the prelude with:
//!
//! ```
//! # #[allow(unused_imports)]
//! use azure_kusto_data::prelude::*;
//! ```
pub use crate::client::{KustoClient, KustoClientOptions};
pub use crate::connection_string::ConnectionStringBuilder;
pub use crate::operations::query::{KustoResponseDataSetV2, ResultTable};

Просмотреть файл

@ -0,0 +1,16 @@
#![cfg(feature = "arrow")]
use azure_kusto_data::prelude::{KustoResponseDataSetV2, ResultTable};
use std::path::PathBuf;
#[test]
fn asd() {
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/inputs/dataframe.json");
let data = std::fs::read_to_string(path).unwrap();
let tables: Vec<ResultTable> = serde_json::from_str(&data).unwrap();
let response = KustoResponseDataSetV2 { tables };
let record_batches = response.into_record_batches().collect::<Vec<_>>();
println!("{:?}", record_batches)
}

Просмотреть файл

@ -0,0 +1,194 @@
{
"Tables": [
{
"TableName": "Table_0",
"Columns": [
{
"ColumnName": "DatabaseName",
"DataType": "String"
},
{
"ColumnName": "TableName",
"DataType": "String"
}
],
"Rows": [
["Kuskus", "KustoLogs"],
["Kuskus", "LiorTmp"]
]
},
{
"TableName": "Table_1",
"Columns": [
{
"ColumnName": "Value",
"DataType": "String"
}
],
"Rows": [
[
"{\"Visualization\": null,\"Title\": null,\"XColumn\": null,\"Series\": null,\"YColumns\": null,\"XTitle\": null,\"YTitle\": null,\"XAxis\": null,\"YAxis\": null,\"Legend\": null,\"YSplit\": null,\"Accumulate\": false,\"IsQuerySorted\": false,\"Kind\": null}"
]
]
},
{
"TableName": "Table_2",
"Columns": [
{
"ColumnName": "Timestamp",
"DataType": "DateTime"
},
{
"ColumnName": "Severity",
"DataType": "Int32"
},
{
"ColumnName": "SeverityName",
"DataType": "String"
},
{
"ColumnName": "StatusCode",
"DataType": "Int32"
},
{
"ColumnName": "StatusDescription",
"DataType": "String"
},
{
"ColumnName": "Count",
"DataType": "Int32"
},
{
"ColumnName": "RequestId",
"DataType": "Guid"
},
{
"ColumnName": "ActivityId",
"DataType": "Guid"
},
{
"ColumnName": "SubActivityId",
"DataType": "Guid"
},
{
"ColumnName": "ClientActivityId",
"DataType": "String"
}
],
"Rows": [
[
"2018-08-12T09:13:19.5200972Z",
4,
"Info",
0,
"Querycompletedsuccessfully",
1,
"b6651693-9325-41c8-a5bf-dcc21202cdf2",
"b6651693-9325-41c8-a5bf-dcc21202cdf2",
"1cfa59c2-7f29-4e58-aef8-590d4609818f",
"KPC.execute;721add30-f61a-44d0-ad89-812d06736260"
],
[
"2018-08-12T09:13:19.5200972Z",
6,
"Stats",
0,
{
"ExecutionTime": 0.0,
"resource_usage": {
"cache": {
"memory": {
"hits": 0,
"misses": 0,
"total": 0
},
"disk": {
"hits": 0,
"misses": 0,
"total": 0
}
},
"cpu": {
"user": "00:00:00",
"kernel": "00:00:00",
"total cpu": "00:00:00"
},
"memory": {
"peak_per_node": 0
}
},
"input_dataset_statistics": {
"extents": {
"total": 0,
"scanned": 0
},
"rows": {
"total": 0,
"scanned": 0
}
},
"dataset_statistics": [
{
"table_row_count": 2,
"table_size": 46
}
]
},
1,
"b6651693-9325-41c8-a5bf-dcc21202cdf2",
"b6651693-9325-41c8-a5bf-dcc21202cdf2",
"1cfa59c2-7f29-4e58-aef8-590d4609818f",
"KPC.execute;721add30-f61a-44d0-ad89-812d06736260"
]
]
},
{
"TableName": "Table_3",
"Columns": [
{
"ColumnName": "Ordinal",
"DataType": "Int64"
},
{
"ColumnName": "Kind",
"DataType": "String"
},
{
"ColumnName": "Name",
"DataType": "String"
},
{
"ColumnName": "Id",
"DataType": "String"
},
{
"ColumnName": "PrettyName",
"DataType": "String"
}
],
"Rows": [
[
0,
"QueryResult",
"PrimaryResult",
"d6331ef2-d9f7-4d8c-8268-99f574babc82",
""
],
[
1,
"QueryProperties",
"@ExtendedProperties",
"876ccb1a-818e-431f-9147-6b72547cca3d",
""
],
[
2,
"QueryStatus",
"QueryStatus",
"00000000-0000-0000-0000-000000000000",
""
]
]
}
]
}

Просмотреть файл

@ -0,0 +1,216 @@
[
{
"FrameType": "DataSetHeader",
"IsProgressive": false,
"Version": "v2.0"
},
{
"FrameType": "DataTable",
"TableId": 0,
"TableName": "@ExtendedProperties",
"TableKind": "QueryProperties",
"Columns": [
{
"ColumnName": "TableId",
"ColumnType": "int"
},
{
"ColumnName": "Key",
"ColumnType": "string"
},
{
"ColumnName": "Value",
"ColumnType": "dynamic"
}
],
"Rows": [
[
1,
"Visualization",
"{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null}"
]
]
},
{
"FrameType": "DataTable",
"TableId": 1,
"TableName": "temp",
"TableKind": "PrimaryResult",
"Columns": [
{
"ColumnName": "RecordName",
"ColumnType": "string"
},
{
"ColumnName": "RecordTime",
"ColumnType": "datetime"
},
{
"ColumnName": "RecordOffset",
"ColumnType": "timespan"
},
{
"ColumnName": "RecordBool",
"ColumnType": "bool"
},
{
"ColumnName": "RecordInt",
"ColumnType": "int"
},
{
"ColumnName": "RecordReal",
"ColumnType": "real"
}
],
"Rows": [
[
"now",
"2021-12-22T11:43:00Z",
"1.01:01:01.0",
true,
5678,
3.14159
],
[
"earliest datetime",
"1677-09-21T00:12:44Z",
"1.01:01:01.0",
true,
5678,
"NaN"
],
[
"latest datetime",
"2021-12-31T23:59:59Z",
"1.01:01:01.0",
true,
5678,
"Infinity"
],
[
"earliest arrow datetime",
"1677-09-21T00:12:44Z",
"1.01:01:01.0",
true,
5678,
"-Infinity"
],
[
"latest pandas datetime",
"2262-04-11T23:47:16Z",
"1.01:01:01.0",
true,
5678,
3.14159
],
[
"timedelta ticks",
"2021-12-22T11:43:00Z",
"1.01:01:01.0",
true,
5678,
3.14159
],
[
"timedelta string",
"2021-12-22T11:43:00Z",
"1.01:01:01.0",
true,
5678,
3.14159
],
[null, "", "", false, 0, 0]
]
},
{
"FrameType": "DataTable",
"TableId": 2,
"TableName": "QueryCompletionInformation",
"TableKind": "QueryCompletionInformation",
"Columns": [
{
"ColumnName": "Timestamp",
"ColumnType": "datetime"
},
{
"ColumnName": "ClientRequestId",
"ColumnType": "string"
},
{
"ColumnName": "ActivityId",
"ColumnType": "guid"
},
{
"ColumnName": "SubActivityId",
"ColumnType": "guid"
},
{
"ColumnName": "ParentActivityId",
"ColumnType": "guid"
},
{
"ColumnName": "Level",
"ColumnType": "int"
},
{
"ColumnName": "LevelName",
"ColumnType": "string"
},
{
"ColumnName": "StatusCode",
"ColumnType": "int"
},
{
"ColumnName": "StatusCodeName",
"ColumnType": "string"
},
{
"ColumnName": "EventType",
"ColumnType": "int"
},
{
"ColumnName": "EventTypeName",
"ColumnType": "string"
},
{
"ColumnName": "Payload",
"ColumnType": "string"
}
],
"Rows": [
[
"2018-05-01T09:32:38.916566Z",
"unspecified;e8e72755-786b-4bdc-835d-ea49d63d09fd",
"5935a050-e466-48a0-991d-0ec26bd61c7e",
"8182b177-7a80-4158-aca8-ff4fd8e7d3f8",
"6f3c1072-2739-461c-8aa7-3cfc8ff528a8",
4,
"Info",
0,
"S_OK (0)",
4,
"QueryInfo",
"{\"Count\":1,\"Text\":\"Querycompletedsuccessfully\"}"
],
[
"2018-05-01T09:32:38.916566Z",
"unspecified;e8e72755-786b-4bdc-835d-ea49d63d09fd",
"5935a050-e466-48a0-991d-0ec26bd61c7e",
"8182b177-7a80-4158-aca8-ff4fd8e7d3f8",
"6f3c1072-2739-461c-8aa7-3cfc8ff528a8",
6,
"Stats",
0,
"S_OK (0)",
0,
"QueryResourceConsumption",
"{\"ExecutionTime\":0.0156222,\"resource_usage\":{\"cache\":{\"memory\":{\"hits\":13,\"misses\":0,\"total\":13},\"disk\":{\"hits\":0,\"misses\":0,\"total\":0}},\"cpu\":{\"user\":\"00: 00: 00\",\"kernel\":\"00: 00: 00\",\"totalcpu\":\"00: 00: 00\"},\"memory\":{\"peak_per_node\":16777312}},\"dataset_statistics\":[{\"table_row_count\":3,\"table_size\":191}]}"
]
]
},
{
"FrameType": "DataSetCompletion",
"HasErrors": false,
"Cancelled": false
}
]

Просмотреть файл

@ -0,0 +1,8 @@
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: true