Simplified ingest client markups (#12)

* resource_uri changes

* markups in client_options

* resource_manager markups

* remove dependency on chrono - use time

* cache changes

* add missing dev dependency feature

* add some basic tests for caching implementation
This commit is contained in:
Krishan 2024-01-26 18:34:07 +00:00 коммит произвёл GitHub
Родитель ab17f7e279
Коммит b8ee0197df
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
9 изменённых файлов: 256 добавлений и 151 удалений

Просмотреть файл

@ -14,13 +14,13 @@ azure_storage_blobs = "0.19"
azure_storage_queues = "0.19"
async-lock = "3"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
time = { version = "0.3", features = ["serde-human-readable", "macros"] }
url = "2"
uuid = { version = "1", features = ["v4", "serde"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Просмотреть файл

@ -3,16 +3,16 @@ use azure_core::ClientOptions;
/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient)
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptions {
pub queue_service: ClientOptions,
pub blob_service: ClientOptions,
pub queue_service_options: ClientOptions,
pub blob_service_options: ClientOptions,
}
impl From<ClientOptions> for QueuedIngestClientOptions {
/// Creates a `QueuedIngestClientOptions` struct where the same [ClientOptions] are used for all services
fn from(client_options: ClientOptions) -> Self {
Self {
queue_service: client_options.clone(),
blob_service: client_options,
queue_service_options: client_options.clone(),
blob_service_options: client_options,
}
}
}
@ -20,32 +20,32 @@ impl From<ClientOptions> for QueuedIngestClientOptions {
/// Builder for [QueuedIngestClientOptions], call `build()` to create the [QueuedIngestClientOptions]
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptionsBuilder {
queue_service: ClientOptions,
blob_service: ClientOptions,
queue_service_options: ClientOptions,
blob_service_options: ClientOptions,
}
impl QueuedIngestClientOptionsBuilder {
pub fn new() -> Self {
Self {
queue_service: ClientOptions::default(),
blob_service: ClientOptions::default(),
queue_service_options: ClientOptions::default(),
blob_service_options: ClientOptions::default(),
}
}
pub fn with_queue_service(mut self, queue_service: ClientOptions) -> Self {
self.queue_service = queue_service;
pub fn with_queue_service_options(mut self, queue_service_options: ClientOptions) -> Self {
self.queue_service_options = queue_service_options;
self
}
pub fn with_blob_service(mut self, blob_service: ClientOptions) -> Self {
self.blob_service = blob_service;
pub fn with_blob_service_options(mut self, blob_service_options: ClientOptions) -> Self {
self.blob_service_options = blob_service_options;
self
}
pub fn build(self) -> QueuedIngestClientOptions {
QueuedIngestClientOptions {
queue_service: self.queue_service,
blob_service: self.blob_service,
queue_service_options: self.queue_service_options,
blob_service_options: self.blob_service_options,
}
}
}

Просмотреть файл

@ -1,4 +1,3 @@
use chrono::{DateTime, Utc};
use serde::Serialize;
use uuid::Uuid;
@ -8,6 +7,18 @@ use crate::{
resource_manager::authorization_context::KustoIdentityToken,
};
use time::{
format_description::well_known::{iso8601, Iso8601},
OffsetDateTime,
};
/// ISO 8601 configuration used when serializing timestamps for ingestion messages.
///
/// Starts from [DEFAULT](iso8601::Config::DEFAULT) and explicitly requests a 4 digit year instead of a 6 digit year.
/// NOTE(review): the 6 digit year this works around appears to come from the serde helper bundled with the
/// time crate (`time::serde::iso8601`) rather than from `Config::DEFAULT` itself - confirm against the time crate docs.
const CONFIG: iso8601::EncodedConfig = iso8601::Config::DEFAULT
.set_year_is_six_digits(false)
.encode();
/// ISO 8601 format description parameterised by [CONFIG]
const FORMAT: Iso8601<CONFIG> = Iso8601::<CONFIG>;
// Generates the `kusto_ingest_iso8601_format` serde module for [OffsetDateTime],
// intended to be used via `#[serde(with = "kusto_ingest_iso8601_format")]`
time::serde::format_description!(kusto_ingest_iso8601_format, OffsetDateTime, FORMAT);
/// Message to be serialized as JSON and sent to the ingestion queue
///
/// Basing the ingestion message on
@ -37,7 +48,9 @@ pub(crate) struct QueuedIngestionMessage {
/// If set to `true`, any server side aggregation will be skipped - thus overriding the batching policy. Default is `false`.
#[serde(skip_serializing_if = "Option::is_none")]
flush_immediately: Option<bool>,
source_message_creation_time: DateTime<Utc>,
#[serde(with = "kusto_ingest_iso8601_format")]
source_message_creation_time: OffsetDateTime,
// source_message_creation_time: DateTime<Utc>,
// Extra properties added to the ingestion command
additional_properties: AdditionalProperties,
}
@ -61,7 +74,7 @@ impl QueuedIngestionMessage {
table_name: ingestion_properties.table_name.clone(),
retain_blob_on_success: ingestion_properties.retain_blob_on_success,
flush_immediately: ingestion_properties.flush_immediately,
source_message_creation_time: Utc::now(),
source_message_creation_time: OffsetDateTime::now_utc(),
additional_properties,
}
}
@ -77,3 +90,30 @@ struct AdditionalProperties {
#[serde(rename = "format")]
data_format: DataFormat,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `kusto_ingest_iso8601_format` serializes a timestamp with a 4 digit year
    /// and keeps the full nanosecond precision
    #[test]
    fn time_custom_iso8601_serialization() {
        #[derive(Serialize, Debug)]
        struct TestTimeSerialize {
            #[serde(with = "kusto_ingest_iso8601_format")]
            customised_time_format: time::OffsetDateTime,
        }

        // Fixed point in time, expressed as nanoseconds since the Unix epoch
        let timestamp =
            time::OffsetDateTime::from_unix_timestamp_nanos(1_234_567_890_123_456_789).unwrap();
        let payload = TestTimeSerialize {
            customised_time_format: timestamp,
        };

        let json = serde_json::to_string(&payload).unwrap();
        let expected = "{\"customised_time_format\":\"2009-02-13T23:31:30.123456789Z\"}";

        assert_eq!(json, expected);
    }
}

Просмотреть файл

@ -43,7 +43,7 @@ impl QueuedIngestClient {
blob_descriptor: BlobDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<()> {
let queue_client = self.resource_manager.ingestion_queue().await?;
let queue_client = self.resource_manager.random_ingestion_queue().await?;
let auth_context = self.resource_manager.authorization_context().await?;

Просмотреть файл

@ -17,7 +17,7 @@ use self::{
ingest_client_resources::IngestClientResources,
};
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use rand::{seq::SliceRandom, thread_rng};
pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);
@ -60,9 +60,14 @@ impl ResourceManager {
/// Returns a [QueueClient] to ingest to.
/// This is a random selection from the list of ingestion queues
pub async fn ingestion_queue(&self) -> Result<QueueClient> {
pub async fn random_ingestion_queue(&self) -> Result<QueueClient> {
let ingestion_queues = self.ingestion_queues().await?;
let selected_queue = select_random_resource(ingestion_queues)?;
let mut rng = thread_rng();
let selected_queue = ingestion_queues
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)?;
Ok(selected_queue.clone())
}
@ -74,42 +79,3 @@ impl ResourceManager {
.map_err(ResourceManagerError::AuthorizationContextError)
}
}
/// Selects a random resource from the given list of resources
///
/// Returns a clone of the chosen element, or [ResourceManagerError::NoResourcesFound]
/// when `choose` yields nothing (presumably only for an empty list - confirm against the rand docs).
fn select_random_resource<T: Clone>(resources: Vec<T>) -> Result<T> {
// A new entropy-seeded generator is created on every call
let mut rng: StdRng = SeedableRng::from_entropy();
resources
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)
.cloned()
}
// Unit tests for `select_random_resource`
#[cfg(test)]
mod select_random_resource_tests {
use super::*;
// With a single candidate, that candidate must be the one selected
#[test]
fn single_resource() {
const VALUE: i32 = 1;
let resources = vec![VALUE];
let selected_resource = select_random_resource(resources).unwrap();
assert!(selected_resource == VALUE)
}
// With several candidates, the selection must be one of them
#[test]
fn multiple_resources() {
let resources = vec![1, 2, 3, 4, 5];
let selected_resource = select_random_resource(resources.clone()).unwrap();
assert!(resources.contains(&selected_resource));
}
// An empty list must produce the `NoResourcesFound` error rather than a value
#[test]
fn no_resources() {
let resources: Vec<i32> = vec![];
let selected_resource = select_random_resource(resources);
assert!(selected_resource.is_err());
assert!(matches!(
selected_resource.unwrap_err(),
ResourceManagerError::NoResourcesFound
))
}
}

Просмотреть файл

@ -1,10 +1,7 @@
use std::sync::Arc;
use async_lock::RwLock;
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;
use super::cache::{Cached, ThreadSafeCachedValue};
use super::cache::ThreadSafeCachedValue;
use super::utils::get_column_index;
use super::RESOURCE_REFRESH_PERIOD;
@ -40,14 +37,14 @@ pub(crate) struct AuthorizationContext {
/// A client against a Kusto ingestion cluster
client: KustoClient,
/// Cache of the Kusto identity token
token_cache: ThreadSafeCachedValue<Option<KustoIdentityToken>>,
token_cache: ThreadSafeCachedValue<KustoIdentityToken>,
}
impl AuthorizationContext {
pub fn new(client: KustoClient) -> Self {
Self {
client,
token_cache: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
token_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD),
}
}
@ -99,31 +96,8 @@ impl AuthorizationContext {
/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// first, try to get the resources from the cache by obtaining a read lock
{
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
}
// obtain a write lock to refresh the kusto response
let mut token_cache = self.token_cache.write().await;
// Again attempt to return from cache, check is done in case another thread
// refreshed the token while we were waiting on the write lock
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
// Fetch new token from Kusto, update the cache, and return the token
let token = self.query_kusto_identity_token().await?;
token_cache.update(Some(token.clone()));
Ok(token)
self.token_cache
.get(self.query_kusto_identity_token())
.await
}
}

Просмотреть файл

@ -1,4 +1,6 @@
use std::{
error::Error,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@ -37,10 +39,57 @@ impl<T> Cached<T> {
}
}
pub type ThreadSafeCachedValue<T> = Arc<RwLock<Cached<T>>>;
/// A cached value of type `T` held behind an `Arc<RwLock<..>>`.
///
/// The cache starts out empty; the first successful call to [get](ThreadSafeCachedValue::get) populates it.
#[derive(Debug, Clone)]
pub struct ThreadSafeCachedValue<T>
where
    T: Clone,
{
    cache: Arc<RwLock<Cached<Option<T>>>>,
}

impl<T: Clone> ThreadSafeCachedValue<T> {
    /// Creates an empty cache, passing `refresh_period` on to the underlying [Cached]
    pub fn new(refresh_period: Duration) -> Self {
        let empty = Cached::new(None, refresh_period);
        Self {
            cache: Arc::new(RwLock::new(empty)),
        }
    }

    /// Returns a clone of the stored value when the cache is not expired and holds a value
    fn unexpired_value(cached: &Cached<Option<T>>) -> Option<T> {
        if cached.is_expired() {
            return None;
        }
        if let Some(value) = cached.get() {
            return Some(value.clone());
        }
        None
    }

    /// Returns the cached value if it is still valid, otherwise awaits `callback` to obtain a new one
    ///
    /// On success the freshly fetched value is stored in the cache and returned.
    /// If `callback` resolves to an error, that error is returned and the cache is left untouched.
    pub async fn get<F, E: Error>(&self, callback: F) -> Result<T, E>
    where
        F: Future<Output = Result<T, E>>,
    {
        // Look at the cache under a read lock first; the read guard is a temporary
        // and is released at the end of this statement, before the write lock is requested
        let cached_value = Self::unexpired_value(&*self.cache.read().await);
        if let Some(value) = cached_value {
            return Ok(value);
        }

        // Take the write lock so that the value can be refreshed
        let mut cache = self.cache.write().await;

        // Look again: the value may have been refreshed by someone else
        // while we were waiting for the write lock
        if let Some(value) = Self::unexpired_value(&cache) {
            return Ok(value);
        }

        // Still nothing usable: run the callback, store its result and hand it back
        let fetched_value = callback.await?;
        cache.update(Some(fetched_value.clone()));
        Ok(fetched_value)
    }
}
#[cfg(test)]
mod tests {
mod cached_tests {
use super::*;
use std::time::Duration;
@ -78,3 +127,57 @@ mod tests {
assert_eq!(cached_string.get(), new_value);
}
}
#[cfg(test)]
mod thread_safe_cached_value_tests {
    use super::*;
    use std::{fmt::Error, sync::Mutex};

    /// Test double for a token source that counts how many times a new token was fetched
    #[derive(Debug)]
    struct MockToken {
        get_token_call_count: Mutex<usize>,
    }

    impl MockToken {
        fn new() -> Self {
            Self {
                get_token_call_count: Mutex::new(0),
            }
        }

        /// Increments the call counter and returns its new value as the "token"
        ///
        /// Because the returned token is the counter itself, a test can tell from the token
        /// how many times the fetch has actually been executed.
        async fn get_new_token(&self) -> Result<usize, Error> {
            let mut call_count = self.get_token_call_count.lock().unwrap();
            *call_count += 1;
            // `usize` is `Copy`: read it through the guard instead of calling `clone()`
            Ok(*call_count)
        }
    }

    /// A second `get` within the refresh period must not run the fetch again
    #[tokio::test]
    async fn returns_same_value_if_unexpired() -> Result<(), Error> {
        let cache = ThreadSafeCachedValue::new(Duration::from_secs(300));
        let mock_token = MockToken::new();

        let token1 = cache.get(mock_token.get_new_token()).await?;
        let token2 = cache.get(mock_token.get_new_token()).await?;

        assert_eq!(token1, 1);
        assert_eq!(token2, 1);
        Ok(())
    }

    /// Once the refresh period has elapsed, `get` must run the fetch again
    #[tokio::test]
    async fn returns_new_value_if_expired() -> Result<(), Error> {
        let cache = ThreadSafeCachedValue::new(Duration::from_millis(1));
        let mock_token = MockToken::new();

        let token1 = cache.get(mock_token.get_new_token()).await?;
        // Sleep to ensure the token expires
        tokio::time::sleep(Duration::from_secs(1)).await;
        let token2 = cache.get(mock_token.get_new_token()).await?;

        assert_eq!(token1, 1);
        assert_eq!(token2, 2);
        Ok(())
    }
}

Просмотреть файл

@ -1,13 +1,11 @@
use std::sync::Arc;
use crate::client_options::QueuedIngestClientOptions;
use super::{
cache::{Cached, ThreadSafeCachedValue},
cache::ThreadSafeCachedValue,
resource_uri::{ClientFromResourceUri, ResourceUri},
utils, RESOURCE_REFRESH_PERIOD,
};
use async_lock::RwLock;
use azure_core::ClientOptions;
use azure_kusto_data::{models::TableV1, prelude::KustoClient};
use azure_storage_blobs::prelude::ContainerClient;
@ -99,19 +97,22 @@ impl TryFrom<(&TableV1, &QueuedIngestClientOptions)> for InnerIngestClientResour
Ok(Self {
ingestion_queues: create_clients_vec(
&secured_ready_for_aggregation_queues,
&client_options.queue_service,
&client_options.queue_service_options,
),
temp_storage_containers: create_clients_vec(
&temp_storage,
&client_options.blob_service,
&client_options.blob_service_options,
),
})
}
}
pub struct IngestClientResources {
/// A client against a Kusto ingestion cluster
client: KustoClient,
resources: ThreadSafeCachedValue<Option<InnerIngestClientResources>>,
/// Cache of the ingest client resources
resources_cache: ThreadSafeCachedValue<InnerIngestClientResources>,
/// Options to customise the storage clients
client_options: QueuedIngestClientOptions,
}
@ -119,7 +120,7 @@ impl IngestClientResources {
pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self {
Self {
client,
resources: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
resources_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD),
client_options,
}
}
@ -141,29 +142,8 @@ impl IngestClientResources {
/// Gets the latest resources either from cache, or fetching from Kusto and updating the cached resources
pub async fn get(&self) -> Result<InnerIngestClientResources> {
// first, try to get the resources from the cache by obtaining a read lock
{
let resources = self.resources.read().await;
if !resources.is_expired() {
if let Some(inner_value) = resources.get() {
return Ok(inner_value.clone());
}
}
}
// obtain a write lock to refresh the kusto response
let mut resources = self.resources.write().await;
// check again in case another thread refreshed while we were waiting on the write lock
if !resources.is_expired() {
if let Some(inner_value) = resources.get() {
return Ok(inner_value.clone());
}
}
let new_resources = self.query_ingestion_resources().await?;
resources.update(Some(new_resources.clone()));
Ok(new_resources)
self.resources_cache
.get(self.query_ingestion_resources())
.await
}
}

Просмотреть файл

@ -9,6 +9,9 @@ pub enum ResourceUriError {
#[error("URI scheme must be 'https', was '{0}'")]
InvalidScheme(String),
#[error("URI host must be a domain")]
InvalidHost,
#[error("Object name is missing in the URI")]
MissingObjectName,
@ -40,39 +43,49 @@ impl TryFrom<&str> for ResourceUri {
fn try_from(uri: &str) -> Result<Self, Self::Error> {
let parsed_uri = Url::parse(uri)?;
let scheme = match parsed_uri.scheme() {
"https" => "https".to_string(),
match parsed_uri.scheme() {
"https" => {}
other_scheme => return Err(ResourceUriError::InvalidScheme(other_scheme.to_string())),
};
let host_string = parsed_uri
.host_str()
.expect("Url::parse should always return a host for a URI");
let service_uri = scheme + "://" + host_string;
let host_string_components = host_string.split_terminator('.').collect::<Vec<_>>();
if host_string_components.len() < 2 {
return Err(ResourceUriError::MissingAccountName);
}
let account_name = host_string_components[0].to_string();
let object_name = match parsed_uri.path().trim_start().trim_start_matches('/') {
"" => return Err(ResourceUriError::MissingObjectName),
name => name.to_string(),
let host_string = match parsed_uri.host() {
Some(url::Host::Domain(host_string)) => host_string,
_ => return Err(ResourceUriError::InvalidHost),
};
let sas_token = match parsed_uri.query() {
Some(query) => query.to_string(),
None => return Err(ResourceUriError::MissingSasToken),
let service_uri = String::from("https://") + host_string;
// WIBNI: stricter parsing to verify that this conforms to a storage resource URI,
// perhaps then ResourceUri could take a type like ResourceUri<Queue> or ResourceUri<Container>
let (account_name, _service_endpoint) = host_string
.split_once('.')
.ok_or(ResourceUriError::MissingAccountName)?;
let object_name = match parsed_uri.path_segments() {
Some(mut path_segments) => {
let object_name = match path_segments.next() {
Some(object_name) if !object_name.is_empty() => object_name,
_ => return Err(ResourceUriError::MissingObjectName),
};
// Ensure there is only one path segment (i.e. the object name)
if path_segments.next().is_some() {
return Err(ResourceUriError::MissingObjectName);
};
object_name
}
None => return Err(ResourceUriError::MissingObjectName),
};
let sas_token = parsed_uri
.query()
.ok_or(ResourceUriError::MissingSasToken)?;
let sas_token = StorageCredentials::sas_token(sas_token)?;
Ok(Self {
service_uri,
object_name,
account_name,
object_name: object_name.to_string(),
account_name: account_name.to_string(),
sas_token,
})
}
@ -151,6 +164,10 @@ mod tests {
let resource_uri = ResourceUri::try_from(uri);
assert!(resource_uri.is_err());
assert!(matches!(
resource_uri.unwrap_err(),
ResourceUriError::InvalidScheme(_)
));
}
#[test]
@ -166,6 +183,31 @@ mod tests {
));
}
/// A URI whose host is an IPv4 address must be rejected with `InvalidHost`
#[test]
fn invalid_host_ipv4() {
    let parsed = ResourceUri::try_from("https://127.0.0.1/containerobjectname?sas=token");

    assert!(parsed.is_err());
    let is_invalid_host = matches!(parsed.unwrap_err(), ResourceUriError::InvalidHost);
    assert!(is_invalid_host);
}
/// A URI whose host is an IPv6 address must be rejected with `InvalidHost`
#[test]
fn invalid_host_ipv6() {
    let uri = "https://[3FFE:FFFF:0::CD30]/containerobjectname?sas=token";
    let resource_uri = ResourceUri::try_from(uri);

    // No debug printing here: a failing assertion below already reports what went wrong
    assert!(resource_uri.is_err());
    assert!(matches!(
        resource_uri.unwrap_err(),
        ResourceUriError::InvalidHost
    ));
}
#[test]
fn missing_object_name() {
let uri = "https://storageaccountname.blob.core.windows.com/?sas=token";