choose a random queue + more changes

This commit is contained in:
Krishan Mistry 2023-08-30 12:54:52 +01:00
Родитель 9b4a9b2ce5
Коммит 691c255d9b
4 изменённых файлов: 18 добавлений и 27 удалений

Просмотреть файл

@ -20,4 +20,5 @@ azure_storage = "0.13"
azure_storage_blobs = "0.13"
azure_storage_queues = "0.13"
chrono = { version = "0.4", features = ["serde"] }
rand = "0.8"

Просмотреть файл

@ -13,16 +13,18 @@ use crate::{
#[derive(Serialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub struct QueuedIngestionMessage {
/// Message identifier (GUID)
/// Message identifier for this upload
id: uuid::Uuid,
/// Path (URI) to the blob, including the SAS key granting permissions to read/write/delete it.
/// Permissions are required so that the ingestion service can delete the blob once it has completed ingesting the data.
blob_path: String,
/// Target database name
// Name of the Kusto database the data will ingest into
database_name: String,
/// Target table name
// Name of the Kusto table the the data will ingest into
table_name: String,
/// Size of the uncompressed data in bytes. Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs. This property is optional, but if not given, the service will access the blob just to retrieve the size.
/// Size of the uncompressed data in bytes.
/// Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs.
/// Although this property is optional, it is recommended to provide the size as otherwise the service will access the blob just to retrieve the size.
#[serde(skip_serializing_if = "Option::is_none")]
raw_data_size: Option<u64>,
/// If set to `true`, the blob won't be deleted once ingestion is successfully completed. Default is `false`
@ -31,11 +33,13 @@ pub struct QueuedIngestionMessage {
/// If set to `true`, any aggregation will be skipped. Default is `false`
#[serde(skip_serializing_if = "Option::is_none")]
flush_immediately: Option<bool>,
/// Ignores the size limit for data ingestion
#[serde(skip_serializing_if = "Option::is_none")]
ignore_size_limit: Option<bool>,
// according to Go impl, the report level and method could be Option
/// Defines which if any ingestion states are reported
#[serde(skip_serializing_if = "Option::is_none")]
report_level: Option<ReportLevel>,
/// Defines which mechanisms are used to report the ingestion status
#[serde(skip_serializing_if = "Option::is_none")]
report_method: Option<ReportMethod>,
source_message_creation_time: DateTime<Utc>,

Просмотреть файл

@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use rand::seq::SliceRandom;
use crate::client_options::QueuedIngestClientOptions;
use crate::descriptors::BlobDescriptor;
@ -56,21 +57,23 @@ impl QueuedIngestClient {
let message =
QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context);
// println!("message as struct: {:#?}\n", message);
// TODO: pick a random queue from the queue clients returned by the resource manager
let queue_client = ingestion_queues.first().unwrap().clone();
// Pick a random queue from the queue clients returned by the resource manager
let mut rng = rand::thread_rng();
let queue_client = ingestion_queues
.choose(&mut rng)
.ok_or(anyhow::anyhow!("Failed to pick a random queue"))?;
// println!("queue_client: {:#?}\n", queue_client);
let message = serde_json::to_string(&message).unwrap();
// println!("message as string: {}\n", message);
// Base64 encode the ingestion message
let message = base64::encode(&message);
// println!("message as base64 encoded string: {}\n", message);
let _resp = queue_client.put_message(message).await?;
// println!("resp: {:#?}\n", resp);
Ok(IngestionResult {
@ -81,22 +84,4 @@ impl QueuedIngestClient {
blob_uri: Some(blob_descriptor.uri()),
})
}
// /// Ingest a local file into Kusto
// pub async fn ingest_from_file(
// &self,
// file_descriptor: FileDescriptor,
// ingestion_properties: IngestionProperties,
// ) -> Result<IngestionResult> {
// unimplemented!()
// }
// /// Ingest a stream into Kusto
// pub async fn ingest_from_stream(
// &self,
// stream_descriptor: StreamDescriptor,
// ingestion_properties: IngestionProperties,
// ) -> Result<IngestionResult> {
// unimplemented!()
// }
}

Просмотреть файл

@ -37,6 +37,7 @@ impl ResourceManager {
}
}
/// Returns the latest [QueueClient]s ready for posting ingestion messages to
pub async fn secured_ready_for_aggregation_queues(&self) -> Result<Vec<QueueClient>> {
Ok(self
.ingest_client_resources