diff --git a/Cargo.toml b/Cargo.toml
index 739a289..d5a5705 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,2 +1,2 @@
 [workspace]
-members = ["azure-kusto-data"]
+members = ["azure-kusto-data", "azure-kusto-ingest"]
diff --git a/azure-kusto-ingest/Cargo.toml b/azure-kusto-ingest/Cargo.toml
new file mode 100644
index 0000000..c9b178a
--- /dev/null
+++ b/azure-kusto-ingest/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "azure-kusto-ingest"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+uuid = { version = "1", features = ["v4", "serde"] }
+serde = { version = "1", features = ["serde_derive"] }
+serde_json = "1"
+serde_repr = "0.1"
+url = "2"
+azure-kusto-data = { path = "../azure-kusto-data" }
+anyhow = "1.0.72"
+tokio = { version = "1", features = ["full"] }
+
+azure_core = "0.13"
+azure_storage = "0.13"
+azure_storage_blobs = "0.13"
+azure_storage_queues = "0.13"
+azure_identity = "0.13"
+time = { version = "0.3", features = ["serde"] }
diff --git a/azure-kusto-ingest/src/data_format.rs b/azure-kusto-ingest/src/data_format.rs
new file mode 100644
index 0000000..0a985e1
--- /dev/null
+++ b/azure-kusto-ingest/src/data_format.rs
@@ -0,0 +1,108 @@
+use serde::Serialize;
+
+#[derive(Serialize, Clone, Debug)]
+pub enum IngestionMappingKind {
+    #[serde(rename = "Csv")]
+    CSV,
+    #[serde(rename = "Json")]
+    JSON,
+    Avro,
+    ApacheAvro,
+    Parquet,
+    SStream,
+    #[serde(rename = "Orc")]
+    ORC,
+    #[serde(rename = "W3CLogFile")]
+    W3CLOGFILE,
+    Unknown,
+}
+
+/// All data formats supported by Kusto
+#[derive(Serialize, Clone, Debug)]
+pub enum DataFormat {
+    #[serde(rename = "apacheavro")]
+    ApacheAvro,
+    #[serde(rename = "avro")]
+    Avro,
+    #[serde(rename = "csv")]
+    CSV,
+    #[serde(rename = "json")]
+    JSON,
+    #[serde(rename = "multijson")]
+    MultiJSON,
+    #[serde(rename = "orc")]
+    ORC,
+    #[serde(rename = "parquet")]
+    Parquet,
+    #[serde(rename = "psv")]
+    PSV,
+    #[serde(rename = "raw")]
+    RAW,
+    #[serde(rename = "scsv")]
+    SCSV,
+    #[serde(rename = "sohsv")]
+    SOHsv,
+    #[serde(rename = "singlejson")]
+    SingleJSON,
+    #[serde(rename = "sstream")]
+    SStream,
+    #[serde(rename = "tsv")]
+    TSV,
+    #[serde(rename = "tsve")]
+    TSVe,
+    #[serde(rename = "txt")]
+    TXT,
+    #[serde(rename = "w3clogfile")]
+    W3CLOGFILE,
+}
+
+impl Default for DataFormat {
+    fn default() -> Self {
+        DataFormat::CSV
+    }
+}
+
+impl DataFormat {
+    /// Maps a data format to the ingestion mapping kind the service expects for it
+    pub fn ingestion_mapping_kind(&self) -> IngestionMappingKind {
+        match self {
+            DataFormat::CSV
+            | DataFormat::TSV
+            | DataFormat::SCSV
+            | DataFormat::SOHsv
+            | DataFormat::PSV
+            | DataFormat::TXT
+            | DataFormat::TSVe
+            | DataFormat::RAW => IngestionMappingKind::CSV,
+            DataFormat::JSON | DataFormat::SingleJSON | DataFormat::MultiJSON => {
+                IngestionMappingKind::JSON
+            }
+            DataFormat::Avro => IngestionMappingKind::Avro,
+            DataFormat::ApacheAvro => IngestionMappingKind::ApacheAvro,
+            DataFormat::Parquet => IngestionMappingKind::Parquet,
+            DataFormat::SStream => IngestionMappingKind::SStream,
+            DataFormat::ORC => IngestionMappingKind::ORC,
+            DataFormat::W3CLOGFILE => IngestionMappingKind::W3CLOGFILE,
+        }
+    }
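+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Editor's sketch: sanity checks for the serde renames and the Default impl
+    #[test]
+    fn data_format_serializes_to_kusto_names() {
+        assert_eq!(serde_json::to_string(&DataFormat::CSV).unwrap(), "\"csv\"");
+        assert_eq!(
+            serde_json::to_string(&DataFormat::ApacheAvro).unwrap(),
+            "\"apacheavro\""
+        );
+    }
+
+    #[test]
+    fn default_data_format_is_csv() {
+        assert!(matches!(DataFormat::default(), DataFormat::CSV));
+    }
+}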
diff --git a/azure-kusto-ingest/src/descriptors.rs b/azure-kusto-ingest/src/descriptors.rs
new file mode 100644
index 0000000..bd4a7bf
--- /dev/null
+++ b/azure-kusto-ingest/src/descriptors.rs
@@ -0,0 +1,98 @@
+use std::{io::Read, path::PathBuf};
+
+use url::Url;
+use uuid::Uuid;
+
+#[derive(Clone, Debug)]
+pub enum BlobAuth {
+    // adds the given SAS token as the query string of the blob path
+    SASToken(String),
+    // adds `;managed_identity=<identity>` to the blob path
+    UserAssignedManagedIdentity(String),
+    // adds `;managed_identity=system` to the blob path
+    SystemAssignedManagedIdentity,
+}
+
+#[derive(Clone, Debug)]
+pub struct BlobDescriptor {
+    uri: Url,
+    pub(crate) size: Option<u64>,
+    pub(crate) source_id: Uuid,
+    blob_auth: Option<BlobAuth>,
+}
+
+impl BlobDescriptor {
+    pub fn new(uri: Url, size: Option<u64>, source_id: Option<Uuid>) -> Self {
+        let source_id = source_id.unwrap_or_else(Uuid::new_v4);
+
+        Self {
+            uri,
+            size,
+            source_id,
+            blob_auth: None,
+        }
+    }
+
+    pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self {
+        self.blob_auth = Some(blob_auth);
+        self
+    }
+
+    pub fn uri(&self) -> String {
+        match &self.blob_auth {
+            Some(BlobAuth::SASToken(sas_token)) => {
+                let mut uri = self.uri.clone();
+                uri.set_query(Some(sas_token));
+                uri.to_string()
+            }
+            Some(BlobAuth::UserAssignedManagedIdentity(identity)) => {
+                format!("{};managed_identity={}", self.uri, identity)
+            }
+            Some(BlobAuth::SystemAssignedManagedIdentity) => {
+                format!("{};managed_identity=system", self.uri)
+            }
+            None => self.uri.to_string(),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FileDescriptor {
+    pub path: PathBuf,
+    pub size: Option<u64>,
+    pub source_id: Uuid,
+}
+
+impl FileDescriptor {
+    pub fn new(path: PathBuf, size: Option<u64>, source_id: Option<Uuid>) -> Self {
+        unimplemented!()
+    }
+}
+
+// Clone and Debug cannot be derived because of the boxed stream
+pub struct StreamDescriptor {
+    stream: Box<dyn Read>,
+    size: Option<u64>,
+    source_id: Uuid,
+    compressed: bool,
+    stream_name: String,
+}
+
+impl StreamDescriptor {
+    pub fn new(
+        stream: Box<dyn Read>,
+        size: Option<u64>,
+        source_id: Option<Uuid>,
+        compressed: bool,
+        stream_name: String,
+    ) -> Self {
+        unimplemented!()
+    }
+
+    pub fn from_file_descriptor(file_descriptor: FileDescriptor) -> Self {
+        unimplemented!()
+    }
+}
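+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Editor's sketch of the auth-suffix behaviour described above;
+    // the URL is a placeholder, not a real storage account.
+    #[test]
+    fn blob_uri_with_system_assigned_managed_identity() {
+        let uri = Url::parse("https://account.blob.core.windows.net/container/blob").unwrap();
+        let descriptor = BlobDescriptor::new(uri, None, None)
+            .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
+
+        assert_eq!(
+            descriptor.uri(),
+            "https://account.blob.core.windows.net/container/blob;managed_identity=system"
+        );
+    }
+}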
diff --git a/azure-kusto-ingest/src/errors.rs b/azure-kusto-ingest/src/errors.rs
new file mode 100644
index 0000000..f833f07
--- /dev/null
+++ b/azure-kusto-ingest/src/errors.rs
@@ -0,0 +1,7 @@
+#[derive(Debug)]
+pub enum KustoClientError {
+    KustoMappingError,
+    KustoDuplicateMappingError,
+    KustoMissingMappingError,
+    KustoInvalidEndpointError,
+}
diff --git a/azure-kusto-ingest/src/ingestion_blob_info.rs b/azure-kusto-ingest/src/ingestion_blob_info.rs
new file mode 100644
index 0000000..806e662
--- /dev/null
+++ b/azure-kusto-ingest/src/ingestion_blob_info.rs
@@ -0,0 +1,125 @@
+use std::collections::HashMap;
+
+use serde::Serialize;
+
+use crate::{
+    data_format::DataFormat,
+    descriptors::BlobDescriptor,
+    ingestion_properties::{IngestionProperties, ReportLevel, ReportMethod, ValidationPolicy},
+    resource_manager::KustoIdentityToken,
+};
+
+// The ingestion message is based on:
+// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-rest#ingestion-message-internal-structure
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+pub struct QueuedIngestionMessage {
+    id: uuid::Uuid,
+    blob_path: String,
+    database_name: String,
+    table_name: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    raw_data_size: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    retain_blob_on_success: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    flush_immediately: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    ignore_size_limit: Option<bool>,
+    // According to the Go implementation, report_level and report_method could also be optional
+    report_level: ReportLevel,
+    report_method: ReportMethod,
+    // TODO: serialize as a proper timestamp, e.g.
+    // #[serde(with = "time::serde::iso8601")]
+    source_message_creation_time: String,
+    additional_properties: AdditionalProperties,
+}
+
+impl QueuedIngestionMessage {
+    pub fn new(
+        blob_descriptor: BlobDescriptor,
+        ingestion_properties: &IngestionProperties,
+        auth_context: &KustoIdentityToken,
+    ) -> Self {
+        let additional_properties = AdditionalProperties {
+            ingestion_mapping: None,
+            ingestion_mapping_reference: None,
+            creation_time: None,
+            extend_schema: None,
+            folder: None,
+            data_format: ingestion_properties.data_format.clone(),
+            ingest_if_not_exists: None,
+            ignore_first_record: None,
+            policy_ingestiontime: None,
+            recreate_schema: None,
+            tags: vec![],
+            validation_policy: None,
+            zip_pattern: None,
+            authorization_context: auth_context.clone(),
+            extra_additional_properties: HashMap::new(),
+        };
+
+        Self {
+            id: blob_descriptor.source_id,
+            blob_path: blob_descriptor.uri(),
+            raw_data_size: blob_descriptor.size,
+            database_name: ingestion_properties.database_name.clone(),
+            table_name: ingestion_properties.table_name.clone(),
+            retain_blob_on_success: ingestion_properties.retain_blob_on_success,
+            flush_immediately: ingestion_properties.flush_immediately,
+            report_level: ingestion_properties.report_level.clone(),
+            report_method: ingestion_properties.report_method.clone(),
+            ignore_size_limit: Some(false),
+            // TODO: make the creation time configurable instead of hardcoding it
+            source_message_creation_time: String::from("2023-08-16T13:30:04.639714"),
+            additional_properties,
+        }
+    }
+}
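+
+// Editor's note: a sketch of how source_message_creation_time could be
+// generated instead of hardcoding it (assumes the `formatting` feature of
+// the `time` crate is enabled):
+//
+// fn iso8601_now() -> String {
+//     time::OffsetDateTime::now_utc()
+//         .format(&time::format_description::well_known::Iso8601::DEFAULT)
+//         .expect("formatting a UTC timestamp as ISO 8601 should not fail")
+// }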
+
+// The additional properties struct is modelled on:
+// https://learn.microsoft.com/en-us/azure/data-explorer/ingestion-properties
+#[derive(Serialize, Clone, Debug)]
+pub struct AdditionalProperties {
+    #[serde(rename = "ingestionMapping", skip_serializing_if = "Option::is_none")]
+    pub ingestion_mapping: Option<String>,
+    #[serde(
+        rename = "ingestionMappingReference",
+        skip_serializing_if = "Option::is_none"
+    )]
+    pub ingestion_mapping_reference: Option<String>,
+    #[serde(rename = "creationTime", skip_serializing_if = "Option::is_none")]
+    pub creation_time: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub extend_schema: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub folder: Option<String>,
+    #[serde(rename = "format")]
+    pub data_format: DataFormat,
+    #[serde(rename = "ingestIfNotExists", skip_serializing_if = "Option::is_none")]
+    pub ingest_if_not_exists: Option<String>,
+    #[serde(rename = "ignoreFirstRecord", skip_serializing_if = "Option::is_none")]
+    pub ignore_first_record: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub policy_ingestiontime: Option<bool>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub recreate_schema: Option<bool>,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
+    pub tags: Vec<String>,
+    #[serde(rename = "validationPolicy", skip_serializing_if = "Option::is_none")]
+    pub validation_policy: Option<ValidationPolicy>,
+    #[serde(rename = "zipPattern", skip_serializing_if = "Option::is_none")]
+    pub zip_pattern: Option<String>,
+    // TODO: users shouldn't be able to set this; expose selected properties
+    // through IngestionProperties rather than the whole AdditionalProperties struct
+    #[serde(rename = "authorizationContext")]
+    pub authorization_context: KustoIdentityToken,
+    #[serde(flatten)]
+    pub extra_additional_properties: HashMap<String, String>,
+}
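+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use url::Url;
+
+    // Editor's sketch: a shape check for the serialized queue message.
+    // All values below are placeholders, not a captured production message.
+    #[test]
+    fn queued_message_serializes_with_expected_keys() {
+        let blob = BlobDescriptor::new(
+            Url::parse("https://account.blob.core.windows.net/container/blob").unwrap(),
+            Some(100),
+            None,
+        );
+        let props = IngestionProperties {
+            database_name: "db".into(),
+            table_name: "table".into(),
+            retain_blob_on_success: Some(true),
+            data_format: DataFormat::CSV,
+            ingestion_mapping: None,
+            ingestion_mapping_type: None,
+            ingestion_mapping_reference: None,
+            additional_tags: vec![],
+            ingest_if_not_exists: vec![],
+            ingest_by_tags: vec![],
+            drop_by_tags: vec![],
+            flush_immediately: Some(false),
+            ignore_first_record: false,
+            report_level: ReportLevel::Failures,
+            report_method: ReportMethod::Queue,
+            validation_policy: None,
+        };
+        let token: KustoIdentityToken = "placeholder-token".to_string();
+
+        let message = QueuedIngestionMessage::new(blob, &props, &token);
+        let json = serde_json::to_string(&message).unwrap();
+
+        assert!(json.contains("\"DatabaseName\":\"db\""));
+        assert!(json.contains("\"ReportLevel\":0"));
+        assert!(json.contains("\"authorizationContext\":\"placeholder-token\""));
+    }
+}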
diff --git a/azure-kusto-ingest/src/ingestion_properties.rs b/azure-kusto-ingest/src/ingestion_properties.rs
new file mode 100644
index 0000000..ee573a8
--- /dev/null
+++ b/azure-kusto-ingest/src/ingestion_properties.rs
@@ -0,0 +1,108 @@
+use crate::data_format::{DataFormat, IngestionMappingKind};
+use serde::Serialize;
+use serde_repr::Serialize_repr;
+
+#[derive(Clone, Debug)]
+pub struct IngestionProperties {
+    pub database_name: String,
+    pub table_name: String,
+    pub retain_blob_on_success: Option<bool>,
+    pub data_format: DataFormat,
+    // TODO: the mapping-related checks currently done at runtime could be
+    // enforced at the type level, e.g. with an enum that ties the mapping
+    // kind to the mapping itself
+    pub ingestion_mapping: Option<Vec<ColumnMapping>>,
+    pub ingestion_mapping_type: Option<IngestionMappingKind>,
+    pub ingestion_mapping_reference: Option<Vec<String>>,
+    pub additional_tags: Vec<String>,
+    pub ingest_if_not_exists: Vec<String>,
+    pub ingest_by_tags: Vec<String>,
+    pub drop_by_tags: Vec<String>,
+    pub flush_immediately: Option<bool>,
+    pub ignore_first_record: bool,
+    pub report_level: ReportLevel,
+    pub report_method: ReportMethod,
+    pub validation_policy: Option<ValidationPolicy>,
+    // TODO: don't expose AdditionalProperties to the user
+    // pub additional_properties: AdditionalProperties,
+}
+
+#[derive(Serialize, Clone, Debug)]
+pub struct ValidationPolicy {
+    #[serde(rename = "ValidationOptions")]
+    validation_options: ValidationOptions,
+    #[serde(rename = "ValidationImplications")]
+    validation_implications: ValidationImplications,
+}
+
+#[derive(Serialize_repr, Clone, Debug)]
+#[repr(u8)]
+pub enum ValidationOptions {
+    DoNotValidate = 0,
+    ValidateCsvInputConstantColumns = 1,
+    ValidateCsvInputColumnLevelOnly = 2,
+}
+
+#[derive(Serialize_repr, Clone, Debug)]
+#[repr(u8)]
+pub enum ValidationImplications {
+    Fail = 0,
+    BestEffort = 1,
+}
+
+#[derive(Serialize_repr, Clone, Debug)]
+#[repr(u8)]
+pub enum ReportLevel {
+    Failures = 0,
+    None = 1,
+    All = 2,
+}
+
+#[derive(Serialize_repr, Clone, Debug)]
+#[repr(u8)]
+pub enum ReportMethod {
+    Queue = 0,
+    Table = 1,
+}
+
+#[derive(Serialize, Clone, Debug)]
+pub enum TransformationMethod {
+    PropertyBagArrayToDictionary,
+    SourceLocation,
+    SourceLineNumber,
+    DateTimeFromUnixSeconds,
+    DateTimeFromUnixMilliseconds,
+    DateTimeFromUnixMicroseconds,
+    DateTimeFromUnixNanoseconds,
+    DropMappedFields,
+    BytesAsBase64,
+}
+
+/// Use this struct to create mappings for IngestionProperties.ingestion_mapping and to utilize
+/// mappings that were not pre-created (it is recommended to create the mappings in advance and
+/// use ingestion_mapping_reference).
+/// To read more about mappings, see https://docs.microsoft.com/en-us/azure/kusto/management/mappings
+#[derive(Serialize, Clone, Debug)]
+pub struct ColumnMapping {
+    #[serde(rename = "Column")]
+    column: String,
+    // TODO: can this be an enum?
+    #[serde(rename = "DataType")]
+    datatype: String,
+    #[serde(rename = "Properties")]
+    properties: ColumnMappingProperties,
+}
+
+#[derive(Serialize, Clone, Debug)]
+pub struct ColumnMappingProperties {
+    #[serde(rename = "Path")]
+    path: Option<String>,
+    #[serde(rename = "Transform")]
+    transform: Option<TransformationMethod>,
+    // TODO: this should be serialized to a string
+    #[serde(rename = "Ordinal")]
+    ordinal: Option<u32>,
+    #[serde(rename = "ConstValue")]
+    const_value: Option<String>,
+    #[serde(rename = "Field")]
+    field: Option<String>,
+}
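+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Editor's sketch: serde_repr serializes these enums as their integer
+    // discriminants rather than variant names, which is the wire format the
+    // ingestion service expects.
+    #[test]
+    fn report_options_serialize_as_integers() {
+        assert_eq!(serde_json::to_string(&ReportLevel::All).unwrap(), "2");
+        assert_eq!(serde_json::to_string(&ReportMethod::Table).unwrap(), "1");
+        assert_eq!(
+            serde_json::to_string(&ValidationImplications::BestEffort).unwrap(),
+            "1"
+        );
+    }
+}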
diff --git a/azure-kusto-ingest/src/lib.rs b/azure-kusto-ingest/src/lib.rs
new file mode 100644
index 0000000..1a250ae
--- /dev/null
+++ b/azure-kusto-ingest/src/lib.rs
@@ -0,0 +1,7 @@
+pub mod data_format;
+pub mod descriptors;
+pub(crate) mod ingestion_blob_info;
+pub mod ingestion_properties;
+pub mod queued_ingest;
+pub(crate) mod resource_manager;
+pub mod result;
diff --git a/azure-kusto-ingest/src/queued_ingest.rs b/azure-kusto-ingest/src/queued_ingest.rs
new file mode 100644
index 0000000..48e1ba9
--- /dev/null
+++ b/azure-kusto-ingest/src/queued_ingest.rs
@@ -0,0 +1,126 @@
+use std::time::Duration;
+
+use anyhow::{anyhow, Result};
+use azure_core::base64;
+use azure_kusto_data::prelude::KustoClient;
+
+use crate::descriptors::{BlobDescriptor, FileDescriptor, StreamDescriptor};
+use crate::ingestion_blob_info::QueuedIngestionMessage;
+use crate::ingestion_properties::IngestionProperties;
+use crate::resource_manager::ResourceManager;
+use crate::result::{IngestionResult, IngestionStatus};
+
+pub struct QueuedIngestClient {
+    // The resource manager is built from a KustoClient that must point at the
+    // ingestion cluster endpoint; it provides the ingestion resources
+    resource_manager: ResourceManager,
+}
+
+impl QueuedIngestClient {
+    pub fn new(kusto_client: KustoClient, refresh_period: Duration) -> Self {
+        let resource_manager = ResourceManager::new(kusto_client, refresh_period);
+
+        Self { resource_manager }
+    }
+
+    pub async fn ingest_from_blob(
+        &mut self,
+        blob_descriptor: BlobDescriptor,
+        ingestion_properties: &IngestionProperties,
+    ) -> Result<IngestionResult> {
+        // The queues returned here are storage queue clients from
+        // azure-storage-queues; it may be better for ResourceManager to return
+        // a struct that wraps them
+        let ingestion_queues = self
+            .resource_manager
+            .secured_ready_for_aggregation_queues()
+            .await?;
+
+        let auth_context = self.resource_manager.authorization_context().await?;
+
+        println!("queues: {:#?}", ingestion_queues);
+
+        let message = QueuedIngestionMessage::new(
+            blob_descriptor.clone(),
+            ingestion_properties,
+            auth_context,
+        );
+
+        println!("message as struct: {:#?}\n", message);
+
+        // TODO: pick a random queue from the queue clients returned by the resource manager
+        let queue_client = ingestion_queues
+            .first()
+            .ok_or_else(|| anyhow!("no ingestion queues were returned by the service"))?
+            .clone();
+        println!("queue_client: {:#?}\n", queue_client);
+
+        let message = serde_json::to_string(&message)?;
+        println!("message as string: {}\n", message);
+
+        // The ingestion message must be base64 encoded before being queued
+        let message = base64::encode(&message);
+        println!("message as base64 encoded string: {}\n", message);
+
+        let resp = queue_client.put_message(message).await?;
+
+        println!("resp: {:#?}\n", resp);
+
+        Ok(IngestionResult::new(
+            IngestionStatus::Queued,
+            &ingestion_properties.database_name,
+            &ingestion_properties.table_name,
+            blob_descriptor.source_id,
+            Some(blob_descriptor.uri()),
+        ))
+    }
+
+    pub async fn ingest_from_file(
+        &mut self,
+        file_descriptor: FileDescriptor,
+        ingestion_properties: IngestionProperties,
+    ) -> Result<IngestionResult> {
+        // This function needs to upload a blob from the file, and then call ingest_from_blob
+        unimplemented!()
+        // self.ingest_from_blob(blob_descriptor, &ingestion_properties)
+        //     .await
+    }
+
+    pub async fn ingest_from_stream(
+        &mut self,
+        stream_descriptor: StreamDescriptor,
+        ingestion_properties: IngestionProperties,
+    ) -> Result<IngestionResult> {
+        // This function needs to upload a blob from the stream, and then call ingest_from_blob
+        unimplemented!()
+        // self.ingest_from_blob(blob_descriptor, &ingestion_properties)
+        //     .await
+    }
+
+    async fn upload_from_different_descriptor(
+        &mut self,
+        descriptor: FileDescriptor,
+        ingestion_properties: &IngestionProperties,
+    ) -> Result<BlobDescriptor> {
+        // WIP: upload the data to one of the temp-storage containers, then
+        // return a BlobDescriptor pointing at the uploaded blob
+        unimplemented!()
+        // let blob_name = format!(
+        //     "{database_name}_{table_name}_{source_id}_{stream_name}",
+        //     database_name = ingestion_properties.database_name,
+        //     table_name = ingestion_properties.table_name,
+        //     source_id = descriptor.source_id,
+        //     stream_name = descriptor.stream_name.to_str().unwrap().to_string()
+        // );
+
+        // let container_clients = self.resource_manager.temp_storage().await?;
+        // // TODO: pick a random container client from the container clients returned by the resource manager
+        // let container_client = container_clients.first().unwrap().clone();
+        // let blob_client = container_client.blob_client(blob_name);
+
+        // blob_client.put_block_blob(body)
+
+        // blob_url = "";
+
+        // Ok(BlobDescriptor::new(
+        //     blob_url,
+        //     ingestion_properties.source_id,
+        // ))
+    }
+}
diff --git a/azure-kusto-ingest/src/resource_manager.rs b/azure-kusto-ingest/src/resource_manager.rs
new file mode 100644
index 0000000..cbaef70
--- /dev/null
+++ b/azure-kusto-ingest/src/resource_manager.rs
@@ -0,0 +1,364 @@
+use std::time::{Duration, Instant};
+
+use anyhow::{Ok, Result};
+use azure_kusto_data::{models::TableV1, prelude::KustoClient};
+use azure_storage::StorageCredentials;
+use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
+use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
+use url::Url;
+
+#[derive(Debug, Clone)]
+pub struct ResourceUri {
+    uri: String,
+    service_uri: String,
+    object_name: String,
+    sas_token: StorageCredentials,
+}
+
+impl ResourceUri {
+    pub fn new(uri: String) -> Self {
+        println!("uri: {:#?}", uri);
+        let parsed_uri =
+            Url::parse(&uri).expect("resource URIs returned by Kusto should be valid URLs");
+        println!("parsed_uri: {:#?}", parsed_uri);
+
+        let service_uri = format!(
+            "{}://{}",
+            parsed_uri.scheme(),
+            parsed_uri
+                .host_str()
+                .expect("resource URIs should have a host")
+        );
+        let object_name = parsed_uri.path().trim_start_matches('/').to_string();
+        let sas_token = parsed_uri
+            .query()
+            .expect("resource URIs should contain a SAS token as the query string")
+            .to_string();
+        let sas_token = StorageCredentials::sas_token(sas_token)
+            .expect("SAS tokens returned by Kusto should be valid");
+
+        Self {
+            uri,
+            service_uri,
+            object_name,
+            sas_token,
+        }
+    }
+
+    pub fn uri(&self) -> &str {
+        self.uri.as_str()
+    }
+
+    pub fn service_uri(&self) -> &str {
+        self.service_uri.as_str()
+    }
+
+    pub fn object_name(&self) -> &str {
+        self.object_name.as_str()
+    }
+
+    pub fn sas_token(&self) -> &StorageCredentials {
+        &self.sas_token
+    }
+}
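+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Editor's sketch checking the URI splitting above; the SAS token below
+    // is a dummy placeholder, not a real credential.
+    #[test]
+    fn resource_uri_splits_service_uri_and_object_name() {
+        let resource_uri = ResourceUri::new(
+            "https://account.queue.core.windows.net/queue-name?sv=dummy&sig=dummy".to_string(),
+        );
+
+        assert_eq!(
+            resource_uri.service_uri(),
+            "https://account.queue.core.windows.net"
+        );
+        assert_eq!(resource_uri.object_name(), "queue-name");
+    }
+}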
+
+impl From<&ResourceUri> for QueueClient {
+    fn from(resource_uri: &ResourceUri) -> Self {
+        let queue_service =
+            QueueServiceClientBuilder::with_location(azure_storage::CloudLocation::Custom {
+                uri: resource_uri.service_uri().to_string(),
+                credentials: resource_uri.sas_token().clone(),
+            })
+            .build();
+
+        queue_service.queue_client(resource_uri.object_name())
+    }
+}
+
+impl From<&ResourceUri> for ContainerClient {
+    fn from(resource_uri: &ResourceUri) -> Self {
+        ClientBuilder::with_location(azure_storage::CloudLocation::Custom {
+            uri: resource_uri.service_uri().to_string(),
+            credentials: resource_uri.sas_token().clone(),
+        })
+        .container_client(resource_uri.object_name())
+    }
+}
+
+fn get_resource_by_name(table: &TableV1, resource_name: String) -> Vec<ResourceUri> {
+    let storage_root_index = table
+        .columns
+        .iter()
+        .position(|c| c.column_name == "StorageRoot")
+        .expect("the table should contain a StorageRoot column");
+    let resource_type_name_index = table
+        .columns
+        .iter()
+        .position(|c| c.column_name == "ResourceTypeName")
+        .expect("the table should contain a ResourceTypeName column");
+
+    println!("table: {:#?}", table);
+
+    table
+        .rows
+        .iter()
+        .filter(|r| r[resource_type_name_index] == resource_name)
+        .map(|r| {
+            ResourceUri::new(
+                r[storage_root_index]
+                    .as_str()
+                    .expect("StorageRoot values should be strings")
+                    .to_string(),
+            )
+        })
+        .collect()
+}
+
+pub struct IngestClientResources {
+    client: KustoClient,
+    secured_ready_for_aggregation_queues: Vec<ResourceUri>,
+    failed_ingestions_queues: Vec<ResourceUri>,
+    successful_ingestions_queues: Vec<ResourceUri>,
+    temp_storage: Vec<ResourceUri>,
+    ingestions_status_tables: Vec<ResourceUri>,
+    last_update: Option<Instant>,
+    refresh_period: Duration,
+}
+
+impl IngestClientResources {
+    pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
+        Self {
+            client,
+            secured_ready_for_aggregation_queues: Vec::new(),
+            failed_ingestions_queues: Vec::new(),
+            successful_ingestions_queues: Vec::new(),
+            temp_storage: Vec::new(),
+            ingestions_status_tables: Vec::new(),
+            last_update: None,
+            refresh_period,
+        }
+    }
+
+    fn is_not_applicable(&self) -> bool {
+        self.secured_ready_for_aggregation_queues.is_empty()
+            || self.failed_ingestions_queues.is_empty()
+            || self.successful_ingestions_queues.is_empty()
+            || self.temp_storage.is_empty()
+            || self.ingestions_status_tables.is_empty()
+    }
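+
+    // Editor's sketch of the refresh logic referenced by the TODO below:
+    // only re-query the service when the cached resources are stale or
+    // missing. Nothing calls this yet.
+    async fn refresh(&mut self) -> Result<()> {
+        if self.last_update.is_none()
+            || self.last_update.unwrap().elapsed() > self.refresh_period
+            || self.is_not_applicable()
+        {
+            self.get_ingest_client_resources().await?;
+            self.last_update = Some(Instant::now());
+        }
+        Ok(())
+    }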
get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string()); + self.failed_ingestions_queues = + get_resource_by_name(table, "FailedIngestionsQueue".to_string()); + self.successful_ingestions_queues = + get_resource_by_name(table, "SuccessfulIngestionsQueue".to_string()); + self.temp_storage = get_resource_by_name(table, "TempStorage".to_string()); + self.ingestions_status_tables = + get_resource_by_name(table, "IngestionsStatusTable".to_string()); + + Ok(()) + } +} + +pub type KustoIdentityToken = String; +#[derive(Debug, Clone)] +pub struct AuthorizationContext { + client: KustoClient, + pub kusto_identity_token: KustoIdentityToken, + last_update: Option, + refresh_period: Duration, +} + +impl AuthorizationContext { + pub fn new(client: KustoClient, refresh_period: Duration) -> Self { + Self { + client, + kusto_identity_token: String::new(), + last_update: None, + refresh_period, + } + } + + // TODO: figure out refresh logic + // Make this spawn a tokio task to refresh the token based on elapsed time + async fn refresh(&mut self, client: KustoClient) -> Result<()> { + if self.last_update.is_none() + || self.kusto_identity_token.chars().all(char::is_whitespace) + || self.last_update.unwrap().elapsed() > self.refresh_period + { + self.get_authorization_context(client).await?; + self.last_update = Some(Instant::now()); + } + Ok(()) + } + + async fn get_authorization_context(&mut self, client: KustoClient) -> Result<()> { + let results = client + .execute_command("NetDefaultDB", ".get kusto identity token", None) + .await?; + let table = results.tables.first().unwrap(); + + println!("table: {:#?}", table); + + self.kusto_identity_token = table + .rows + .first() + .unwrap() + .first() + .unwrap() + .as_str() + .unwrap() + .to_string(); + + Ok(()) + } + + pub async fn kusto_identity_token( + &mut self, + client: KustoClient, + ) -> Result { + self.refresh(client).await?; + Ok(self.kusto_identity_token.clone()) + } +} + +pub struct ResourceManager { + // client: KustoClient, + pub ingest_client_resources: IngestClientResources, + pub authorization_context: AuthorizationContext, +} + +impl ResourceManager { + pub fn new(client: KustoClient, refresh_period: Duration) -> Self { + Self { + ingest_client_resources: IngestClientResources::new(client.clone(), refresh_period), + authorization_context: AuthorizationContext::new(client, refresh_period), + } + } + + // pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result> { + pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result> { + // TODO: proper refresh and caching logic so we don't need to generate new clients every time + self.ingest_client_resources + .get_ingest_client_resources() + .await?; + + // We should return Azure SDK QueueClient's here. + // Although it's recommended to share the same transport, we can't as the storage credentials (SAS tokens) differ per queue. 
+
+pub struct ResourceManager {
+    pub ingest_client_resources: IngestClientResources,
+    pub authorization_context: AuthorizationContext,
+}
+
+impl ResourceManager {
+    pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
+        Self {
+            ingest_client_resources: IngestClientResources::new(client.clone(), refresh_period),
+            authorization_context: AuthorizationContext::new(client, refresh_period),
+        }
+    }
+
+    pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result<Vec<QueueClient>> {
+        // TODO: proper refresh and caching logic so we don't need to generate new clients every time
+        self.ingest_client_resources
+            .get_ingest_client_resources()
+            .await?;
+
+        // We return Azure SDK QueueClients here. Although it's recommended to
+        // share the same transport, we can't, as the storage credentials (SAS
+        // tokens) differ per queue; the best we can do is hand back one client
+        // per queue so that callers can reuse them across requests.
+        let queue_uris = self
+            .ingest_client_resources
+            .secured_ready_for_aggregation_queues
+            .clone();
+
+        Ok(queue_uris.iter().map(QueueClient::from).collect())
+    }
+
+    pub async fn failed_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
+        // TODO: proper refresh and caching logic so we don't need to generate new clients every time
+        self.ingest_client_resources
+            .get_ingest_client_resources()
+            .await?;
+
+        let queue_uris = self
+            .ingest_client_resources
+            .failed_ingestions_queues
+            .clone();
+
+        Ok(queue_uris.iter().map(QueueClient::from).collect())
+    }
+
+    pub async fn successful_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
+        // TODO: proper refresh and caching logic so we don't need to generate new clients every time
+        self.ingest_client_resources
+            .get_ingest_client_resources()
+            .await?;
+
+        let queue_uris = self
+            .ingest_client_resources
+            .successful_ingestions_queues
+            .clone();
+
+        Ok(queue_uris.iter().map(QueueClient::from).collect())
+    }
+
+    pub async fn temp_storage(&mut self) -> Result<Vec<ContainerClient>> {
+        // TODO: proper refresh and caching logic so we don't need to generate new clients every time
+        self.ingest_client_resources
+            .get_ingest_client_resources()
+            .await?;
+
+        let container_uris = self.ingest_client_resources.temp_storage.clone();
+
+        Ok(container_uris.iter().map(ContainerClient::from).collect())
+    }
+
+    // pub async fn ingestions_status_tables(&mut self) -> Result<Vec<ResourceUri>> {
+    //     self.ingest_client_resources
+    //         .get_ingest_client_resources()
+    //         .await?;
+    //     Ok(self.ingest_client_resources.ingestions_status_tables.clone())
+    // }
+
+    // pub fn retrieve_service_type(self) -> ServiceType {
+    //     unimplemented!()
+    // }
+
+    pub async fn authorization_context(&mut self) -> Result<&KustoIdentityToken> {
+        // TODO: proper refresh and caching logic so we don't need to query Kusto for the token every time
+        self.authorization_context.get_authorization_context().await?;
+
+        Ok(&self.authorization_context.kusto_identity_token)
+    }
+}
diff --git a/azure-kusto-ingest/src/result.rs b/azure-kusto-ingest/src/result.rs
new file mode 100644
index 0000000..64fb47c
--- /dev/null
+++ b/azure-kusto-ingest/src/result.rs
@@ -0,0 +1,40 @@
+use uuid::Uuid;
+
+#[derive(Clone, Debug)]
+pub enum IngestionStatus {
+    /// The ingestion was queued
+    Queued,
+    /// The ingestion was successfully streamed
+    Success,
+}
+
+/// The result of an ingestion
+#[derive(Clone, Debug)]
+pub struct IngestionResult {
+    /// `Queued` if the ingestion was queued, or `Success` if the ingestion was streamed successfully
+    pub status: IngestionStatus,
+    /// The name of the database where the ingestion was performed
+    pub database: String,
+    /// The name of the table where the ingestion was performed
+    pub table: String,
+    /// The source id of the ingestion
+    pub source_id: Uuid,
+    /// The blob URI of the ingestion, if one exists
+    pub blob_uri: Option<String>,
+}
+
+impl IngestionResult {
+    pub fn new(
+        status: IngestionStatus,
+        database: &str,
+        table: &str,
+        source_id: Uuid,
+        blob_uri: Option<String>,
+    ) -> Self {
+        Self {
+            status,
+            database: database.to_string(),
+            table: table.to_string(),
+            source_id,
+            blob_uri,
+        }
+    }
+}
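diff --git a/azure-kusto-ingest/examples/ingest_from_blob.rs b/azure-kusto-ingest/examples/ingest_from_blob.rs
new file mode 100644
--- /dev/null
+++ b/azure-kusto-ingest/examples/ingest_from_blob.rs
@@ -0,0 +1,50 @@
+//! Editor's sketch of an end-to-end queued-ingestion flow, pieced together
+//! from the types added in this PR. The cluster endpoint, database, table,
+//! blob URL and size below are placeholders; constructing the `KustoClient`
+//! (against the `https://ingest-...` endpoint) is left to azure-kusto-data.
+use std::time::Duration;
+
+use anyhow::Result;
+use azure_kusto_data::prelude::KustoClient;
+use azure_kusto_ingest::data_format::DataFormat;
+use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
+use azure_kusto_ingest::ingestion_properties::{IngestionProperties, ReportLevel, ReportMethod};
+use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
+use url::Url;
+
+async fn queue_blob_for_ingestion(kusto_client: KustoClient) -> Result<()> {
+    // Refresh the cached ingestion resources at most once an hour
+    let mut client = QueuedIngestClient::new(kusto_client, Duration::from_secs(60 * 60));
+
+    let blob_uri = Url::parse("https://account.blob.core.windows.net/container/blob.csv")?;
+    let blob = BlobDescriptor::new(blob_uri, Some(1024), None)
+        .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
+
+    let props = IngestionProperties {
+        database_name: "database".into(),
+        table_name: "table".into(),
+        retain_blob_on_success: Some(true),
+        data_format: DataFormat::CSV,
+        ingestion_mapping: None,
+        ingestion_mapping_type: None,
+        ingestion_mapping_reference: None,
+        additional_tags: vec![],
+        ingest_if_not_exists: vec![],
+        ingest_by_tags: vec![],
+        drop_by_tags: vec![],
+        flush_immediately: Some(false),
+        ignore_first_record: false,
+        report_level: ReportLevel::Failures,
+        report_method: ReportMethod::Queue,
+        validation_policy: None,
+    };
+
+    client.ingest_from_blob(blob, &props).await?;
+    Ok(())
+}
+
+fn main() {
+    // Creating a KustoClient requires environment-specific credentials, so
+    // this example only sketches the ingestion call above.
+    let _ = queue_blob_for_ingestion;
+}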