Initial commit of queued ingestion client

Author:    Krishan Mistry <krishan@krishanmistry.com>
Date:      2023-08-18 09:57:22 +01:00
Parent:    02ce97fa78
Commit:    846dd72cd1
11 changed files: 1006 additions and 1 deletion

View file

@@ -1,2 +1,2 @@
[workspace]
-members = ["azure-kusto-data"]
+members = ["azure-kusto-data", "azure-kusto-ingest"]

View file

@@ -0,0 +1,23 @@
[package]
name = "azure-kusto-ingest"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
uuid = {version = "1", features = ["v4", "serde"]}
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
serde_repr = "0.1"
url = "2"
azure-kusto-data = {path = "../azure-kusto-data"}
anyhow = "1.0.72"
tokio = { version = "1", features = ["full"] }
azure_core = "0.13"
azure_storage = "0.13"
azure_storage_blobs = "0.13"
azure_storage_queues = "0.13"
time = { version = "0.3", features = ["serde"] }
azure_identity = "0.13.0"

View file

@@ -0,0 +1,108 @@
use serde::Serialize;
#[derive(Serialize, Clone, Debug)]
pub enum IngestionMappingKind {
#[serde(rename = "Csv")]
CSV,
#[serde(rename = "Json")]
JSON,
Avro,
ApacheAvro,
Parquet,
SStream,
#[serde(rename = "Orc")]
ORC,
#[serde(rename = "W3CLogFile")]
W3CLOGFILE,
Unknown,
}
/// All data formats supported by Kusto
#[derive(Serialize, Clone, Debug)]
pub enum DataFormat {
#[serde(rename = "apacheavro")]
ApacheAvro,
#[serde(rename = "avro")]
Avro,
#[serde(rename = "csv")]
CSV,
#[serde(rename = "json")]
JSON,
#[serde(rename = "multijson")]
MultiJSON,
#[serde(rename = "orc")]
ORC,
#[serde(rename = "parquet")]
Parquet,
#[serde(rename = "psv")]
PSV,
#[serde(rename = "raw")]
RAW,
#[serde(rename = "scsv")]
SCSV,
#[serde(rename = "sohsv")]
SOHsv,
#[serde(rename = "singlejson")]
SingleJSON,
#[serde(rename = "sstream")]
SStream,
#[serde(rename = "tsv")]
TSV,
#[serde(rename = "tsve")]
TSVe,
#[serde(rename = "txt")]
TXT,
#[serde(rename = "w3clogfile")]
W3CLOGFILE,
}
impl Default for DataFormat {
    fn default() -> Self {
        DataFormat::CSV
    }
}
impl DataFormat {
pub fn ingestion_mapping_kind(self) -> IngestionMappingKind {
match self {
DataFormat::CSV => IngestionMappingKind::CSV,
DataFormat::TSV => IngestionMappingKind::CSV,
DataFormat::SCSV => IngestionMappingKind::CSV,
DataFormat::SOHsv => IngestionMappingKind::CSV,
DataFormat::PSV => IngestionMappingKind::CSV,
DataFormat::TXT => IngestionMappingKind::CSV,
DataFormat::TSVe => IngestionMappingKind::CSV,
DataFormat::JSON => IngestionMappingKind::JSON,
DataFormat::SingleJSON => IngestionMappingKind::JSON,
DataFormat::MultiJSON => IngestionMappingKind::JSON,
DataFormat::Avro => IngestionMappingKind::Avro,
DataFormat::ApacheAvro => IngestionMappingKind::ApacheAvro,
DataFormat::Parquet => IngestionMappingKind::Parquet,
DataFormat::SStream => IngestionMappingKind::SStream,
DataFormat::ORC => IngestionMappingKind::ORC,
DataFormat::RAW => IngestionMappingKind::CSV,
DataFormat::W3CLOGFILE => IngestionMappingKind::W3CLOGFILE,
}
}
/// Binary formats should not be compressed
pub fn compressible(self) -> bool {
match self {
DataFormat::CSV => true,
DataFormat::TSV => true,
DataFormat::SCSV => true,
DataFormat::SOHsv => true,
DataFormat::PSV => true,
DataFormat::TXT => true,
DataFormat::TSVe => true,
DataFormat::JSON => true,
DataFormat::SingleJSON => true,
DataFormat::MultiJSON => true,
DataFormat::Avro => true,
DataFormat::ApacheAvro => true,
DataFormat::Parquet => false,
DataFormat::SStream => false,
DataFormat::ORC => false,
DataFormat::RAW => true,
DataFormat::W3CLOGFILE => true,
}
}
}
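For illustration, a minimal usage sketch of the format metadata above (assuming the crate layout introduced in this commit):

use azure_kusto_ingest::data_format::DataFormat;

fn main() {
    // CSV is the default format
    let format = DataFormat::default();
    // text formats may be compressed before upload...
    assert!(format.compressible());
    // ...while binary formats such as Parquet should not be
    assert!(!DataFormat::Parquet.compressible());
}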

View file

@@ -0,0 +1,98 @@
use std::{io::Read, path::PathBuf};
use url::Url;
use uuid::Uuid;
#[derive(Clone, Debug)]
pub enum BlobAuth {
    // appends the given SAS token as the query string of the blob URI
    SASToken(String),
    // adds `;managed_identity=<identity>` to the blob path
    UserAssignedManagedIdentity(String),
    // adds `;managed_identity=system` to the blob path
    SystemAssignedManagedIdentity,
}
#[derive(Clone, Debug)]
pub struct BlobDescriptor {
uri: Url,
pub(crate) size: Option<u64>,
pub(crate) source_id: Uuid,
blob_auth: Option<BlobAuth>
}
impl BlobDescriptor {
pub fn new(uri: Url, size: Option<u64>, source_id: Option<Uuid>) -> Self {
let source_id = match source_id {
Some(source_id) => source_id,
None => Uuid::new_v4(),
};
Self {
uri,
size,
source_id,
blob_auth: None,
}
}
pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self {
self.blob_auth = Some(blob_auth);
self
}
    pub fn uri(&self) -> String {
        match &self.blob_auth {
            Some(BlobAuth::SASToken(sas_token)) => {
                let mut uri = self.uri.clone();
                uri.set_query(Some(sas_token.as_str()));
                uri.to_string()
            },
Some(BlobAuth::UserAssignedManagedIdentity(identity)) => {
format!("{};managed_identity={}", self.uri, identity)
},
Some(BlobAuth::SystemAssignedManagedIdentity) => {
format!("{};managed_identity=system", self.uri)
},
None => self.uri.to_string(),
}
}
}
#[derive(Clone, Debug)]
pub struct FileDescriptor {
pub path: PathBuf,
pub size: Option<u64>,
pub source_id: Uuid,
}
impl FileDescriptor {
pub fn new(path: PathBuf, size: Option<u64>, source_id: Option<Uuid>) -> Self {
unimplemented!()
}
}
// #[derive(Clone, Debug)]
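// note: `Box<dyn Read>` is neither `Clone` nor `Debug`, so the derive above is left commented out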
pub struct StreamDescriptor {
stream: Box<dyn Read>,
size: Option<u64>,
source_id: Uuid,
compressed: bool,
stream_name: String,
}
impl StreamDescriptor {
pub fn new(
stream: Box<dyn Read>,
size: Option<u64>,
source_id: Option<Uuid>,
compressed: bool,
stream_name: String,
) -> Self {
unimplemented!()
}
pub fn from_file_descriptor(file_descriptor: FileDescriptor) -> Self {
unimplemented!()
}
}
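For illustration, a minimal sketch of constructing a BlobDescriptor (the storage account, blob path and SAS token are hypothetical placeholders):

use url::Url;

fn example() {
    let uri = Url::parse("https://myaccount.blob.core.windows.net/container/data.csv").unwrap();
    // size and source_id are optional; a random source_id is generated when None is passed
    let blob = BlobDescriptor::new(uri, Some(1024), None)
        .with_blob_auth(BlobAuth::SASToken("sv=2021-08-06&sig=...".to_string()));
    // uri() renders the URI with the chosen auth applied
    println!("{}", blob.uri());
}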

View file

@@ -0,0 +1,6 @@
#[derive(Debug)]
pub enum KustoClientError {
KustoMappingError,
KustoDuplicateMappingError,
KustoMissingMappingError,
KustoInvalidEndpointError,
}

View file

@@ -0,0 +1,125 @@
use std::collections::HashMap;
use serde::Serialize;
use crate::{
data_format::DataFormat,
descriptors::BlobDescriptor,
ingestion_properties::{IngestionProperties, ReportLevel, ReportMethod, ValidationPolicy},
resource_manager::KustoIdentityToken,
};
// Basing the ingestion message on
// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-rest#ingestion-message-internal-structure
#[derive(Serialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct QueuedIngestionMessage {
id: uuid::Uuid,
blob_path: String,
database_name: String,
table_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
raw_data_size: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
retain_blob_on_success: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
flush_immediately: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
ignore_size_limit: Option<bool>,
    // according to the Go implementation, the report level and method could be Option
    report_level: ReportLevel,
    report_method: ReportMethod,
    // TODO: serialize as a proper timestamp, e.g. with #[serde(with = "time::serde::iso8601")]
    source_message_creation_time: String,
// The additional properties struct is modelled on:
// https://learn.microsoft.com/en-us/azure/data-explorer/ingestion-properties
additional_properties: AdditionalProperties,
}
impl QueuedIngestionMessage {
pub fn new(
blob_descriptor: BlobDescriptor,
ingestion_properties: &IngestionProperties,
auth_context: &KustoIdentityToken,
) -> Self {
let additional_properties = AdditionalProperties {
ingestion_mapping: None,
ingestion_mapping_reference: None,
creation_time: None,
extend_schema: None,
folder: None,
data_format: ingestion_properties.data_format.clone(),
ingest_if_not_exists: None,
ignore_first_record: None,
policy_ingestiontime: None,
recreate_schema: None,
tags: vec![],
validation_policy: None,
zip_pattern: None,
authorization_context: auth_context.clone(),
extra_additional_properties: HashMap::new(),
};
        Self {
            id: blob_descriptor.source_id,
            blob_path: blob_descriptor.uri(),
            raw_data_size: blob_descriptor.size,
            database_name: ingestion_properties.database_name.clone(),
            table_name: ingestion_properties.table_name.clone(),
            retain_blob_on_success: ingestion_properties.retain_blob_on_success,
            flush_immediately: ingestion_properties.flush_immediately,
            report_level: ingestion_properties.report_level.clone(),
            report_method: ingestion_properties.report_method.clone(),
            ignore_size_limit: Some(false),
            // TODO: make the creation time configurable
            source_message_creation_time: String::from("2023-08-16T13:30:04.639714"),
            additional_properties,
}
}
}
// The additional properties struct is modelled on: https://learn.microsoft.com/en-us/azure/data-explorer/ingestion-properties
#[derive(Serialize, Clone, Debug)]
pub struct AdditionalProperties {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "ingestionMapping")]
pub ingestion_mapping: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "ingestionMappingReference")]
pub ingestion_mapping_reference: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "creationTime")]
pub creation_time: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extend_schema: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub folder: Option<String>,
#[serde(rename = "format")]
pub data_format: DataFormat,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "ingestIfNotExists")]
pub ingest_if_not_exists: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "ignoreFirstRecord")]
pub ignore_first_record: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub policy_ingestiontime: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub recreate_schema: Option<bool>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub tags: Vec<String>,
#[serde(rename = "validationPolicy")]
#[serde(skip_serializing_if = "Option::is_none")]
pub validation_policy: Option<ValidationPolicy>,
#[serde(rename = "zipPattern")]
#[serde(skip_serializing_if = "Option::is_none")]
pub zip_pattern: Option<String>,
// TODO: the user shouldn't be able to set this, we should expose certain properties via IngestionProperties rather than just the AdditionalProperties struct
#[serde(rename = "authorizationContext")]
pub authorization_context: KustoIdentityToken,
#[serde(flatten)]
pub extra_additional_properties: HashMap<String, String>,
}
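For illustration, with the serde attributes above a queued message serializes roughly as follows (all values hypothetical; `None` fields are skipped, empty `tags` are omitted, and `extra_additional_properties` is flattened into the same object):

{
  "Id": "9f0e8a4b-0000-0000-0000-000000000000",
  "BlobPath": "https://myaccount.blob.core.windows.net/container/data.csv?<sas>",
  "DatabaseName": "mydb",
  "TableName": "mytable",
  "RawDataSize": 1024,
  "IgnoreSizeLimit": false,
  "ReportLevel": 0,
  "ReportMethod": 0,
  "SourceMessageCreationTime": "2023-08-16T13:30:04.639714",
  "AdditionalProperties": {
    "format": "csv",
    "authorizationContext": "<kusto identity token>"
  }
}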

View file

@@ -0,0 +1,108 @@
use crate::data_format::{DataFormat, IngestionMappingKind};
use serde::Serialize;
use serde_repr::Serialize_repr;
#[derive(Clone, Debug)]
pub struct IngestionProperties {
pub database_name: String,
pub table_name: String,
pub retain_blob_on_success: Option<bool>,
pub data_format: DataFormat,
    // TODO: this could be made neater with an enum that enforces the checks currently done at runtime,
    // e.g. a single mapping enum covering both inline column mappings and mapping references
pub ingestion_mapping: Option<Vec<ColumnMapping>>,
pub ingestion_mapping_type: Option<IngestionMappingKind>,
pub ingestion_mapping_reference: Option<Vec<String>>,
pub additional_tags: Vec<String>,
pub ingest_if_not_exists: Vec<String>,
pub ingest_by_tags: Vec<String>,
pub drop_by_tags: Vec<String>,
pub flush_immediately: Option<bool>,
pub ignore_first_record: bool,
pub report_level: ReportLevel,
pub report_method: ReportMethod,
pub validation_policy: Option<ValidationPolicy>,
// TODO: don't expose AdditionalProperties to user...
// pub additional_properties: AdditionalProperties,
// pub additional_properties: AdditionalProperties,
}
#[derive(Serialize, Clone, Debug)]
pub struct ValidationPolicy {
#[serde(rename = "ValidationOptions")]
validation_options: ValidationOptions,
#[serde(rename = "ValidationImplications")]
validation_implications: ValidationImplications,
}
#[derive(Serialize_repr, Clone, Debug)]
#[repr(u8)]
pub enum ValidationOptions {
DoNotValidate = 0,
ValidateCsvInputConstantColumns = 1,
ValidateCsvInputColumnLevelOnly = 2,
}
#[derive(Serialize_repr, Clone, Debug)]
#[repr(u8)]
pub enum ValidationImplications {
Fail = 0,
BestEffort = 1,
}
#[derive(Serialize_repr, Clone, Debug)]
#[repr(u8)]
pub enum ReportLevel {
Failures = 0,
None = 1,
All = 2,
}
#[derive(Serialize_repr, Clone, Debug)]
#[repr(u8)]
pub enum ReportMethod {
Queue = 0,
Table = 1,
}
#[derive(Serialize, Clone, Debug)]
pub enum TransformationMethod {
PropertyBagArrayToDictionary,
SourceLocation,
SourceLineNumber,
DateTimeFromUnixSeconds,
DateTimeFromUnixMilliseconds,
DateTimeFromUnixMicroseconds,
DateTimeFromUnixNanoseconds,
DropMappedFields,
BytesAsBase64,
}
/// Use this struct to create mappings for `IngestionProperties.ingestion_mapping` and to use
/// mappings that were not pre-created (it is recommended to create the mappings in advance and
/// use `ingestion_mapping_reference`).
/// To read more about mappings, see https://docs.microsoft.com/en-us/azure/kusto/management/mappings
#[derive(Serialize, Clone, Debug)]
pub struct ColumnMapping {
#[serde(rename = "Column")]
column: String,
// TODO: can this be an enum?
#[serde(rename = "DataType")]
datatype: String,
#[serde(rename = "Properties")]
properties: ColumnMappingProperties,
}
#[derive(Serialize, Clone, Debug)]
pub struct ColumnMappingProperties {
#[serde(rename = "Path")]
path: Option<String>,
#[serde(rename = "Transform")]
transform: Option<TransformationMethod>,
#[serde(rename = "Ordinal")]
// TODO: This should get serialized to a string
ordinal: Option<u32>,
#[serde(rename = "ConstValue")]
const_value: Option<String>,
#[serde(rename = "Field")]
field: Option<String>,
}
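For illustration, a minimal sketch of filling in IngestionProperties for a CSV ingestion (database and table names are hypothetical):

use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::ingestion_properties::{IngestionProperties, ReportLevel, ReportMethod};

let props = IngestionProperties {
    database_name: "mydb".into(),
    table_name: "mytable".into(),
    retain_blob_on_success: Some(true),
    data_format: DataFormat::CSV,
    ingestion_mapping: None,
    ingestion_mapping_type: None,
    ingestion_mapping_reference: None,
    additional_tags: vec![],
    ingest_if_not_exists: vec![],
    ingest_by_tags: vec![],
    drop_by_tags: vec![],
    flush_immediately: Some(false),
    ignore_first_record: false,
    report_level: ReportLevel::Failures,
    report_method: ReportMethod::Queue,
    validation_policy: None,
};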

View file

@@ -0,0 +1,7 @@
pub mod descriptors;
pub mod ingestion_properties;
pub mod queued_ingest;
pub(crate) mod result;
pub(crate) mod resource_manager;
pub mod data_format;
pub(crate) mod ingestion_blob_info;

View file

@@ -0,0 +1,126 @@
use std::time::Duration;
use anyhow::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use crate::descriptors::{BlobDescriptor, FileDescriptor, StreamDescriptor};
use crate::ingestion_blob_info::QueuedIngestionMessage;
use crate::ingestion_properties::IngestionProperties;
use crate::resource_manager::ResourceManager;
use crate::result::{IngestionResult, IngestionStatus};
pub struct QueuedIngestClient {
// The KustoClient is used to get the ingestion resources, it should be a client against the ingestion cluster endpoint
// kusto_client: KustoClient,
resource_manager: ResourceManager,
}
impl QueuedIngestClient {
pub fn new(kusto_client: KustoClient, refresh_period: Duration) -> Self {
let resource_manager = ResourceManager::new(kusto_client, refresh_period);
Self { resource_manager }
}
    pub async fn ingest_from_blob(
        &mut self,
        blob_descriptor: BlobDescriptor,
        ingestion_properties: &IngestionProperties,
    ) -> Result<IngestionResult> {
// the queues returned here should ideally be the storage queue client from azure-storage-queue
// as such, it may be better for ResourceManager to return a struct that contains the storage queue client
let ingestion_queues = self
.resource_manager
.secured_ready_for_aggregation_queues()
.await?;
let auth_context = self.resource_manager.authorization_context().await?;
println!("queues: {:#?}", ingestion_queues);
let message = QueuedIngestionMessage::new(
blob_descriptor.clone(),
ingestion_properties,
auth_context,
);
println!("message as struct: {:#?}\n", message);
// TODO: pick a random queue from the queue clients returned by the resource manager
let queue_client = ingestion_queues.first().unwrap().clone();
println!("queue_client: {:#?}\n", queue_client);
        let message = serde_json::to_string(&message)?;
println!("message as string: {}\n", message);
// Base64 encode the ingestion message
let message = base64::encode(&message);
println!("message as base64 encoded string: {}\n", message);
let resp = queue_client.put_message(message).await?;
println!("resp: {:#?}\n", resp);
Ok(IngestionResult::new(
IngestionStatus::Queued,
&ingestion_properties.database_name,
&ingestion_properties.table_name,
blob_descriptor.source_id,
Some(blob_descriptor.uri()),
))
}
pub async fn ingest_from_file(
self,
file_descriptor: FileDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<IngestionResult> {
unimplemented!()
// This function needs to upload the blob from the file, and then call on ingest_from_blob
// self.ingest_from_blob(blob_descriptor, &ingestion_properties)
// .await
}
pub async fn ingest_from_stream(
self,
stream_descriptor: StreamDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<IngestionResult> {
unimplemented!()
// This function needs to upload the blob from the stream, and then call on ingest_from_blob
// self.ingest_from_blob(blob_descriptor, &ingestion_properties)
// .await
}
async fn upload_from_different_descriptor(
self,
descriptor: FileDescriptor,
ingestion_properties: &IngestionProperties,
) -> Result<BlobDescriptor> {
unimplemented!()
// WIP
// let blob_name = format!(
// "{database_name}_{table_name}_{source_id}_{stream_name}",
// database_name = ingestion_properties.database_name,
// table_name = ingestion_properties.table_name,
// source_id = descriptor.source_id,
// stream_name = descriptor.stream_name.to_str().unwrap().to_string()
// );
// let container_clients = self.resource_manager.temp_storage().await?;
// // TODO: pick a random container client from the container clients returned by the resource manager
// let container_client = container_clients.first().unwrap().clone();
// let blob_client = container_client.blob_client(blob_name);
// blob_client.put_block_blob(body)
// blob_url = "";
// Ok(BlobDescriptor::new(
// blob_url,
// ingestion_properties.source_id,
// ))
}
}
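For illustration, a minimal end-to-end sketch of queued ingestion as wired up in this commit (the blob URL is hypothetical, `props` is an IngestionProperties as sketched earlier, and the KustoClient is assumed to target the ingestion endpoint, e.g. https://ingest-<cluster>.kusto.windows.net):

use std::time::Duration;
use azure_kusto_data::prelude::KustoClient;
use azure_kusto_ingest::descriptors::BlobDescriptor;
use azure_kusto_ingest::ingestion_properties::IngestionProperties;
use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
use url::Url;

async fn example(kusto_client: KustoClient, props: &IngestionProperties) -> anyhow::Result<()> {
    let mut client = QueuedIngestClient::new(kusto_client, Duration::from_secs(60 * 60));
    let blob = BlobDescriptor::new(
        Url::parse("https://myaccount.blob.core.windows.net/container/data.csv")?,
        Some(1024),
        None,
    );
    let _result = client.ingest_from_blob(blob, props).await?;
    Ok(())
}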

View file

@@ -0,0 +1,364 @@
use std::time::{Duration, Instant};
use anyhow::Result;
use azure_kusto_data::{models::TableV1, prelude::KustoClient};
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use url::Url;
use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
#[derive(Debug, Clone)]
pub struct ResourceUri {
uri: String,
// parsed_uri: Url,
service_uri: String,
object_name: String,
sas_token: StorageCredentials,
}
impl ResourceUri {
pub fn new(uri: String) -> Self {
println!("uri: {:#?}", uri);
let parsed_uri = Url::parse(&uri).unwrap();
println!("parsed_uri: {:#?}", parsed_uri);
let service_uri = parsed_uri.scheme().to_string()
+ "://"
+ parsed_uri.host_str().expect("We should get result here");
let object_name = parsed_uri
.path()
.trim_start()
.trim_start_matches("/")
.to_string();
let sas_token = parsed_uri
.query()
.expect("Returned URI should contain SAS token as query")
.to_string();
let sas_token = StorageCredentials::sas_token(sas_token).unwrap();
Self {
uri,
// parsed_uri,
service_uri,
object_name,
sas_token,
}
}
pub fn uri(&self) -> &str {
self.uri.as_str()
}
pub fn service_uri(&self) -> &str {
self.service_uri.as_str()
}
pub fn object_name(&self) -> &str {
self.object_name.as_str()
}
pub fn sas_token(&self) -> &StorageCredentials {
&self.sas_token
}
}
impl From<&ResourceUri> for QueueClient {
fn from(resource_uri: &ResourceUri) -> Self {
let queue_service =
QueueServiceClientBuilder::with_location(azure_storage::CloudLocation::Custom {
uri: resource_uri.service_uri().to_string(),
credentials: resource_uri.sas_token().clone(),
})
.build();
queue_service.queue_client(resource_uri.object_name())
}
}
impl From<&ResourceUri> for ContainerClient {
fn from(resource_uri: &ResourceUri) -> Self {
ClientBuilder::with_location(azure_storage::CloudLocation::Custom {
uri: resource_uri.service_uri().to_string(),
credentials: resource_uri.sas_token().clone(),
})
.container_client(resource_uri.object_name())
}
}
fn get_resource_by_name(table: &TableV1, resource_name: &str) -> Vec<ResourceUri> {
let storage_root_index = table
.columns
.iter()
.position(|c| c.column_name == "StorageRoot")
.unwrap();
let resource_type_name_index = table
.columns
.iter()
.position(|c| c.column_name == "ResourceTypeName")
.unwrap();
println!("table: {:#?}", table);
let resource_uris: Vec<ResourceUri> = table
.rows
.iter()
.filter(|r| r[resource_type_name_index] == resource_name)
.map(|r| {
ResourceUri::new(
r[storage_root_index]
.as_str()
.expect("We should get result here")
.to_string(),
)
})
.collect();
resource_uris
}
pub struct IngestClientResources {
client: KustoClient,
secured_ready_for_aggregation_queues: Vec<ResourceUri>,
failed_ingestions_queues: Vec<ResourceUri>,
successful_ingestions_queues: Vec<ResourceUri>,
temp_storage: Vec<ResourceUri>,
ingestions_status_tables: Vec<ResourceUri>,
last_update: Option<Instant>,
refresh_period: Duration,
}
impl IngestClientResources {
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
Self {
client,
secured_ready_for_aggregation_queues: Vec::new(),
failed_ingestions_queues: Vec::new(),
successful_ingestions_queues: Vec::new(),
temp_storage: Vec::new(),
ingestions_status_tables: Vec::new(),
last_update: None,
refresh_period,
}
}
fn is_not_applicable(&self) -> bool {
self.secured_ready_for_aggregation_queues.is_empty()
|| self.failed_ingestions_queues.is_empty()
|| self.successful_ingestions_queues.is_empty()
|| self.temp_storage.is_empty()
|| self.ingestions_status_tables.is_empty()
}
// TODO: figure out refresh logic
// async fn refresh(&mut self) {
// self.get_ingest_client_resources().await
// // let interval = tokio::time::interval(self.refresh_period);
// // loop {
// // match self.get_ingest_client_resources(self.client.clone()).await {
// // Ok(_) => todo!(),
// // Err(e) => println!("Error: {}", e),
// // };
// // interval.tick().await;
// // }
// // if self.last_update.is_none()
// // || self.last_update.unwrap().elapsed() > self.refresh_period
// // || self.is_not_applicable()
// // {
// // self.get_ingest_client_resources(client).await?;
// // self.last_update = Some(Instant::now());
// // }
// // Ok(())
// }
// async fn refresh(&mut self, client: KustoClient) -> Result<()> {
// if self.last_update.is_none()
// || self.last_update.unwrap().elapsed() > self.refresh_period
// || self.is_not_applicable()
// {
// self.get_ingest_client_resources(client).await?;
// self.last_update = Some(Instant::now());
// }
// Ok(())
// }
async fn get_ingest_client_resources(&mut self) -> Result<()> {
let results = self
.client
.execute_command("NetDefaultDB", ".get ingestion resources", None)
.await?;
let table = results.tables.first().unwrap();
        self.secured_ready_for_aggregation_queues =
            get_resource_by_name(table, "SecuredReadyForAggregationQueue");
        self.failed_ingestions_queues = get_resource_by_name(table, "FailedIngestionsQueue");
        self.successful_ingestions_queues =
            get_resource_by_name(table, "SuccessfulIngestionsQueue");
        self.temp_storage = get_resource_by_name(table, "TempStorage");
        self.ingestions_status_tables = get_resource_by_name(table, "IngestionsStatusTable");
Ok(())
}
}
pub type KustoIdentityToken = String;
#[derive(Debug, Clone)]
pub struct AuthorizationContext {
client: KustoClient,
pub kusto_identity_token: KustoIdentityToken,
last_update: Option<Instant>,
refresh_period: Duration,
}
impl AuthorizationContext {
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
Self {
client,
kusto_identity_token: String::new(),
last_update: None,
refresh_period,
}
}
// TODO: figure out refresh logic
// Make this spawn a tokio task to refresh the token based on elapsed time
async fn refresh(&mut self, client: KustoClient) -> Result<()> {
if self.last_update.is_none()
|| self.kusto_identity_token.chars().all(char::is_whitespace)
|| self.last_update.unwrap().elapsed() > self.refresh_period
{
self.get_authorization_context(client).await?;
self.last_update = Some(Instant::now());
}
Ok(())
}
async fn get_authorization_context(&mut self, client: KustoClient) -> Result<()> {
let results = client
.execute_command("NetDefaultDB", ".get kusto identity token", None)
.await?;
let table = results.tables.first().unwrap();
println!("table: {:#?}", table);
self.kusto_identity_token = table
.rows
.first()
.unwrap()
.first()
.unwrap()
.as_str()
.unwrap()
.to_string();
Ok(())
}
pub async fn kusto_identity_token(
&mut self,
client: KustoClient,
) -> Result<KustoIdentityToken> {
self.refresh(client).await?;
Ok(self.kusto_identity_token.clone())
}
}
pub struct ResourceManager {
// client: KustoClient,
pub ingest_client_resources: IngestClientResources,
pub authorization_context: AuthorizationContext,
}
impl ResourceManager {
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
Self {
ingest_client_resources: IngestClientResources::new(client.clone(), refresh_period),
authorization_context: AuthorizationContext::new(client, refresh_period),
}
}
// pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result<Vec<ResourceUri>> {
pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result<Vec<QueueClient>> {
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
self.ingest_client_resources
.get_ingest_client_resources()
.await?;
        // We should return Azure SDK QueueClients here.
        // Although it's recommended to share the same transport, we can't, as the storage
        // credentials (SAS tokens) differ per queue. So the best we can do is create the
        // individual QueueClients up front so they can be reused across requests.
let queue_uris = self
.ingest_client_resources
.secured_ready_for_aggregation_queues
.clone();
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
}
pub async fn failed_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
self.ingest_client_resources
.get_ingest_client_resources()
.await?;
let queue_uris = self
.ingest_client_resources
.failed_ingestions_queues
.clone();
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
}
pub async fn successful_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
self.ingest_client_resources
.get_ingest_client_resources()
.await?;
let queue_uris = self
.ingest_client_resources
.successful_ingestions_queues
.clone();
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
}
pub async fn temp_storage(&mut self) -> Result<Vec<ContainerClient>> {
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
self.ingest_client_resources
.get_ingest_client_resources()
.await?;
let container_uris = self.ingest_client_resources.temp_storage.clone();
Ok(container_uris
.iter()
.map(|c| ContainerClient::from(c))
.collect())
}
// pub async fn ingestions_status_tables(
// &mut self,
// client: KustoClient,
// ) -> Result<Vec<ResourceUri>> {
// self.refresh(client).await?;
// Ok(self.ingestions_status_tables.clone())
// }
// pub fn retrieve_service_type(self) -> ServiceType {
// unimplemented!()
// }
pub async fn authorization_context(&mut self) -> Result<&KustoIdentityToken> {
// TODO: proper refresh and caching logic so we don't need to query Kusto for the token every time
self.authorization_context
.get_authorization_context(self.ingest_client_resources.client.clone())
.await?;
Ok(&self.authorization_context.kusto_identity_token)
}
}
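For illustration, a minimal sketch of what ResourceUri::new extracts from a resource URI returned by `.get ingestion resources` (the account, queue name and SAS token are hypothetical):

let resource_uri = ResourceUri::new(
    "https://myaccount.queue.core.windows.net/ready-for-aggregation-queue?sv=2021-08-06&sig=...".to_string(),
);
assert_eq!(resource_uri.service_uri(), "https://myaccount.queue.core.windows.net");
assert_eq!(resource_uri.object_name(), "ready-for-aggregation-queue");
// the SAS token parsed from the query string becomes the StorageCredentials for the derived client
let _queue_client = QueueClient::from(&resource_uri);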

View file

@@ -0,0 +1,40 @@
use uuid::Uuid;
pub enum IngestionStatus {
    /// The ingestion was queued.
    Queued,
    /// The ingestion was successfully streamed.
    Success,
}

/// The result of an ingestion.
pub struct IngestionResult {
    /// Will be `Queued` if the ingestion is queued, or `Success` if the ingestion is streaming and successful.
    status: IngestionStatus,
    /// The name of the database where the ingestion was performed.
    database: String,
    /// The name of the table where the ingestion was performed.
    table: String,
    /// The source id of the ingestion.
    source_id: Uuid,
    /// The blob URI of the ingestion, if it exists.
    blob_uri: Option<String>,
}
impl IngestionResult {
    pub fn new(
        status: IngestionStatus,
        database: &str,
        table: &str,
        source_id: Uuid,
        blob_uri: Option<String>,
    ) -> Self {
        Self {
            status,
            database: database.to_string(),
            table: table.to_string(),
            source_id,
            blob_uri,
        }
    }
}