From eb955f855ce96fbfab763fd938ac0c3d0c93d6dd Mon Sep 17 00:00:00 2001 From: Krishan Mistry Date: Fri, 25 Aug 2023 16:12:42 +0100 Subject: [PATCH] move ingest client resource around --- azure-kusto-ingest/src/resource_manager.rs | 263 +--------------- .../ingest_client_resources.rs | 297 +++++++++++++++++- 2 files changed, 282 insertions(+), 278 deletions(-) diff --git a/azure-kusto-ingest/src/resource_manager.rs b/azure-kusto-ingest/src/resource_manager.rs index e3cc98f..c648e9c 100644 --- a/azure-kusto-ingest/src/resource_manager.rs +++ b/azure-kusto-ingest/src/resource_manager.rs @@ -5,279 +5,20 @@ pub mod cache; pub mod ingest_client_resources; pub mod resource_uri; -use anyhow::{Ok, Result}; -use azure_core::ClientOptions; +use anyhow::Result; use azure_kusto_data::prelude::KustoClient; -use tokio::sync::RwLock; -use azure_data_tables::prelude::TableClient; -use azure_storage_blobs::prelude::ContainerClient; use azure_storage_queues::QueueClient; use crate::client_options::QueuedIngestClientOptions; use self::{ authorization_context::{AuthorizationContext, KustoIdentityToken}, - cache::{Cached, Refreshing}, - resource_uri::{ClientFromResourceUri, ResourceUri}, + ingest_client_resources::IngestClientResources, }; -use self::ingest_client_resources::RawIngestClientResources; - pub(crate) const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60); -pub struct InnerIngestClientResources { - kusto_response: Option, - secured_ready_for_aggregation_queues: Vec, - temp_storage: Vec, - ingestions_status_tables: Vec, - successful_ingestions_queues: Vec, - failed_ingestions_queues: Vec, -} - -impl InnerIngestClientResources { - pub fn new() -> Self { - Self { - kusto_response: None, - secured_ready_for_aggregation_queues: Vec::new(), - temp_storage: Vec::new(), - ingestions_status_tables: Vec::new(), - successful_ingestions_queues: Vec::new(), - failed_ingestions_queues: Vec::new(), - } - } -} - -pub struct IngestClientResources { - client: KustoClient, - resources: Refreshing, - client_options: QueuedIngestClientOptions, -} - -impl IngestClientResources { - pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self { - Self { - client, - resources: Arc::new(RwLock::new(Cached::new( - InnerIngestClientResources::new(), - RESOURCE_REFRESH_PERIOD, - ))), - client_options, - } - } - - // TODO: Logic to get the Kusto identity token from Kusto management endpoint - handle any validation of the response from the query here - /// Executes a KQL management query that retrieves resource URIs for the various Azure resources used for ingestion - async fn execute_kql_mgmt_query(client: KustoClient) -> Result { - let results = client - .execute_command("NetDefaultDB", ".get ingestion resources", None) - .await?; - - let table = match results.tables.first() { - Some(a) => a, - None => { - return Err(anyhow::anyhow!( - "Kusto expected a table containing ingestion resource results, found no tables", - )) - } - }; - - RawIngestClientResources::try_from(table) - } - - fn create_clients_vec(resource_uris: &[ResourceUri], client_options: ClientOptions) -> Vec - where - T: ClientFromResourceUri, - { - resource_uris - .iter() - .map(|uri| T::create_client(uri.clone(), client_options.clone())) - .collect() - } - - fn update_clients_vec( - current_resources: Vec, - resource_uris: Vec, - client_options: ClientOptions, - ) -> Vec - where - T: ClientFromResourceUri, - { - if !current_resources.is_empty() { - Self::create_clients_vec(&resource_uris, client_options) - } else { - current_resources - } - } - - // 1. Get the kusto response - // 2. Update the kusto response, and the dependent resources if they are not empty, do this by a hashmap on the URI returned - // 3. Update the time - // 4. Return the kusto response - // As such, at any one time it is guaranteed that anything that has been queried before will be available and up to date - // Anything that has not been queried before will be available to create, but not as Azure clients until explicitly queried - /// - async fn update_from_kusto(&self) -> Result { - let resources = self.resources.read().await; - if !resources.is_expired() { - if let Some(ref inner_value) = resources.get().kusto_response { - return Ok(inner_value.clone()); - } - } - // otherwise, drop the read lock and get a write lock to refresh the kusto response - drop(resources); - let mut resources = self.resources.write().await; - - // check again in case another thread refreshed the while we were waiting on the write lock - if let Some(inner_value) = &resources.get().kusto_response { - return Ok(inner_value.clone()); - } - - let raw_ingest_client_resources = Self::execute_kql_mgmt_query(self.client.clone()).await?; - let mut_resources = resources.get_mut(); - - mut_resources.kusto_response = Some(raw_ingest_client_resources.clone()); - - mut_resources.secured_ready_for_aggregation_queues = Self::update_clients_vec( - mut_resources.secured_ready_for_aggregation_queues.clone(), - raw_ingest_client_resources - .secured_ready_for_aggregation_queues - .clone(), - self.client_options.queue_service.clone(), - ); - mut_resources.temp_storage = Self::update_clients_vec( - mut_resources.temp_storage.clone(), - raw_ingest_client_resources.temp_storage.clone(), - self.client_options.blob_service.clone(), - ); - mut_resources.ingestions_status_tables = Self::update_clients_vec( - mut_resources.ingestions_status_tables.clone(), - raw_ingest_client_resources.ingestions_status_tables.clone(), - self.client_options.table_service.clone(), - ); - mut_resources.successful_ingestions_queues = Self::update_clients_vec( - mut_resources.successful_ingestions_queues.clone(), - raw_ingest_client_resources - .successful_ingestions_queues - .clone(), - self.client_options.queue_service.clone(), - ); - mut_resources.failed_ingestions_queues = Self::update_clients_vec( - mut_resources.failed_ingestions_queues.clone(), - raw_ingest_client_resources.failed_ingestions_queues.clone(), - self.client_options.queue_service.clone(), - ); - Ok(raw_ingest_client_resources) - } - - // Logic here - // Get a read lock, try and return the secured ready for aggregation queues - // If they are not empty, return them - // Otherwise, drop the read lock and get a write lock - // Check again if they are empty, if not return them assuming something has changed in between - // Otherwise, get the kusto response, create the queues - // Store the queues, and also return them - pub async fn get_clients( - &self, - field_fn: F, - create_client_vec_fn: Fx, - set_value: Fy, - client_options: ClientOptions, - ) -> Result> - where - F: Fn(&InnerIngestClientResources) -> &Vec, - Fx: Fn(&RawIngestClientResources) -> &Vec, - Fy: Fn(&mut InnerIngestClientResources, &Vec), - T: ClientFromResourceUri + Clone, - { - let resources = self.resources.read().await; - if !resources.is_expired() { - let vecs = field_fn(resources.get()); - if !vecs.is_empty() { - return Ok(vecs.clone()); - } - } - - drop(resources); - - let raw_ingest_client_resources = self.update_from_kusto().await?; - - let mut resources = self.resources.write().await; - let vecs = field_fn(resources.get_mut()); - if !vecs.is_empty() { - return Ok(vecs.clone()); - } - - // First time, so create the resources outside - let mut_resources = resources.get_mut(); - let new_resources = Self::create_clients_vec( - create_client_vec_fn(&raw_ingest_client_resources), - client_options, - ); - set_value(mut_resources, &new_resources); - - Ok(new_resources) - } - - pub async fn get_secured_ready_for_aggregation_queues(&self) -> Result> { - self.get_clients( - |resources| &resources.secured_ready_for_aggregation_queues, - |resources| &resources.secured_ready_for_aggregation_queues, - |mut_resources, new_resources| { - mut_resources.secured_ready_for_aggregation_queues = new_resources.clone() - }, - self.client_options.queue_service.clone(), - ) - .await - } - - // pub async fn get_temp_storage(&self) -> Result> { - // self.get_clients( - // |resources| &resources.temp_storage, - // |resources| &resources.temp_storage, - // |mut_resources, new_resources| mut_resources.temp_storage = new_resources.clone(), - // self.client_options.blob_service.clone(), - // ) - // .await - // } - - // pub async fn get_ingestions_status_tables(&self) -> Result> { - // self.get_clients( - // |resources| &resources.ingestions_status_tables, - // |resources| &resources.ingestions_status_tables, - // |mut_resources, new_resources| { - // mut_resources.ingestions_status_tables = new_resources.clone() - // }, - // self.client_options.table_service.clone(), - // ) - // .await - // } - - // pub async fn get_successful_ingestions_queues(&self) -> Result> { - // self.get_clients( - // |resources| &resources.successful_ingestions_queues, - // |resources| &resources.successful_ingestions_queues, - // |mut_resources, new_resources| { - // mut_resources.successful_ingestions_queues = new_resources.clone() - // }, - // self.client_options.queue_service.clone(), - // ) - // .await - // } - - // pub async fn get_failed_ingestions_queues(&self) -> Result> { - // self.get_clients( - // |resources| &resources.failed_ingestions_queues, - // |resources| &resources.failed_ingestions_queues, - // |mut_resources, new_resources| { - // mut_resources.failed_ingestions_queues = new_resources.clone() - // }, - // self.client_options.queue_service.clone(), - // ) - // .await - // } -} - /// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour pub struct ResourceManager { ingest_client_resources: Arc, diff --git a/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs index db57c8c..1b3ef42 100644 --- a/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs +++ b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs @@ -1,6 +1,15 @@ -use super::resource_uri::ResourceUri; +use std::sync::Arc; + +use crate::client_options::QueuedIngestClientOptions; + +use super::{resource_uri::{ResourceUri, ClientFromResourceUri}, cache::{Refreshing, Cached}, RESOURCE_REFRESH_PERIOD}; use anyhow::Result; -use azure_kusto_data::models::TableV1; +use azure_core::ClientOptions; +use azure_data_tables::prelude::TableClient; +use azure_kusto_data::{models::TableV1, prelude::KustoClient}; +use azure_storage_blobs::prelude::ContainerClient; +use azure_storage_queues::QueueClient; +use tokio::sync::RwLock; #[derive(Debug, Clone)] pub struct RawIngestClientResources { @@ -12,26 +21,28 @@ pub struct RawIngestClientResources { } impl RawIngestClientResources { + /// Helper to get a column index from a table + // TODO: this could be moved upstream - would likely result in a change to the API of this function to return an Option + // As such, error handling would still need to be done at use + fn get_column_index(table: &TableV1, column_name: &str) -> Result { + table + .columns + .iter() + .position(|c| c.column_name == column_name) + .ok_or(anyhow::anyhow!( + "{} column is missing in the table", + column_name + )) + } + + /// Helper to get a resource URI from a table fn get_resource_by_name( table: &TableV1, resource_name: String, err_if_not_found: bool, ) -> Result> { - let storage_root_index = table - .columns - .iter() - .position(|c| c.column_name == "StorageRoot") - .ok_or(anyhow::anyhow!( - "StorageRoot column is missing in the table" - ))?; - - let resource_type_name_index = table - .columns - .iter() - .position(|c| c.column_name == "ResourceTypeName") - .ok_or(anyhow::anyhow!( - "ResourceTypeName column is missing in the table" - ))?; + let storage_root_index = Self::get_column_index(table, "StorageRoot")?; + let resource_type_name_index = Self::get_column_index(table, "ResourceTypeName")?; let resource_uris: Vec> = table .rows @@ -79,3 +90,255 @@ impl TryFrom<&TableV1> for RawIngestClientResources { }) } } + +pub struct InnerIngestClientResources { + kusto_response: Option, + secured_ready_for_aggregation_queues: Vec, + temp_storage: Vec, + ingestions_status_tables: Vec, + successful_ingestions_queues: Vec, + failed_ingestions_queues: Vec, +} + +impl InnerIngestClientResources { + pub fn new() -> Self { + Self { + kusto_response: None, + secured_ready_for_aggregation_queues: Vec::new(), + temp_storage: Vec::new(), + ingestions_status_tables: Vec::new(), + successful_ingestions_queues: Vec::new(), + failed_ingestions_queues: Vec::new(), + } + } +} + +pub struct IngestClientResources { + client: KustoClient, + resources: Refreshing, + client_options: QueuedIngestClientOptions, +} + +impl IngestClientResources { + pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self { + Self { + client, + resources: Arc::new(RwLock::new(Cached::new( + InnerIngestClientResources::new(), + RESOURCE_REFRESH_PERIOD, + ))), + client_options, + } + } + + // TODO: Logic to get the Kusto identity token from Kusto management endpoint - handle any validation of the response from the query here + /// Executes a KQL management query that retrieves resource URIs for the various Azure resources used for ingestion + async fn execute_kql_mgmt_query(client: KustoClient) -> Result { + let results = client + .execute_command("NetDefaultDB", ".get ingestion resources", None) + .await?; + + let table = match results.tables.first() { + Some(a) => a, + None => { + return Err(anyhow::anyhow!( + "Kusto expected a table containing ingestion resource results, found no tables", + )) + } + }; + + RawIngestClientResources::try_from(table) + } + + fn create_clients_vec(resource_uris: &[ResourceUri], client_options: ClientOptions) -> Vec + where + T: ClientFromResourceUri, + { + resource_uris + .iter() + .map(|uri| T::create_client(uri.clone(), client_options.clone())) + .collect() + } + + fn update_clients_vec( + current_resources: Vec, + resource_uris: Vec, + client_options: ClientOptions, + ) -> Vec + where + T: ClientFromResourceUri, + { + if !current_resources.is_empty() { + Self::create_clients_vec(&resource_uris, client_options) + } else { + current_resources + } + } + + // 1. Get the kusto response + // 2. Update the kusto response, and the dependent resources if they are not empty, do this by a hashmap on the URI returned + // 3. Update the time + // 4. Return the kusto response + // As such, at any one time it is guaranteed that anything that has been queried before will be available and up to date + // Anything that has not been queried before will be available to create, but not as Azure clients until explicitly queried + /// + async fn update_from_kusto(&self) -> Result { + let resources = self.resources.read().await; + if !resources.is_expired() { + if let Some(ref inner_value) = resources.get().kusto_response { + return Ok(inner_value.clone()); + } + } + // otherwise, drop the read lock and get a write lock to refresh the kusto response + drop(resources); + let mut resources = self.resources.write().await; + + // check again in case another thread refreshed the while we were waiting on the write lock + if let Some(inner_value) = &resources.get().kusto_response { + return Ok(inner_value.clone()); + } + + let raw_ingest_client_resources = Self::execute_kql_mgmt_query(self.client.clone()).await?; + let mut_resources = resources.get_mut(); + + mut_resources.kusto_response = Some(raw_ingest_client_resources.clone()); + + mut_resources.secured_ready_for_aggregation_queues = Self::update_clients_vec( + mut_resources.secured_ready_for_aggregation_queues.clone(), + raw_ingest_client_resources + .secured_ready_for_aggregation_queues + .clone(), + self.client_options.queue_service.clone(), + ); + mut_resources.temp_storage = Self::update_clients_vec( + mut_resources.temp_storage.clone(), + raw_ingest_client_resources.temp_storage.clone(), + self.client_options.blob_service.clone(), + ); + mut_resources.ingestions_status_tables = Self::update_clients_vec( + mut_resources.ingestions_status_tables.clone(), + raw_ingest_client_resources.ingestions_status_tables.clone(), + self.client_options.table_service.clone(), + ); + mut_resources.successful_ingestions_queues = Self::update_clients_vec( + mut_resources.successful_ingestions_queues.clone(), + raw_ingest_client_resources + .successful_ingestions_queues + .clone(), + self.client_options.queue_service.clone(), + ); + mut_resources.failed_ingestions_queues = Self::update_clients_vec( + mut_resources.failed_ingestions_queues.clone(), + raw_ingest_client_resources.failed_ingestions_queues.clone(), + self.client_options.queue_service.clone(), + ); + Ok(raw_ingest_client_resources) + } + + // Logic here + // Get a read lock, try and return the secured ready for aggregation queues + // If they are not empty, return them + // Otherwise, drop the read lock and get a write lock + // Check again if they are empty, if not return them assuming something has changed in between + // Otherwise, get the kusto response, create the queues + // Store the queues, and also return them + pub async fn get_clients( + &self, + field_fn: F, + create_client_vec_fn: Fx, + set_value: Fy, + client_options: ClientOptions, + ) -> Result> + where + F: Fn(&InnerIngestClientResources) -> &Vec, + Fx: Fn(&RawIngestClientResources) -> &Vec, + Fy: Fn(&mut InnerIngestClientResources, &Vec), + T: ClientFromResourceUri + Clone, + { + let resources = self.resources.read().await; + if !resources.is_expired() { + let vecs = field_fn(resources.get()); + if !vecs.is_empty() { + return Ok(vecs.clone()); + } + } + + drop(resources); + + let raw_ingest_client_resources = self.update_from_kusto().await?; + + let mut resources = self.resources.write().await; + let vecs = field_fn(resources.get_mut()); + if !vecs.is_empty() { + return Ok(vecs.clone()); + } + + // First time, so create the resources outside + let mut_resources = resources.get_mut(); + let new_resources = Self::create_clients_vec( + create_client_vec_fn(&raw_ingest_client_resources), + client_options, + ); + set_value(mut_resources, &new_resources); + + Ok(new_resources) + } + + pub async fn get_secured_ready_for_aggregation_queues(&self) -> Result> { + self.get_clients( + |resources| &resources.secured_ready_for_aggregation_queues, + |resources| &resources.secured_ready_for_aggregation_queues, + |mut_resources, new_resources| { + mut_resources.secured_ready_for_aggregation_queues = new_resources.clone() + }, + self.client_options.queue_service.clone(), + ) + .await + } + + // pub async fn get_temp_storage(&self) -> Result> { + // self.get_clients( + // |resources| &resources.temp_storage, + // |resources| &resources.temp_storage, + // |mut_resources, new_resources| mut_resources.temp_storage = new_resources.clone(), + // self.client_options.blob_service.clone(), + // ) + // .await + // } + + // pub async fn get_ingestions_status_tables(&self) -> Result> { + // self.get_clients( + // |resources| &resources.ingestions_status_tables, + // |resources| &resources.ingestions_status_tables, + // |mut_resources, new_resources| { + // mut_resources.ingestions_status_tables = new_resources.clone() + // }, + // self.client_options.table_service.clone(), + // ) + // .await + // } + + // pub async fn get_successful_ingestions_queues(&self) -> Result> { + // self.get_clients( + // |resources| &resources.successful_ingestions_queues, + // |resources| &resources.successful_ingestions_queues, + // |mut_resources, new_resources| { + // mut_resources.successful_ingestions_queues = new_resources.clone() + // }, + // self.client_options.queue_service.clone(), + // ) + // .await + // } + + // pub async fn get_failed_ingestions_queues(&self) -> Result> { + // self.get_clients( + // |resources| &resources.failed_ingestions_queues, + // |resources| &resources.failed_ingestions_queues, + // |mut_resources, new_resources| { + // mut_resources.failed_ingestions_queues = new_resources.clone() + // }, + // self.client_options.queue_service.clone(), + // ) + // .await + // } +}