Remove use of anyhow and replace with thiserror, other markups (#7)

* Use thiserror

* use scopes over explicit use of drop

* update syntax

* remove std::fmt::Debug

* qualify thiserror
This commit is contained in:
Krishan 2024-01-18 20:01:17 +00:00 коммит произвёл GitHub
Родитель 6145aadb7b
Коммит f0dddceca7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
10 изменённых файлов: 238 добавлений и 92 удалений

Просмотреть файл

@ -13,11 +13,11 @@ azure_storage = "0.17"
azure_storage_blobs = "0.17"
azure_storage_queues = "0.17"
anyhow = "1"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
url = "2"

Просмотреть файл

@ -1,6 +1,5 @@
use std::env;
use anyhow::Result;
use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
@ -16,7 +15,7 @@ use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
/// - Permissions for Kusto to access storage
/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
let user_mi_object_id =
env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");
@ -54,7 +53,9 @@ async fn main() -> Result<()> {
let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
queued_ingest_client
let _ = queued_ingest_client
.ingest_from_blob(blob_descriptor, ingestion_properties)
.await
.await?;
Ok(())
}

Просмотреть файл

@ -0,0 +1,20 @@
//! Defines [Error] for representing failures in various operations.
/// Error type for kusto ingestion operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error raised when failing to obtain ingestion resources.
#[error("Error obtaining ingestion resources: {0}")]
ResourceManagerError(#[from] super::resource_manager::ResourceManagerError),
/// Error relating to (de-)serialization of JSON data
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),
/// Error occurring within core azure crates
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
}
/// Result type for kusto ingest operations.
pub type Result<T> = std::result::Result<T, Error>;

Просмотреть файл

@ -1,6 +1,7 @@
pub mod client_options;
pub mod data_format;
pub mod descriptors;
pub mod error;
pub(crate) mod ingestion_blob_info;
pub mod ingestion_properties;
pub mod queued_ingest;

Просмотреть файл

@ -1,11 +1,8 @@
use std::sync::Arc;
use anyhow::Result;
use crate::error::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use tracing::debug;
use crate::client_options::QueuedIngestClientOptions;
@ -47,8 +44,8 @@ impl QueuedIngestClient {
blob_descriptor: BlobDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<()> {
let ingestion_queues = self.resource_manager.ingestion_queues().await?;
debug!("ingestion queues: {:#?}", ingestion_queues);
let queue_client = self.resource_manager.ingestion_queue().await?;
debug!("ingestion queues: {:#?}", queue_client);
let auth_context = self.resource_manager.authorization_context().await?;
debug!("auth_context: {:#?}\n", auth_context);
@ -57,14 +54,7 @@ impl QueuedIngestClient {
QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context);
debug!("message: {:#?}\n", message);
// Pick a random queue from the queue clients returned by the resource manager
let mut rng: StdRng = SeedableRng::from_entropy();
let queue_client = ingestion_queues
.choose(&mut rng)
.ok_or(anyhow::anyhow!("Failed to pick a random queue"))?;
debug!("randomly seeded queue_client: {:#?}\n", queue_client);
let message = serde_json::to_string(&message).unwrap();
let message = serde_json::to_string(&message)?;
debug!("message as string: {}\n", message);
// Base64 encode the ingestion message

Просмотреть файл

@ -6,7 +6,6 @@ pub mod ingest_client_resources;
pub mod resource_uri;
pub mod utils;
use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;
use azure_storage_queues::QueueClient;
@ -18,8 +17,24 @@ use self::{
ingest_client_resources::IngestClientResources,
};
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);
#[derive(Debug, thiserror::Error)]
pub enum ResourceManagerError {
#[error("Failed to obtain ingestion resources: {0}")]
IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError),
#[error("Failed to obtain authorization token: {0}")]
AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError),
#[error("Failed to select a resource - no resources found")]
NoResourcesFound,
}
type Result<T> = std::result::Result<T, ResourceManagerError>;
/// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour
pub struct ResourceManager {
ingest_client_resources: Arc<IngestClientResources>,
@ -39,12 +54,62 @@ impl ResourceManager {
}
/// Returns the latest [QueueClient]s ready for posting ingestion messages to
pub async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
Ok(self.ingest_client_resources.get().await?.ingestion_queues)
}
/// Returns a [QueueClient] to ingest to.
/// This is a random selection from the list of ingestion queues
pub async fn ingestion_queue(&self) -> Result<QueueClient> {
let ingestion_queues = self.ingestion_queues().await?;
let selected_queue = select_random_resource(ingestion_queues)?;
Ok(selected_queue.clone())
}
/// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages
pub async fn authorization_context(&self) -> Result<KustoIdentityToken> {
self.authorization_context.get().await
self.authorization_context
.get()
.await
.map_err(ResourceManagerError::AuthorizationContextError)
}
}
/// Selects a random resource from the given list of resources
fn select_random_resource<T: Clone>(resources: Vec<T>) -> Result<T> {
let mut rng: StdRng = SeedableRng::from_entropy();
resources
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)
.cloned()
}
#[cfg(test)]
mod select_random_resource_tests {
use super::*;
#[test]
fn single_resource() {
const VALUE: i32 = 1;
let resources = vec![VALUE];
let selected_resource = select_random_resource(resources).unwrap();
assert!(selected_resource == VALUE)
}
#[test]
fn multiple_resources() {
let resources = vec![1, 2, 3, 4, 5];
let selected_resource = select_random_resource(resources.clone()).unwrap();
assert!(resources.contains(&selected_resource));
}
#[test]
fn no_resources() {
let resources: Vec<i32> = vec![];
let selected_resource = select_random_resource(resources);
assert!(selected_resource.is_err());
assert!(matches!(
selected_resource.unwrap_err(),
ResourceManagerError::NoResourcesFound
))
}
}

Просмотреть файл

@ -1,7 +1,7 @@
use std::sync::Arc;
use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;
use tokio::sync::RwLock;
use super::cache::{Cached, ThreadSafeCachedValue};
@ -10,6 +10,30 @@ use super::RESOURCE_REFRESH_PERIOD;
pub(crate) type KustoIdentityToken = String;
const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext";
#[derive(thiserror::Error, Debug)]
pub enum KustoIdentityTokenError {
#[error("Kusto expected 1 table in results, found {0}")]
ExpectedOneTable(usize),
#[error("Kusto expected 1 row in table, found {0}")]
ExpectedOneRow(usize),
#[error("Column {0} not found in table")]
ColumnNotFound(String),
#[error("Invalid JSON response from Kusto: {0:?}")]
InvalidJSONResponse(Value),
#[error("Token is empty")]
EmptyToken,
#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
}
type Result<T> = std::result::Result<T, KustoIdentityTokenError>;
/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token
#[derive(Debug, Clone)]
pub(crate) struct AuthorizationContext {
@ -38,38 +62,36 @@ impl AuthorizationContext {
let table = match &results.tables[..] {
[a] => a,
_ => {
return Err(anyhow::anyhow!(
"Kusto Expected 1 table in results, found {}",
results.tables.len()
return Err(KustoIdentityTokenError::ExpectedOneTable(
results.tables.len(),
))
}
};
// Check that a column in this table actually exists called `AuthorizationContext`
let index = get_column_index(table, "AuthorizationContext")?;
let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or(
KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into()),
)?;
// Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty
let token = match &table.rows[..] {
[row] => row.get(index).ok_or(anyhow::anyhow!(
"Kusto response did not contain a value in the first row at position {}",
index
))?,
_ => {
return Err(anyhow::anyhow!(
"Kusto expected 1 row in results, found {}",
table.rows.len()
))
}
[row] => row
.get(index)
.ok_or(KustoIdentityTokenError::ColumnNotFound(
AUTHORIZATION_CONTEXT.into(),
))?,
_ => return Err(KustoIdentityTokenError::ExpectedOneRow(table.rows.len())),
};
// Convert the JSON string into a Rust string
let token = token.as_str().ok_or(anyhow::anyhow!(
"Kusto response did not contain a string value: {:?}",
token
))?;
let token = token
.as_str()
.ok_or(KustoIdentityTokenError::InvalidJSONResponse(
token.to_owned(),
))?;
if token.chars().all(char::is_whitespace) {
return Err(anyhow::anyhow!("Kusto identity token is empty"));
return Err(KustoIdentityTokenError::EmptyToken);
}
Ok(token.to_string())
@ -77,15 +99,17 @@ impl AuthorizationContext {
/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// Attempt to get the token from the cache
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
// first, try to get the resources from the cache by obtaining a read lock
{
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
}
// Drop the read lock and get a write lock to refresh the token
drop(token_cache);
// obtain a write lock to refresh the kusto response
let mut token_cache = self.token_cache.write().await;
// Again attempt to return from cache, check is done in case another thread

Просмотреть файл

@ -5,16 +5,44 @@ use crate::client_options::QueuedIngestClientOptions;
use super::{
cache::{Cached, ThreadSafeCachedValue},
resource_uri::{ClientFromResourceUri, ResourceUri},
utils::get_column_index,
RESOURCE_REFRESH_PERIOD,
utils, RESOURCE_REFRESH_PERIOD,
};
use anyhow::Result;
use azure_core::ClientOptions;
use azure_kusto_data::{models::TableV1, prelude::KustoClient};
use azure_storage_blobs::prelude::ContainerClient;
use azure_storage_queues::QueueClient;
use serde_json::Value;
use tokio::sync::RwLock;
#[derive(Debug, thiserror::Error)]
pub enum IngestionResourceError {
#[error("{column_name} column is missing in the table")]
ColumnNotFoundError { column_name: String },
#[error("Response returned from Kusto could not be parsed as a string: {0}")]
ParseAsStringError(Value),
#[error("No {0} resources found in the table")]
NoResourcesFound(String),
#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
#[error(transparent)]
ResourceUriError(#[from] super::resource_uri::ResourceUriError),
#[error("Kusto expected a table containing ingestion resource results, found no tables")]
NoTablesFound,
}
type Result<T> = std::result::Result<T, IngestionResourceError>;
fn get_column_index(table: &TableV1, column_name: &str) -> Result<usize> {
utils::get_column_index(table, column_name).ok_or(IngestionResourceError::ColumnNotFoundError {
column_name: column_name.to_string(),
})
}
/// Helper to get a resource URI from a table, erroring if there are no resources of the given name
fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<ResourceUri>> {
let storage_root_index = get_column_index(table, "StorageRoot")?;
@ -25,18 +53,15 @@ fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<Re
.iter()
.filter(|r| r[resource_type_name_index] == resource_name)
.map(|r| {
ResourceUri::try_from(r[storage_root_index].as_str().ok_or(anyhow::anyhow!(
"Response returned from Kusto could not be parsed as a string {:?}",
r[storage_root_index]
))?)
let x = r[storage_root_index].as_str().ok_or(
IngestionResourceError::ParseAsStringError(r[storage_root_index].clone()),
)?;
ResourceUri::try_from(x).map_err(IngestionResourceError::ResourceUriError)
})
.collect();
if resource_uris.is_empty() {
return Err(anyhow::anyhow!(
"No {} resources found in the table",
resource_name
));
return Err(IngestionResourceError::NoResourcesFound(resource_name));
}
resource_uris.into_iter().collect()
@ -61,10 +86,12 @@ pub struct InnerIngestClientResources {
}
impl TryFrom<(&TableV1, &QueuedIngestClientOptions)> for InnerIngestClientResources {
type Error = anyhow::Error;
type Error = IngestionResourceError;
/// Attempts to create a new InnerIngestClientResources from the given [TableV1] and [QueuedIngestClientOptions]
fn try_from((table, client_options): (&TableV1, &QueuedIngestClientOptions)) -> Result<Self> {
fn try_from(
(table, client_options): (&TableV1, &QueuedIngestClientOptions),
) -> std::result::Result<Self, Self::Error> {
let secured_ready_for_aggregation_queues =
get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string())?;
let temp_storage = get_resource_by_name(table, "TempStorage".to_string())?;
@ -104,24 +131,27 @@ impl IngestClientResources {
.execute_command("NetDefaultDB", ".get ingestion resources", None)
.await?;
let new_resources = results.tables.first().ok_or(anyhow::anyhow!(
"Kusto expected a table containing ingestion resource results, found no tables",
))?;
let new_resources = results
.tables
.first()
.ok_or(IngestionResourceError::NoTablesFound)?;
InnerIngestClientResources::try_from((new_resources, &self.client_options))
}
/// Gets the latest resources either from cache, or fetching from Kusto and updating the cached resources
pub async fn get(&self) -> Result<InnerIngestClientResources> {
let resources = self.resources.read().await;
if !resources.is_expired() {
if let Some(inner_value) = resources.get() {
return Ok(inner_value.clone());
// first, try to get the resources from the cache by obtaining a read lock
{
let resources = self.resources.read().await;
if !resources.is_expired() {
if let Some(inner_value) = resources.get() {
return Ok(inner_value.clone());
}
}
}
// otherwise, drop the read lock and get a write lock to refresh the kusto response
drop(resources);
// obtain a write lock to refresh the kusto response
let mut resources = self.resources.write().await;
// check again in case another thread refreshed while we were waiting on the write lock

Просмотреть файл

@ -4,7 +4,23 @@ use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
use url::Url;
use anyhow::Result;
#[derive(Debug, thiserror::Error)]
pub enum ResourceUriError {
#[error("URI scheme must be 'https', was '{0}'")]
InvalidScheme(String),
#[error("Object name is missing in the URI")]
MissingObjectName,
#[error("SAS token is missing in the URI as a query parameter")]
MissingSasToken,
#[error(transparent)]
ParseError(#[from] url::ParseError),
#[error(transparent)]
AzureError(#[from] azure_core::Error),
}
/// Parsing logic of resource URIs as returned by the Kusto management endpoint
#[derive(Debug, Clone)]
@ -15,18 +31,14 @@ pub(crate) struct ResourceUri {
}
impl TryFrom<&str> for ResourceUri {
type Error = anyhow::Error;
type Error = ResourceUriError;
fn try_from(uri: &str) -> Result<Self> {
fn try_from(uri: &str) -> Result<Self, Self::Error> {
let parsed_uri = Url::parse(uri)?;
let scheme = match parsed_uri.scheme() {
"https" => "https".to_string(),
other_scheme => {
return Err(anyhow::anyhow!(
"URI scheme must be 'https', was '{other_scheme}'"
))
}
other_scheme => return Err(ResourceUriError::InvalidScheme(other_scheme.to_string())),
};
let service_uri = scheme
@ -36,17 +48,13 @@ impl TryFrom<&str> for ResourceUri {
.expect("Url::parse should always return a host for a URI");
let object_name = match parsed_uri.path().trim_start().trim_start_matches('/') {
"" => return Err(anyhow::anyhow!("Object name is missing in the URI")),
"" => return Err(ResourceUriError::MissingObjectName),
name => name.to_string(),
};
let sas_token = match parsed_uri.query() {
Some(query) => query.to_string(),
None => {
return Err(anyhow::anyhow!(
"SAS token is missing in the URI as a query parameter"
))
}
None => return Err(ResourceUriError::MissingSasToken),
};
let sas_token = StorageCredentials::sas_token(sas_token)?;
@ -138,6 +146,10 @@ mod tests {
println!("{:#?}", resource_uri);
assert!(resource_uri.is_err());
assert!(matches!(
resource_uri.unwrap_err(),
ResourceUriError::ParseError(_)
));
}
#[test]
@ -147,6 +159,10 @@ mod tests {
println!("{:#?}", resource_uri);
assert!(resource_uri.is_err());
assert!(matches!(
resource_uri.unwrap_err(),
ResourceUriError::MissingObjectName
));
}
#[test]
@ -156,6 +172,10 @@ mod tests {
println!("{:#?}", resource_uri);
assert!(resource_uri.is_err());
assert!(matches!(
resource_uri.unwrap_err(),
ResourceUriError::MissingSasToken
));
}
#[test]

Просмотреть файл

@ -1,15 +1,10 @@
use anyhow::Result;
use azure_kusto_data::models::TableV1;
/// Helper to get a column index from a table
// TODO: this could be moved upstream into Kusto Data - would likely result in a change to the API of this function to return an Option<usize>
pub fn get_column_index(table: &TableV1, column_name: &str) -> Result<usize> {
// TODO: this could be moved upstream into Kusto Data
pub fn get_column_index(table: &TableV1, column_name: &str) -> Option<usize> {
table
.columns
.iter()
.position(|c| c.column_name == column_name)
.ok_or(anyhow::anyhow!(
"{} column is missing in the table",
column_name
))
}