This commit is contained in:
Krishan Mistry 2023-09-05 11:21:26 +01:00
Родитель 26dc69a5d6
Коммит d477e2b0b5
5 изменённых файлов: 45 добавлений и 44 удалений

Просмотреть файл

@ -4,6 +4,7 @@ pub mod authorization_context;
pub mod cache;
pub mod ingest_client_resources;
pub mod resource_uri;
pub mod utils;
use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;

Просмотреть файл

@ -4,25 +4,26 @@ use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;
use tokio::sync::RwLock;
use super::cache::{Cached, Refreshing};
use super::cache::{Cached, ThreadSafeCachedValue};
use super::utils::get_column_index;
use super::RESOURCE_REFRESH_PERIOD;
pub type KustoIdentityToken = String;
pub(crate) type KustoIdentityToken = String;
/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token
#[derive(Debug, Clone)]
pub struct AuthorizationContext {
pub(crate) struct AuthorizationContext {
/// A client against a Kusto ingestion cluster
client: KustoClient,
/// Cache of the Kusto identity token
auth_context_cache: Refreshing<Option<KustoIdentityToken>>,
token_cache: ThreadSafeCachedValue<Option<KustoIdentityToken>>,
}
impl AuthorizationContext {
pub fn new(client: KustoClient) -> Self {
Self {
client,
auth_context_cache: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
token_cache: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
}
}
@ -45,64 +46,59 @@ impl AuthorizationContext {
};
// Check that a column in this table actually exists called `AuthorizationContext`
let index = table
.columns
.iter()
.position(|c| c.column_name == "AuthorizationContext")
.ok_or(anyhow::anyhow!(
"AuthorizationContext column is missing in the table"
))?;
let index = get_column_index(table, "AuthorizationContext")?;
// Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty
let kusto_identity_token = match &table.rows[..] {
let token = match &table.rows[..] {
[row] => row.get(index).ok_or(anyhow::anyhow!(
"Kusto response did not contain a value in the first row at position {}",
index
))?,
_ => {
return Err(anyhow::anyhow!(
"Kusto Expected 1 row in results, found {}",
"Kusto expected 1 row in results, found {}",
table.rows.len()
))
}
};
// Convert the JSON string into a Rust string
let kusto_identity_token = kusto_identity_token.as_str().ok_or(anyhow::anyhow!(
"Kusto response did not contain a string value"
let token = token.as_str().ok_or(anyhow::anyhow!(
"Kusto response did not contain a string value: {:?}",
token
))?;
if kusto_identity_token.chars().all(char::is_whitespace) {
if token.chars().all(char::is_whitespace) {
return Err(anyhow::anyhow!("Kusto identity token is empty"));
}
Ok(kusto_identity_token.to_string())
Ok(token.to_string())
}
/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub async fn get(&self) -> Result<KustoIdentityToken> {
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// Attempt to get the token from the cache
let auth_context_cache = self.auth_context_cache.read().await;
if !auth_context_cache.is_expired() {
if let Some(token) = auth_context_cache.get() {
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
// Drop the read lock and get a write lock to refresh the token
drop(auth_context_cache);
let mut auth_context_cache = self.auth_context_cache.write().await;
drop(token_cache);
let mut token_cache = self.token_cache.write().await;
// Again attempt to return from cache, check is done in case another thread
// refreshed the token while we were waiting on the write lock
if !auth_context_cache.is_expired() {
if let Some(token) = auth_context_cache.get() {
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
// Fetch new token from Kusto, update the cache, and return the token
let token = self.query_kusto_identity_token().await?;
auth_context_cache.update(Some(token.clone()));
token_cache.update(Some(token.clone()));
Ok(token)
}

Просмотреть файл

@ -37,7 +37,7 @@ impl<T> Cached<T> {
}
}
pub type Refreshing<T> = Arc<RwLock<Cached<T>>>;
pub type ThreadSafeCachedValue<T> = Arc<RwLock<Cached<T>>>;
#[cfg(test)]
mod tests {

Просмотреть файл

@ -3,8 +3,9 @@ use std::sync::Arc;
use crate::client_options::QueuedIngestClientOptions;
use super::{
cache::{Cached, Refreshing},
cache::{Cached, ThreadSafeCachedValue},
resource_uri::{ClientFromResourceUri, ResourceUri},
utils::get_column_index,
RESOURCE_REFRESH_PERIOD,
};
use anyhow::Result;
@ -14,19 +15,6 @@ use azure_storage_blobs::prelude::ContainerClient;
use azure_storage_queues::QueueClient;
use tokio::sync::RwLock;
/// Helper to get a column index from a table
// TODO: this could be moved upstream into Kusto Data - would likely result in a change to the API of this function to return an Option<usize>
fn get_column_index(table: &TableV1, column_name: &str) -> Result<usize> {
table
.columns
.iter()
.position(|c| c.column_name == column_name)
.ok_or(anyhow::anyhow!(
"{} column is missing in the table",
column_name
))
}
/// Helper to get a resource URI from a table, erroring if there are no resources of the given name
fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<ResourceUri>> {
let storage_root_index = get_column_index(table, "StorageRoot")?;
@ -38,7 +26,8 @@ fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<Re
.filter(|r| r[resource_type_name_index] == resource_name)
.map(|r| {
ResourceUri::try_from(r[storage_root_index].as_str().ok_or(anyhow::anyhow!(
"Response returned from Kusto could not be parsed as a string"
"Response returned from Kusto could not be parsed as a string {:?}",
r[storage_root_index]
))?)
})
.collect();
@ -95,7 +84,7 @@ impl TryFrom<(&TableV1, &QueuedIngestClientOptions)> for InnerIngestClientResour
pub struct IngestClientResources {
client: KustoClient,
resources: Refreshing<Option<InnerIngestClientResources>>,
resources: ThreadSafeCachedValue<Option<InnerIngestClientResources>>,
client_options: QueuedIngestClientOptions,
}

Просмотреть файл

@ -0,0 +1,15 @@
use anyhow::Result;
use azure_kusto_data::models::TableV1;
/// Helper to get a column index from a table
// TODO: this could be moved upstream into Kusto Data - would likely result in a change to the API of this function to return an Option<usize>
pub fn get_column_index(table: &TableV1, column_name: &str) -> Result<usize> {
table
.columns
.iter()
.position(|c| c.column_name == column_name)
.ok_or(anyhow::anyhow!(
"{} column is missing in the table",
column_name
))
}