Initial logic for caching resources
This commit is contained in:
Родитель
846dd72cd1
Коммит
b67607d9e9
|
@ -43,7 +43,7 @@ impl QueuedIngestionMessage {
|
|||
pub fn new(
|
||||
blob_descriptor: BlobDescriptor,
|
||||
ingestion_properties: &IngestionProperties,
|
||||
auth_context: &KustoIdentityToken,
|
||||
auth_context: KustoIdentityToken,
|
||||
) -> Self {
|
||||
let additional_properties = AdditionalProperties {
|
||||
ingestion_mapping: None,
|
||||
|
@ -76,7 +76,7 @@ impl QueuedIngestionMessage {
|
|||
ignore_size_limit: Some(false),
|
||||
// TODO: configurability of creation time
|
||||
source_message_creation_time: String::from("2023-08-16T13:30:04.639714"),
|
||||
additional_properties: additional_properties,
|
||||
additional_properties,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::time::Duration;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use azure_core::base64;
|
||||
|
@ -13,31 +13,31 @@ use crate::result::{IngestionResult, IngestionStatus};
|
|||
pub struct QueuedIngestClient {
|
||||
// The KustoClient is used to get the ingestion resources, it should be a client against the ingestion cluster endpoint
|
||||
// kusto_client: KustoClient,
|
||||
resource_manager: ResourceManager,
|
||||
resource_manager: Arc<ResourceManager>,
|
||||
}
|
||||
|
||||
impl QueuedIngestClient {
|
||||
pub fn new(kusto_client: KustoClient, refresh_period: Duration) -> Self {
|
||||
let resource_manager = ResourceManager::new(kusto_client, refresh_period);
|
||||
pub fn new(kusto_client: KustoClient) -> Self {
|
||||
let resource_manager = Arc::new(ResourceManager::new(kusto_client));
|
||||
|
||||
Self { resource_manager }
|
||||
}
|
||||
|
||||
pub async fn ingest_from_blob(
|
||||
mut self,
|
||||
self,
|
||||
blob_descriptor: BlobDescriptor,
|
||||
ingestion_properties: &IngestionProperties,
|
||||
) -> Result<IngestionResult> {
|
||||
// the queues returned here should ideally be the storage queue client from azure-storage-queue
|
||||
// as such, it may be better for ResourceManager to return a struct that contains the storage queue client
|
||||
// The queues returned here should ideally be the storage queue client from azure-storage-queue
|
||||
// As such, it may be better for ResourceManager to return a struct that contains the storage queue client
|
||||
let ingestion_queues = self
|
||||
.resource_manager
|
||||
.secured_ready_for_aggregation_queues()
|
||||
.await?;
|
||||
println!("queues: {:#?}", ingestion_queues);
|
||||
|
||||
let auth_context = self.resource_manager.authorization_context().await?;
|
||||
|
||||
println!("queues: {:#?}", ingestion_queues);
|
||||
println!("auth_context: {:#?}\n", auth_context);
|
||||
|
||||
let message = QueuedIngestionMessage::new(
|
||||
blob_descriptor.clone(),
|
||||
|
|
|
@ -1,364 +1,205 @@
|
|||
use std::time::{Duration, Instant};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
pub mod authorization_context;
|
||||
pub mod cache;
|
||||
pub mod ingest_client_resources;
|
||||
pub mod resource_uri;
|
||||
|
||||
use anyhow::{Ok, Result};
|
||||
use azure_kusto_data::{models::TableV1, prelude::KustoClient};
|
||||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
|
||||
use url::Url;
|
||||
use azure_kusto_data::prelude::KustoClient;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
|
||||
use azure_storage_queues::QueueClient;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ResourceUri {
|
||||
uri: String,
|
||||
// parsed_uri: Url,
|
||||
service_uri: String,
|
||||
object_name: String,
|
||||
sas_token: StorageCredentials,
|
||||
}
|
||||
use self::{
|
||||
authorization_context::AuthorizationContext,
|
||||
cache::{Cached, Refreshing},
|
||||
};
|
||||
|
||||
impl ResourceUri {
|
||||
pub fn new(uri: String) -> Self {
|
||||
println!("uri: {:#?}", uri);
|
||||
let parsed_uri = Url::parse(&uri).unwrap();
|
||||
println!("parsed_uri: {:#?}", parsed_uri);
|
||||
use self::ingest_client_resources::RawIngestClientResources;
|
||||
|
||||
let service_uri = parsed_uri.scheme().to_string()
|
||||
+ "://"
|
||||
+ parsed_uri.host_str().expect("We should get result here");
|
||||
let object_name = parsed_uri
|
||||
.path()
|
||||
.trim_start()
|
||||
.trim_start_matches("/")
|
||||
.to_string();
|
||||
let sas_token = parsed_uri
|
||||
.query()
|
||||
.expect("Returned URI should contain SAS token as query")
|
||||
.to_string();
|
||||
let sas_token = StorageCredentials::sas_token(sas_token).unwrap();
|
||||
|
||||
Self {
|
||||
uri,
|
||||
// parsed_uri,
|
||||
service_uri,
|
||||
object_name,
|
||||
sas_token,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn uri(&self) -> &str {
|
||||
self.uri.as_str()
|
||||
}
|
||||
|
||||
pub fn service_uri(&self) -> &str {
|
||||
self.service_uri.as_str()
|
||||
}
|
||||
|
||||
pub fn object_name(&self) -> &str {
|
||||
self.object_name.as_str()
|
||||
}
|
||||
|
||||
pub fn sas_token(&self) -> &StorageCredentials {
|
||||
&self.sas_token
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ResourceUri> for QueueClient {
|
||||
fn from(resource_uri: &ResourceUri) -> Self {
|
||||
let queue_service =
|
||||
QueueServiceClientBuilder::with_location(azure_storage::CloudLocation::Custom {
|
||||
uri: resource_uri.service_uri().to_string(),
|
||||
credentials: resource_uri.sas_token().clone(),
|
||||
})
|
||||
.build();
|
||||
|
||||
queue_service.queue_client(resource_uri.object_name())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ResourceUri> for ContainerClient {
|
||||
fn from(resource_uri: &ResourceUri) -> Self {
|
||||
ClientBuilder::with_location(azure_storage::CloudLocation::Custom {
|
||||
uri: resource_uri.service_uri().to_string(),
|
||||
credentials: resource_uri.sas_token().clone(),
|
||||
})
|
||||
.container_client(resource_uri.object_name())
|
||||
}
|
||||
}
|
||||
|
||||
fn get_resource_by_name(table: &TableV1, resource_name: String) -> Vec<ResourceUri> {
|
||||
let storage_root_index = table
|
||||
.columns
|
||||
.iter()
|
||||
.position(|c| c.column_name == "StorageRoot")
|
||||
.unwrap();
|
||||
let resource_type_name_index = table
|
||||
.columns
|
||||
.iter()
|
||||
.position(|c| c.column_name == "ResourceTypeName")
|
||||
.unwrap();
|
||||
|
||||
println!("table: {:#?}", table);
|
||||
let resource_uris: Vec<ResourceUri> = table
|
||||
.rows
|
||||
.iter()
|
||||
.filter(|r| r[resource_type_name_index] == resource_name)
|
||||
.map(|r| {
|
||||
ResourceUri::new(
|
||||
r[storage_root_index]
|
||||
.as_str()
|
||||
.expect("We should get result here")
|
||||
.to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
resource_uris
|
||||
}
|
||||
pub(crate) const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);
|
||||
|
||||
pub struct IngestClientResources {
|
||||
client: KustoClient,
|
||||
secured_ready_for_aggregation_queues: Vec<ResourceUri>,
|
||||
failed_ingestions_queues: Vec<ResourceUri>,
|
||||
successful_ingestions_queues: Vec<ResourceUri>,
|
||||
temp_storage: Vec<ResourceUri>,
|
||||
ingestions_status_tables: Vec<ResourceUri>,
|
||||
last_update: Option<Instant>,
|
||||
refresh_period: Duration,
|
||||
kusto_response: Refreshing<Option<RawIngestClientResources>>,
|
||||
secured_ready_for_aggregation_queues: Refreshing<Vec<QueueClient>>,
|
||||
// secured_ready_for_aggregation_queues: Vec<ResourceUri>,
|
||||
// failed_ingestions_queues: Vec<ResourceUri>,
|
||||
// successful_ingestions_queues: Vec<ResourceUri>,
|
||||
// temp_storage: Vec<ResourceUri>,
|
||||
// ingestions_status_tables: Vec<ResourceUri>,
|
||||
}
|
||||
|
||||
impl IngestClientResources {
|
||||
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
|
||||
pub fn new(client: KustoClient) -> Self {
|
||||
Self {
|
||||
client,
|
||||
secured_ready_for_aggregation_queues: Vec::new(),
|
||||
failed_ingestions_queues: Vec::new(),
|
||||
successful_ingestions_queues: Vec::new(),
|
||||
temp_storage: Vec::new(),
|
||||
ingestions_status_tables: Vec::new(),
|
||||
last_update: None,
|
||||
refresh_period,
|
||||
kusto_response: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
|
||||
secured_ready_for_aggregation_queues: Arc::new(RwLock::new(Cached::new(
|
||||
Vec::new(),
|
||||
RESOURCE_REFRESH_PERIOD,
|
||||
))),
|
||||
// secured_ready_for_aggregation_queues: Vec::new(),
|
||||
// failed_ingestions_queues: Vec::new(),
|
||||
// successful_ingestions_queues: Vec::new(),
|
||||
// temp_storage: Vec::new(),
|
||||
// ingestions_status_tables: Vec::new(),
|
||||
// last_update: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn is_not_applicable(&self) -> bool {
|
||||
self.secured_ready_for_aggregation_queues.is_empty()
|
||||
|| self.failed_ingestions_queues.is_empty()
|
||||
|| self.successful_ingestions_queues.is_empty()
|
||||
|| self.temp_storage.is_empty()
|
||||
|| self.ingestions_status_tables.is_empty()
|
||||
}
|
||||
|
||||
// TODO: figure out refresh logic
|
||||
// async fn refresh(&mut self) {
|
||||
// self.get_ingest_client_resources().await
|
||||
// // let interval = tokio::time::interval(self.refresh_period);
|
||||
// // loop {
|
||||
// // match self.get_ingest_client_resources(self.client.clone()).await {
|
||||
// // Ok(_) => todo!(),
|
||||
// // Err(e) => println!("Error: {}", e),
|
||||
// // };
|
||||
|
||||
// // interval.tick().await;
|
||||
// // }
|
||||
|
||||
// // if self.last_update.is_none()
|
||||
// // || self.last_update.unwrap().elapsed() > self.refresh_period
|
||||
// // || self.is_not_applicable()
|
||||
// // {
|
||||
// // self.get_ingest_client_resources(client).await?;
|
||||
// // self.last_update = Some(Instant::now());
|
||||
// // }
|
||||
// // Ok(())
|
||||
// }
|
||||
|
||||
// async fn refresh(&mut self, client: KustoClient) -> Result<()> {
|
||||
// if self.last_update.is_none()
|
||||
// || self.last_update.unwrap().elapsed() > self.refresh_period
|
||||
// || self.is_not_applicable()
|
||||
// {
|
||||
// self.get_ingest_client_resources(client).await?;
|
||||
// self.last_update = Some(Instant::now());
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
async fn get_ingest_client_resources(&mut self) -> Result<()> {
|
||||
let results = self
|
||||
.client
|
||||
// TODO: Logic to get the Kusto identity token from Kusto management endpoint - handle validation here
|
||||
async fn execute_kql_mgmt_query(client: KustoClient) -> Result<RawIngestClientResources> {
|
||||
let results = client
|
||||
.execute_command("NetDefaultDB", ".get ingestion resources", None)
|
||||
.await?;
|
||||
let table = results.tables.first().unwrap();
|
||||
|
||||
self.secured_ready_for_aggregation_queues =
|
||||
get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string());
|
||||
self.failed_ingestions_queues =
|
||||
get_resource_by_name(table, "FailedIngestionsQueue".to_string());
|
||||
self.successful_ingestions_queues =
|
||||
get_resource_by_name(table, "SuccessfulIngestionsQueue".to_string());
|
||||
self.temp_storage = get_resource_by_name(table, "TempStorage".to_string());
|
||||
self.ingestions_status_tables =
|
||||
get_resource_by_name(table, "IngestionsStatusTable".to_string());
|
||||
println!("table: {:#?}", table);
|
||||
RawIngestClientResources::try_from(table)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
async fn get(&self) -> Result<(RawIngestClientResources, Instant)> {
|
||||
let kusto_response = self.kusto_response.read().await;
|
||||
if !kusto_response.is_expired() {
|
||||
if let Some(inner_value) = kusto_response.get() {
|
||||
return Ok((
|
||||
inner_value.clone(),
|
||||
kusto_response.get_last_updated().clone(),
|
||||
));
|
||||
}
|
||||
}
|
||||
// otherwise, drop the read lock and get a write lock to refresh the token
|
||||
drop(kusto_response);
|
||||
let mut kusto_response = self.kusto_response.write().await;
|
||||
|
||||
// check again in case another thread refreshed the token while we were
|
||||
// waiting on the write lock
|
||||
if let Some(inner_value) = kusto_response.get() {
|
||||
return Ok((
|
||||
inner_value.clone(),
|
||||
kusto_response.get_last_updated().clone(),
|
||||
));
|
||||
}
|
||||
|
||||
let raw_ingest_client_resources = Self::execute_kql_mgmt_query(self.client.clone()).await?;
|
||||
let last_updated = Instant::now();
|
||||
kusto_response.update_with_time(
|
||||
Some(raw_ingest_client_resources.clone()),
|
||||
last_updated.clone(),
|
||||
);
|
||||
|
||||
Ok((raw_ingest_client_resources, last_updated))
|
||||
}
|
||||
|
||||
pub async fn get_ingestion_queues(&self) -> Result<Vec<QueueClient>> {
|
||||
let secured_ready_for_aggregation_queues =
|
||||
self.secured_ready_for_aggregation_queues.read().await;
|
||||
|
||||
if !secured_ready_for_aggregation_queues.is_expired() {
|
||||
let vecs = secured_ready_for_aggregation_queues.get();
|
||||
if !vecs.is_empty() {
|
||||
return Ok(vecs.clone());
|
||||
}
|
||||
}
|
||||
|
||||
drop(secured_ready_for_aggregation_queues);
|
||||
let mut secured_ready_for_aggregation_queues =
|
||||
self.secured_ready_for_aggregation_queues.write().await;
|
||||
|
||||
let vecs = secured_ready_for_aggregation_queues.get();
|
||||
if !vecs.is_empty() {
|
||||
return Ok(vecs.clone());
|
||||
}
|
||||
|
||||
let (raw_ingest_client_resources, last_updated) = self.get().await?;
|
||||
let queue_uris = raw_ingest_client_resources.secured_ready_for_aggregation_queues;
|
||||
let queue_clients: Vec<QueueClient> =
|
||||
queue_uris.iter().map(|q| QueueClient::from(q)).collect();
|
||||
|
||||
secured_ready_for_aggregation_queues.update_with_time(queue_clients.clone(), last_updated);
|
||||
|
||||
Ok(queue_clients)
|
||||
}
|
||||
}
|
||||
|
||||
pub type KustoIdentityToken = String;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AuthorizationContext {
|
||||
client: KustoClient,
|
||||
pub kusto_identity_token: KustoIdentityToken,
|
||||
last_update: Option<Instant>,
|
||||
refresh_period: Duration,
|
||||
}
|
||||
|
||||
impl AuthorizationContext {
|
||||
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
|
||||
Self {
|
||||
client,
|
||||
kusto_identity_token: String::new(),
|
||||
last_update: None,
|
||||
refresh_period,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: figure out refresh logic
|
||||
// Make this spawn a tokio task to refresh the token based on elapsed time
|
||||
async fn refresh(&mut self, client: KustoClient) -> Result<()> {
|
||||
if self.last_update.is_none()
|
||||
|| self.kusto_identity_token.chars().all(char::is_whitespace)
|
||||
|| self.last_update.unwrap().elapsed() > self.refresh_period
|
||||
{
|
||||
self.get_authorization_context(client).await?;
|
||||
self.last_update = Some(Instant::now());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_authorization_context(&mut self, client: KustoClient) -> Result<()> {
|
||||
let results = client
|
||||
.execute_command("NetDefaultDB", ".get kusto identity token", None)
|
||||
.await?;
|
||||
let table = results.tables.first().unwrap();
|
||||
|
||||
println!("table: {:#?}", table);
|
||||
|
||||
self.kusto_identity_token = table
|
||||
.rows
|
||||
.first()
|
||||
.unwrap()
|
||||
.first()
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn kusto_identity_token(
|
||||
&mut self,
|
||||
client: KustoClient,
|
||||
) -> Result<KustoIdentityToken> {
|
||||
self.refresh(client).await?;
|
||||
Ok(self.kusto_identity_token.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ResourceManager {
|
||||
// client: KustoClient,
|
||||
pub ingest_client_resources: IngestClientResources,
|
||||
pub authorization_context: AuthorizationContext,
|
||||
ingest_client_resources: Arc<IngestClientResources>,
|
||||
authorization_context: Arc<AuthorizationContext>,
|
||||
}
|
||||
|
||||
impl ResourceManager {
|
||||
pub fn new(client: KustoClient, refresh_period: Duration) -> Self {
|
||||
pub fn new(client: KustoClient) -> Self {
|
||||
Self {
|
||||
ingest_client_resources: IngestClientResources::new(client.clone(), refresh_period),
|
||||
authorization_context: AuthorizationContext::new(client, refresh_period),
|
||||
ingest_client_resources: Arc::new(IngestClientResources::new(client.clone())),
|
||||
authorization_context: Arc::new(AuthorizationContext::new(client)),
|
||||
}
|
||||
}
|
||||
|
||||
// pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result<Vec<ResourceUri>> {
|
||||
pub async fn secured_ready_for_aggregation_queues(&mut self) -> Result<Vec<QueueClient>> {
|
||||
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
self.ingest_client_resources
|
||||
.get_ingest_client_resources()
|
||||
.await?;
|
||||
|
||||
// We should return Azure SDK QueueClient's here.
|
||||
// Although it's recommended to share the same transport, we can't as the storage credentials (SAS tokens) differ per queue.
|
||||
// So the best we can do is store the individual QueueClient's so multiple requests
|
||||
|
||||
let queue_uris = self
|
||||
.ingest_client_resources
|
||||
.secured_ready_for_aggregation_queues
|
||||
.clone();
|
||||
|
||||
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
|
||||
pub async fn secured_ready_for_aggregation_queues(&self) -> Result<Vec<QueueClient>> {
|
||||
self.ingest_client_resources.get_ingestion_queues().await
|
||||
}
|
||||
|
||||
pub async fn failed_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
|
||||
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
self.ingest_client_resources
|
||||
.get_ingest_client_resources()
|
||||
.await?;
|
||||
// pub async fn failed_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
|
||||
// // TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
// self.ingest_client_resources
|
||||
// .get_ingest_client_resources()
|
||||
// .await?;
|
||||
|
||||
let queue_uris = self
|
||||
.ingest_client_resources
|
||||
.failed_ingestions_queues
|
||||
.clone();
|
||||
// let queue_uris = self
|
||||
// .ingest_client_resources
|
||||
// .failed_ingestions_queues
|
||||
// .clone();
|
||||
|
||||
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
|
||||
}
|
||||
// Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
|
||||
// }
|
||||
|
||||
pub async fn successful_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
|
||||
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
self.ingest_client_resources
|
||||
.get_ingest_client_resources()
|
||||
.await?;
|
||||
// pub async fn successful_ingestions_queues(&mut self) -> Result<Vec<QueueClient>> {
|
||||
// // TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
// self.ingest_client_resources
|
||||
// .get_ingest_client_resources()
|
||||
// .await?;
|
||||
|
||||
let queue_uris = self
|
||||
.ingest_client_resources
|
||||
.successful_ingestions_queues
|
||||
.clone();
|
||||
// let queue_uris = self
|
||||
// .ingest_client_resources
|
||||
// .successful_ingestions_queues
|
||||
// .clone();
|
||||
|
||||
Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
|
||||
}
|
||||
// Ok(queue_uris.iter().map(|q| QueueClient::from(q)).collect())
|
||||
// }
|
||||
|
||||
pub async fn temp_storage(&mut self) -> Result<Vec<ContainerClient>> {
|
||||
// TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
self.ingest_client_resources
|
||||
.get_ingest_client_resources()
|
||||
.await?;
|
||||
// pub async fn temp_storage(&mut self) -> Result<Vec<ContainerClient>> {
|
||||
// // TODO: proper refresh and caching logic so we don't need to generate new clients every time
|
||||
// self.ingest_client_resources
|
||||
// .get_ingest_client_resources()
|
||||
// .await?;
|
||||
|
||||
let container_uris = self.ingest_client_resources.temp_storage.clone();
|
||||
// let container_uris = self.ingest_client_resources.temp_storage.clone();
|
||||
|
||||
Ok(container_uris
|
||||
.iter()
|
||||
.map(|c| ContainerClient::from(c))
|
||||
.collect())
|
||||
}
|
||||
// Ok(container_uris
|
||||
// .iter()
|
||||
// .map(|c| ContainerClient::from(c))
|
||||
// .collect())
|
||||
// }
|
||||
|
||||
// pub async fn ingestions_status_tables(
|
||||
// &mut self,
|
||||
// client: KustoClient,
|
||||
// ) -> Result<Vec<ResourceUri>> {
|
||||
// self.refresh(client).await?;
|
||||
// Ok(self.ingestions_status_tables.clone())
|
||||
// unimplemented!()
|
||||
// }
|
||||
|
||||
// pub fn retrieve_service_type(self) -> ServiceType {
|
||||
// unimplemented!()
|
||||
// }
|
||||
|
||||
pub async fn authorization_context(&mut self) -> Result<&KustoIdentityToken> {
|
||||
// TODO: proper refresh and caching logic so we don't need to query Kusto for the token every time
|
||||
self.authorization_context
|
||||
.get_authorization_context(self.ingest_client_resources.client.clone())
|
||||
.await?;
|
||||
|
||||
Ok(&self.authorization_context.kusto_identity_token)
|
||||
pub async fn authorization_context(&self) -> Result<KustoIdentityToken> {
|
||||
self.authorization_context.get().await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use azure_kusto_data::prelude::KustoClient;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::cache::{Cached, Refreshing};
|
||||
use super::RESOURCE_REFRESH_PERIOD;
|
||||
|
||||
pub type KustoIdentityToken = String;
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AuthorizationContext {
|
||||
client: KustoClient,
|
||||
auth_context_cache: Refreshing<Option<KustoIdentityToken>>,
|
||||
}
|
||||
|
||||
impl AuthorizationContext {
|
||||
pub fn new(client: KustoClient) -> Self {
|
||||
Self {
|
||||
client,
|
||||
auth_context_cache: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
|
||||
}
|
||||
}
|
||||
|
||||
// Logic to get the Kusto identity token from Kusto management endpoint - handle validation here
|
||||
async fn execute_kql_mgmt_query(client: KustoClient) -> Result<KustoIdentityToken> {
|
||||
let results = client
|
||||
.execute_command("NetDefaultDB", ".get kusto identity token", None)
|
||||
.await?;
|
||||
// TODO: any other checks, plus error handling
|
||||
let table = results.tables.first().unwrap();
|
||||
|
||||
println!("table: {:#?}", table);
|
||||
|
||||
// TODO: any other checks, plus error handling
|
||||
let kusto_identity_token = table
|
||||
.rows
|
||||
.first()
|
||||
.unwrap()
|
||||
.first()
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
|
||||
if kusto_identity_token.chars().all(char::is_whitespace) {
|
||||
return Err(anyhow::anyhow!("Kusto identity token is empty"));
|
||||
}
|
||||
|
||||
println!("kusto_identity_token: {:#?}", kusto_identity_token);
|
||||
|
||||
Ok(kusto_identity_token)
|
||||
}
|
||||
|
||||
// handle caching here
|
||||
pub async fn get(&self) -> Result<KustoIdentityToken> {
|
||||
let auth_context_cache = self.auth_context_cache.read().await;
|
||||
if !auth_context_cache.is_expired() {
|
||||
if let Some(inner_value) = auth_context_cache.get() {
|
||||
return Ok(inner_value.clone());
|
||||
}
|
||||
}
|
||||
// otherwise, drop the read lock and get a write lock to refresh the token
|
||||
drop(auth_context_cache);
|
||||
let mut auth_context_cache = self.auth_context_cache.write().await;
|
||||
|
||||
// check again in case another thread refreshed the token while we were
|
||||
// waiting on the write lock
|
||||
if let Some(inner_value) = auth_context_cache.get() {
|
||||
return Ok(inner_value.clone());
|
||||
}
|
||||
|
||||
let token = Self::execute_kql_mgmt_query(self.client.clone()).await?;
|
||||
auth_context_cache.update(Some(token.clone()));
|
||||
|
||||
Ok(token)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
use std::{
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Cached<T> {
|
||||
inner: T,
|
||||
last_updated: Instant,
|
||||
refresh_period: Duration,
|
||||
}
|
||||
|
||||
impl<T> Cached<T> {
|
||||
pub fn new(inner: T, refresh_period: Duration) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
last_updated: Instant::now(),
|
||||
refresh_period,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self) -> &T {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
pub fn get_last_updated(&self) -> &Instant {
|
||||
&self.last_updated
|
||||
}
|
||||
|
||||
pub fn is_expired(&self) -> bool {
|
||||
self.last_updated.elapsed() > self.refresh_period
|
||||
}
|
||||
|
||||
pub fn update(&mut self, inner: T) {
|
||||
self.inner = inner;
|
||||
self.last_updated = Instant::now();
|
||||
}
|
||||
|
||||
pub fn update_with_time(&mut self, inner: T, last_updated: Instant) {
|
||||
self.inner = inner;
|
||||
self.last_updated = last_updated;
|
||||
}
|
||||
}
|
||||
|
||||
pub type Refreshing<T> = Arc<RwLock<Cached<T>>>;
|
|
@ -0,0 +1,68 @@
|
|||
use super::resource_uri::ResourceUri;
|
||||
use anyhow::Result;
|
||||
use azure_kusto_data::models::TableV1;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RawIngestClientResources {
|
||||
pub secured_ready_for_aggregation_queues: Vec<ResourceUri>,
|
||||
pub failed_ingestions_queues: Vec<ResourceUri>,
|
||||
pub successful_ingestions_queues: Vec<ResourceUri>,
|
||||
pub temp_storage: Vec<ResourceUri>,
|
||||
pub ingestions_status_tables: Vec<ResourceUri>,
|
||||
}
|
||||
|
||||
impl RawIngestClientResources {
|
||||
fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<ResourceUri>> {
|
||||
let storage_root_index = table
|
||||
.columns
|
||||
.iter()
|
||||
.position(|c| c.column_name == "StorageRoot")
|
||||
.unwrap();
|
||||
let resource_type_name_index = table
|
||||
.columns
|
||||
.iter()
|
||||
.position(|c| c.column_name == "ResourceTypeName")
|
||||
.unwrap();
|
||||
|
||||
println!("table: {:#?}", table);
|
||||
let resource_uris: Result<Vec<ResourceUri>> = table
|
||||
.rows
|
||||
.iter()
|
||||
.filter(|r| r[resource_type_name_index] == resource_name)
|
||||
.map(|r| {
|
||||
ResourceUri::try_from(
|
||||
r[storage_root_index]
|
||||
.as_str()
|
||||
.expect("We should get result here")
|
||||
.to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
resource_uris
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&TableV1> for RawIngestClientResources {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(table: &TableV1) -> std::result::Result<Self, Self::Error> {
|
||||
let secured_ready_for_aggregation_queues =
|
||||
Self::get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string())?;
|
||||
let failed_ingestions_queues =
|
||||
Self::get_resource_by_name(table, "FailedIngestionsQueue".to_string())?;
|
||||
let successful_ingestions_queues =
|
||||
Self::get_resource_by_name(table, "SuccessfulIngestionsQueue".to_string())?;
|
||||
let temp_storage = Self::get_resource_by_name(table, "TempStorage".to_string())?;
|
||||
let ingestions_status_tables =
|
||||
Self::get_resource_by_name(table, "IngestionsStatusTable".to_string())?;
|
||||
|
||||
Ok(Self {
|
||||
secured_ready_for_aggregation_queues,
|
||||
failed_ingestions_queues,
|
||||
successful_ingestions_queues,
|
||||
temp_storage,
|
||||
ingestions_status_tables,
|
||||
})
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
use azure_storage::StorageCredentials;
|
||||
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
|
||||
use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
|
||||
use url::Url;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ResourceUri {
|
||||
uri: String,
|
||||
service_uri: String,
|
||||
object_name: String,
|
||||
sas_token: StorageCredentials,
|
||||
}
|
||||
|
||||
impl TryFrom<String> for ResourceUri {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(uri: String) -> Result<Self> {
|
||||
println!("uri: {:#?}", uri);
|
||||
let parsed_uri = Url::parse(&uri)?;
|
||||
println!("parsed_uri: {:#?}", parsed_uri);
|
||||
|
||||
let service_uri = parsed_uri.scheme().to_string()
|
||||
+ "://"
|
||||
+ parsed_uri.host_str().expect("We should get result here");
|
||||
let object_name = parsed_uri
|
||||
.path()
|
||||
.trim_start()
|
||||
.trim_start_matches("/")
|
||||
.to_string();
|
||||
let sas_token = parsed_uri
|
||||
.query()
|
||||
.expect("Returned URI should contain SAS token as query")
|
||||
.to_string();
|
||||
let sas_token = StorageCredentials::sas_token(sas_token)?;
|
||||
|
||||
Ok(Self {
|
||||
uri,
|
||||
service_uri,
|
||||
object_name,
|
||||
sas_token,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl ResourceUri {
|
||||
pub fn uri(&self) -> &str {
|
||||
self.uri.as_str()
|
||||
}
|
||||
|
||||
pub fn service_uri(&self) -> &str {
|
||||
self.service_uri.as_str()
|
||||
}
|
||||
|
||||
pub fn object_name(&self) -> &str {
|
||||
self.object_name.as_str()
|
||||
}
|
||||
|
||||
pub fn sas_token(&self) -> &StorageCredentials {
|
||||
&self.sas_token
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ResourceUri> for QueueClient {
|
||||
fn from(resource_uri: &ResourceUri) -> Self {
|
||||
let queue_service =
|
||||
QueueServiceClientBuilder::with_location(azure_storage::CloudLocation::Custom {
|
||||
uri: resource_uri.service_uri().to_string(),
|
||||
credentials: resource_uri.sas_token().clone(),
|
||||
})
|
||||
.build();
|
||||
|
||||
queue_service.queue_client(resource_uri.object_name())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ResourceUri> for ContainerClient {
|
||||
fn from(resource_uri: &ResourceUri) -> Self {
|
||||
ClientBuilder::with_location(azure_storage::CloudLocation::Custom {
|
||||
uri: resource_uri.service_uri().to_string(),
|
||||
credentials: resource_uri.sas_token().clone(),
|
||||
})
|
||||
.container_client(resource_uri.object_name())
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче