Merge pull request #29 from krishanjmistry/simplified-ingest-client

This commit is contained in:
AsafMah 2024-04-30 09:08:50 +03:00 коммит произвёл GitHub
Родитель 2dc2482c48 4774e29941
Коммит e1dca0af14
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
17 изменённых файлов: 1343 добавлений и 1 удалений

Просмотреть файл

@ -1,3 +1,3 @@
[workspace] [workspace]
members = ["azure-kusto-data"] members = ["azure-kusto-data", "azure-kusto-ingest"]
resolver = "2" resolver = "2"

Просмотреть файл

@ -0,0 +1,26 @@
[package]
name = "azure-kusto-ingest"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
azure-kusto-data = { path = "../azure-kusto-data", default-features = false }
# Azure SDK for Rust crates versions must be kept in sync
azure_core = "0.19"
azure_storage = "0.19"
azure_storage_blobs = "0.19"
azure_storage_queues = "0.19"
async-lock = "3"
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
time = { version = "0.3", features = ["serde-human-readable", "macros"] }
url = "2"
uuid = { version = "1", features = ["v4", "serde"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Просмотреть файл

@ -0,0 +1,61 @@
use std::env;
use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
use azure_kusto_ingest::ingestion_properties::IngestionProperties;
use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
/// Queues a single blob from Azure Blob Storage for ingestion into Kusto, using managed identities throughout.
///
/// The Kusto client authenticates with a user assigned managed identity, while the blob is marked to be
/// read with the system assigned managed identity of the Kusto cluster.
///
/// Environment variables read (all mandatory unless stated otherwise):
/// - `KUSTO_INGEST_URI`
/// - `KUSTO_USER_MI_OBJECT_ID`
/// - `KUSTO_DATABASE_NAME`
/// - `KUSTO_TABLE_NAME`
/// - `BLOB_URI`
/// - `BLOB_SIZE` (optional, must parse as a `u64` when present)
///
/// Prerequisites for managed identities to work:
/// - Permissions as the ingestor to initiate ingestion
///   https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-permissions
/// - Permissions for Kusto to access storage
///   https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
    let identity_object_id =
        env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");

    // The Kusto client authenticates through the user assigned identity
    let connection_string =
        ConnectionString::with_managed_identity_auth(ingest_uri, Some(identity_object_id));
    let client = KustoClient::new(connection_string, KustoClientOptions::default())?;
    let ingest_client = QueuedIngestClient::new(client);

    let properties = IngestionProperties {
        database_name: env::var("KUSTO_DATABASE_NAME").expect("Must define KUSTO_DATABASE_NAME"),
        table_name: env::var("KUSTO_TABLE_NAME").expect("Must define KUSTO_TABLE_NAME"),
        // Keep the blob around after a successful ingestion
        retain_blob_on_success: Some(true),
        // The blob holds Parquet data
        data_format: DataFormat::Parquet,
        // Leave flush_immediately to the server side default
        flush_immediately: None,
    };

    let source_uri = env::var("BLOB_URI").expect("Must define BLOB_URI");
    // Supplying the size (when known) spares Kusto from accessing the blob just to measure it
    let source_size: Option<u64> = env::var("BLOB_SIZE")
        .ok()
        .map(|raw| raw.parse().expect("BLOB_SIZE must be a valid u64"));

    // The blob itself is read with the system assigned managed identity of the Kusto cluster
    let descriptor = BlobDescriptor::new(source_uri, source_size, None)
        .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

    ingest_client
        .ingest_from_blob(descriptor, properties)
        .await?;
    Ok(())
}

Просмотреть файл

@ -0,0 +1,51 @@
use azure_core::ClientOptions;
/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient)
///
/// The derived `Default` uses `ClientOptions::default()` for both services.
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptions {
/// Options applied to the queue clients created for the ingestion queues
pub queue_service_options: ClientOptions,
/// Options applied to the blob container clients created for temporary storage
pub blob_service_options: ClientOptions,
}
impl From<ClientOptions> for QueuedIngestClientOptions {
    /// Builds a `QueuedIngestClientOptions` in which every service shares the same [ClientOptions]
    fn from(client_options: ClientOptions) -> Self {
        // One copy per service; the original value is moved into the queue options
        let blob_service_options = client_options.clone();
        Self {
            queue_service_options: client_options,
            blob_service_options,
        }
    }
}
/// Builder for [QueuedIngestClientOptions], call `build()` to create the [QueuedIngestClientOptions]
///
/// Both fields start out as `ClientOptions::default()` and can be overridden individually.
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptionsBuilder {
// Pending options for the queue service clients
queue_service_options: ClientOptions,
// Pending options for the blob service clients
blob_service_options: ClientOptions,
}
impl QueuedIngestClientOptionsBuilder {
    /// Starts a builder where both services use `ClientOptions::default()`
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces the options used for the queue service clients
    pub fn with_queue_service_options(self, queue_service_options: ClientOptions) -> Self {
        Self {
            queue_service_options,
            ..self
        }
    }

    /// Replaces the options used for the blob service clients
    pub fn with_blob_service_options(self, blob_service_options: ClientOptions) -> Self {
        Self {
            blob_service_options,
            ..self
        }
    }

    /// Consumes the builder and produces the final [QueuedIngestClientOptions]
    pub fn build(self) -> QueuedIngestClientOptions {
        let Self {
            queue_service_options,
            blob_service_options,
        } = self;
        QueuedIngestClientOptions {
            queue_service_options,
            blob_service_options,
        }
    }
}

Просмотреть файл

@ -0,0 +1,37 @@
use serde::Serialize;
/// All data formats supported by Kusto.
/// Default is [DataFormat::CSV]
///
/// Variants serialize as their lowercased name (for example `Parquet` becomes `"parquet"`)
/// because of the `rename_all = "lowercase"` serde attribute.
#[derive(Serialize, Clone, Debug, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum DataFormat {
ApacheAvro,
Avro,
// Selected by `DataFormat::default()`
#[default]
CSV,
JSON,
MultiJSON,
ORC,
Parquet,
PSV,
RAW,
SCSV,
SOHsv,
SingleJSON,
SStream,
TSV,
TSVe,
TXT,
W3CLOGFILE,
}
// Unit tests for `DataFormat`
#[cfg(test)]
mod tests {
use super::*;
// The derived `Default` must pick the variant tagged `#[default]`, i.e. CSV
#[test]
fn data_format_default() {
assert_eq!(DataFormat::default(), DataFormat::CSV);
}
}

Просмотреть файл

@ -0,0 +1,148 @@
use uuid::Uuid;
/// Encapsulates the information related to a blob that is required to ingest from a blob
#[derive(Debug, Clone)]
pub struct BlobDescriptor {
/// Uri of the blob as supplied by the caller, without any authentication suffix
uri: String,
/// Size of the blob if the caller provided it
pub(crate) size: Option<u64>,
/// Identifier of this ingestion source; used as the id of the ingestion message
pub(crate) source_id: Uuid,
/// Authentication information for the blob; when [None], the uri is passed through as is
blob_auth: Option<BlobAuth>,
}
impl BlobDescriptor {
    /// Create a new BlobDescriptor.
    ///
    /// Parameters:
    /// - `uri`: the uri of the blob to ingest from, note you can use the optional helper method `with_blob_auth` to add authentication information to the uri
    /// - `size`: although the size is not required, providing it is recommended as it allows Kusto to better plan the ingestion process
    /// - `source_id`: optional, useful if tracking ingestion status, if not provided, a random uuid will be generated
    pub fn new(uri: impl Into<String>, size: Option<u64>, source_id: Option<Uuid>) -> Self {
        Self {
            uri: uri.into(),
            size,
            // Only generate a random id when the caller did not supply one
            source_id: source_id.unwrap_or_else(Uuid::new_v4),
            blob_auth: None,
        }
    }

    /// Mutator to modify the authentication information of the BlobDescriptor
    pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self {
        self.blob_auth = Some(blob_auth);
        self
    }

    /// Returns the uri with the authentication information concatenated, ready to be serialized into the ingestion message
    ///
    /// - SAS token: `<uri>?<sas_token>`; a single leading `?` on the token is dropped so that
    ///   tokens copied together with their query separator do not produce `??`
    /// - user assigned managed identity: `<uri>;managed_identity=<object_id>`
    /// - system assigned managed identity: `<uri>;managed_identity=system`
    /// - no authentication: the uri unchanged
    pub(crate) fn uri(&self) -> String {
        match &self.blob_auth {
            Some(BlobAuth::SASToken(sas_token)) => {
                let token = sas_token
                    .strip_prefix('?')
                    .unwrap_or(sas_token.as_str());
                format!("{}?{}", self.uri, token)
            }
            Some(BlobAuth::UserAssignedManagedIdentity(object_id)) => {
                format!("{};managed_identity={}", self.uri, object_id)
            }
            Some(BlobAuth::SystemAssignedManagedIdentity) => {
                format!("{};managed_identity=system", self.uri)
            }
            None => self.uri.clone(),
        }
    }
}
/// Helper for adding authentication information to a blob path in the format expected by Kusto
///
/// `Debug` is implemented by hand for this type so that the SAS token is never printed.
#[derive(Clone)]
pub enum BlobAuth {
/// adds `?<sas_token>` to the blob path
SASToken(String),
/// adds `;managed_identity=<identity>` to the blob path, where `<identity>` is the wrapped string
UserAssignedManagedIdentity(String),
/// adds `;managed_identity=system` to the blob path
SystemAssignedManagedIdentity,
}
/// Hand-written `Debug` so that the SAS token never ends up in logs or panic messages
impl std::fmt::Debug for BlobAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Field-less output: only the variant name is written, the token itself is skipped
            BlobAuth::SASToken(_) => f.write_str("SASToken"),
            BlobAuth::SystemAssignedManagedIdentity => {
                f.write_str("SystemAssignedManagedIdentity")
            }
            // The object id is not a secret, so it is shown as a named field
            BlobAuth::UserAssignedManagedIdentity(object_id) => {
                let mut builder = f.debug_struct("UserAssignedManagedIdentity");
                builder.field("object_id", object_id);
                builder.finish()
            }
        }
    }
}
// Unit tests for `BlobDescriptor`: uri rendering per `BlobAuth` variant and field pass-through
#[cfg(test)]
mod tests {
use super::*;
// Without any auth the uri is returned untouched
#[test]
fn blob_descriptor_with_no_auth_modification() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None);
assert_eq!(blob_descriptor.uri(), uri);
}
// A SAS token is appended after a `?`
#[test]
fn blob_descriptor_with_sas_token() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let sas_token = "my_sas_token";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SASToken(sas_token.to_string()));
assert_eq!(blob_descriptor.uri(), format!("{uri}?{sas_token}"));
}
// A user assigned identity is appended as `;managed_identity=<object_id>`
#[test]
fn blob_descriptor_with_user_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let object_id = "my_object_id";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::UserAssignedManagedIdentity(object_id.to_string()));
assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity={object_id}")
);
}
// The system assigned identity is appended as `;managed_identity=system`
#[test]
fn blob_descriptor_with_system_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity=system")
);
}
// A caller-provided size is stored as is
#[test]
fn blob_descriptor_with_size() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let size = 123;
let blob_descriptor = BlobDescriptor::new(uri, Some(size), None);
assert_eq!(blob_descriptor.size, Some(size));
}
// A caller-provided source id is stored instead of generating a random one
#[test]
fn blob_descriptor_with_source_id() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let source_id = Uuid::new_v4();
let blob_descriptor = BlobDescriptor::new(uri, None, Some(source_id));
assert_eq!(blob_descriptor.source_id, source_id);
}
}

Просмотреть файл

@ -0,0 +1,20 @@
//! Defines [Error] for representing failures in various operations.
/// Error type for kusto ingestion operations.
///
/// Every variant carries a `#[from]` source, so the wrapped error types convert into [Error] with `?`.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error raised when failing to obtain ingestion resources.
#[error("Error obtaining ingestion resources: {0}")]
ResourceManagerError(#[from] super::resource_manager::ResourceManagerError),
/// Error relating to (de-)serialization of JSON data
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),
/// Error occurring within core azure crates
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
}
/// Result type for kusto ingest operations, with the error fixed to [Error].
pub type Result<T> = std::result::Result<T, Error>;

Просмотреть файл

@ -0,0 +1,119 @@
use serde::Serialize;
use uuid::Uuid;
use crate::{
data_format::DataFormat, descriptors::BlobDescriptor,
ingestion_properties::IngestionProperties,
resource_manager::authorization_context::KustoIdentityToken,
};
use time::{
format_description::well_known::{iso8601, Iso8601},
OffsetDateTime,
};
/// The [DEFAULT](iso8601::Config::DEFAULT) ISO8601 format that the time crate serializes to uses a 6 digit year.
/// This configuration is the default one with the 6 digit year switched off, i.e. a 4 digit year.
const CONFIG: iso8601::EncodedConfig = iso8601::Config::DEFAULT
.set_year_is_six_digits(false)
.encode();
/// ISO8601 format description built from [CONFIG]
const FORMAT: Iso8601<CONFIG> = Iso8601::<CONFIG>;
// Generates the serde helper module `kusto_ingest_iso8601_format` for `OffsetDateTime` values using FORMAT;
// it is referenced through `#[serde(with = "kusto_ingest_iso8601_format")]`
time::serde::format_description!(kusto_ingest_iso8601_format, OffsetDateTime, FORMAT);
/// Message to be serialized as JSON and sent to the ingestion queue
///
/// Field names are serialized in PascalCase (e.g. `blob_path` becomes `BlobPath`).
///
/// Basing the ingestion message on
/// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-rest#ingestion-message-internal-structure
#[derive(Serialize, Debug)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct QueuedIngestionMessage {
/// Message identifier for this upload
id: Uuid,
/// Path (URI) to the blob.
/// This should include any SAS token required to access the blob, or hints to use managed identity auth.
/// Extra permissions are required if the `RetainBlobOnSuccess` option is not true so that the ingestion service can delete the blob once it has completed ingesting the data.
blob_path: String,
/// Name of the Kusto database the data will ingest into
database_name: String,
/// Name of the Kusto table the data will ingest into
table_name: String,
/// Size of the uncompressed data in bytes.
/// Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs.
/// Although this property is optional, it is recommended to provide the size as otherwise the service will access the blob just to retrieve the size.
/// Omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
raw_data_size: Option<u64>,
/// If set to `true`, the blob won't be deleted once ingestion is successfully completed.
/// Default is `false` when this property is not specified. Note that this has implications on permissions required against the blob.
/// Omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
retain_blob_on_success: Option<bool>,
/// If set to `true`, any server side aggregation will be skipped - thus overriding the batching policy. Default is `false`.
/// Omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
flush_immediately: Option<bool>,
/// Creation time of this message, serialized with the 4 digit year ISO8601 format
#[serde(with = "kusto_ingest_iso8601_format")]
source_message_creation_time: OffsetDateTime,
/// Extra properties added to the ingestion command
additional_properties: AdditionalProperties,
}
impl QueuedIngestionMessage {
pub(crate) fn new(
blob_descriptor: &BlobDescriptor,
ingestion_properties: &IngestionProperties,
authorization_context: KustoIdentityToken,
) -> Self {
let additional_properties = AdditionalProperties {
authorization_context,
data_format: ingestion_properties.data_format.clone(),
};
Self {
id: blob_descriptor.source_id,
blob_path: blob_descriptor.uri(),
raw_data_size: blob_descriptor.size,
database_name: ingestion_properties.database_name.clone(),
table_name: ingestion_properties.table_name.clone(),
retain_blob_on_success: ingestion_properties.retain_blob_on_success,
flush_immediately: ingestion_properties.flush_immediately,
source_message_creation_time: OffsetDateTime::now_utc(),
additional_properties,
}
}
}
/// Additional properties to be added to the ingestion message
/// This struct is modelled on: https://learn.microsoft.com/en-us/azure/data-explorer/ingestion-properties
#[derive(Serialize, Clone, Debug)]
struct AdditionalProperties {
/// Authorization string obtained from Kusto to allow for ingestion; serialized as `authorizationContext`
#[serde(rename = "authorizationContext")]
authorization_context: KustoIdentityToken,
/// Format of the data in the blob; serialized as `format`
#[serde(rename = "format")]
data_format: DataFormat,
}
// Unit tests for the custom ISO8601 serialization
#[cfg(test)]
mod tests {
use super::*;
// A known unix timestamp must serialize with a 4 digit year and a trailing `Z`
#[test]
fn time_custom_iso8601_serialization() {
// Minimal wrapper so the serde helper module can be exercised on its own
#[derive(Serialize, Debug)]
struct TestTimeSerialize {
#[serde(with = "kusto_ingest_iso8601_format")]
customised_time_format: time::OffsetDateTime,
}
let test_message = TestTimeSerialize {
customised_time_format: time::OffsetDateTime::from_unix_timestamp_nanos(
1_234_567_890_123_456_789,
)
.unwrap(),
};
let serialized_message = serde_json::to_string(&test_message).unwrap();
assert_eq!(
serialized_message,
"{\"customised_time_format\":\"2009-02-13T23:31:30.123456789Z\"}"
);
}
}

Просмотреть файл

@ -0,0 +1,18 @@
use crate::data_format::DataFormat;
/// Properties of ingestion that can be used when ingesting data into Kusto allowing for customisation of the ingestion process
///
/// The derived `Default` yields empty database and table names, `None` for the optional flags
/// and the default [DataFormat].
#[derive(Clone, Debug, Default)]
pub struct IngestionProperties {
/// Name of the database to ingest into
pub database_name: String,
/// Name of the table to ingest into
pub table_name: String,
/// Whether the blob is retained after ingestion.
/// Note that the default when not provided is `false`, meaning that Kusto will attempt to delete the blob upon ingestion.
/// This will only be successful if provided sufficient permissions on the blob
pub retain_blob_on_success: Option<bool>,
/// Format of the data being ingested
pub data_format: DataFormat,
/// If set to `true`, any aggregation will be skipped. Default is `false`
pub flush_immediately: Option<bool>,
}

Просмотреть файл

@ -0,0 +1,8 @@
pub mod client_options;
pub mod data_format;
pub mod descriptors;
pub mod error;
pub(crate) mod ingestion_blob_info;
pub mod ingestion_properties;
pub mod queued_ingest;
pub(crate) mod resource_manager;

Просмотреть файл

@ -0,0 +1,62 @@
use std::sync::Arc;
use crate::error::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use crate::client_options::QueuedIngestClientOptions;
use crate::descriptors::BlobDescriptor;
use crate::ingestion_blob_info::QueuedIngestionMessage;
use crate::ingestion_properties::IngestionProperties;
use crate::resource_manager::ResourceManager;
/// Client for ingesting data into Kusto using the queued flavour of ingestion
///
/// Cloning is cheap: clones share the same [ResourceManager] through an `Arc`.
#[derive(Clone)]
pub struct QueuedIngestClient {
// Source of the ingestion queues and of the authorization context used for each message
resource_manager: Arc<ResourceManager>,
}
impl QueuedIngestClient {
    /// Creates a new client from the given [KustoClient], using default [QueuedIngestClientOptions].
    ///
    /// **WARNING**: the [KustoClient] must be created with a connection string that points to the ingestion endpoint
    pub fn new(kusto_client: KustoClient) -> Self {
        let default_options = QueuedIngestClientOptions::default();
        Self::new_with_client_options(kusto_client, default_options)
    }

    /// Creates a new client from the given [KustoClient] and [QueuedIngestClientOptions]
    /// This allows for customisation of the [ClientOptions] used for the storage clients
    ///
    /// **WARNING**: the [KustoClient] must be created with a connection string that points to the ingestion endpoint
    pub fn new_with_client_options(
        kusto_client: KustoClient,
        options: QueuedIngestClientOptions,
    ) -> Self {
        let manager = ResourceManager::new(kusto_client, options);
        Self {
            resource_manager: Arc::new(manager),
        }
    }

    /// Ingest a file into Kusto from Azure Blob Storage
    ///
    /// Picks a random ingestion queue, obtains the authorization context, and posts the
    /// JSON ingestion message (base64 encoded) to that queue.
    pub async fn ingest_from_blob(
        &self,
        blob_descriptor: BlobDescriptor,
        ingestion_properties: IngestionProperties,
    ) -> Result<()> {
        let resources = &self.resource_manager;
        let queue_client = resources.random_ingestion_queue().await?;
        let auth_context = resources.authorization_context().await?;

        let json_payload = serde_json::to_string(&QueuedIngestionMessage::new(
            &blob_descriptor,
            &ingestion_properties,
            auth_context,
        ))?;

        // The queue expects the ingestion message to be base64 encoded
        let encoded_payload = base64::encode(&json_payload);
        queue_client.put_message(encoded_payload).await?;
        Ok(())
    }
}

Просмотреть файл

@ -0,0 +1,81 @@
use std::{sync::Arc, time::Duration};
pub mod authorization_context;
pub mod cache;
pub mod ingest_client_resources;
pub mod resource_uri;
pub mod utils;
use azure_kusto_data::prelude::KustoClient;
use azure_storage_queues::QueueClient;
use crate::client_options::QueuedIngestClientOptions;
use self::{
authorization_context::{AuthorizationContext, KustoIdentityToken},
ingest_client_resources::IngestClientResources,
};
use rand::{seq::SliceRandom, thread_rng};
/// Period after which cached ingestion resources and identity tokens are considered expired: one hour
pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);
/// Errors raised by [ResourceManager] while providing resources for ingestion
#[derive(Debug, thiserror::Error)]
pub enum ResourceManagerError {
/// Fetching the ingestion resources failed
#[error("Failed to obtain ingestion resources: {0}")]
IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError),
/// Fetching the Kusto identity token failed
#[error("Failed to obtain authorization token: {0}")]
AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError),
/// There was nothing to choose from when selecting a resource
#[error("Failed to select a resource - no resources found")]
NoResourcesFound,
}
// Module-local result alias with the error fixed to `ResourceManagerError`
type Result<T> = std::result::Result<T, ResourceManagerError>;
/// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour
pub struct ResourceManager {
// Provider of the storage clients (ingestion queues, temporary storage containers)
ingest_client_resources: Arc<IngestClientResources>,
// Provider of the Kusto identity token attached to ingestion messages
authorization_context: Arc<AuthorizationContext>,
}
impl ResourceManager {
    /// Creates a new ResourceManager from the given [KustoClient] and the [QueuedIngestClientOptions] as provided by the user
    pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self {
        // Both providers talk to Kusto, so each one gets its own handle on the client
        let resources = IngestClientResources::new(client.clone(), client_options);
        let auth = AuthorizationContext::new(client);
        Self {
            ingest_client_resources: Arc::new(resources),
            authorization_context: Arc::new(auth),
        }
    }

    /// Returns the latest [QueueClient]s ready for posting ingestion messages to
    async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
        let resources = self.ingest_client_resources.get().await?;
        Ok(resources.ingestion_queues)
    }

    /// Returns a [QueueClient] to ingest to.
    /// This is a random selection from the list of ingestion queues;
    /// fails with [ResourceManagerError::NoResourcesFound] when the list is empty
    pub async fn random_ingestion_queue(&self) -> Result<QueueClient> {
        let queues = self.ingestion_queues().await?;
        queues
            .choose(&mut thread_rng())
            .cloned()
            .ok_or(ResourceManagerError::NoResourcesFound)
    }

    /// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages
    pub async fn authorization_context(&self) -> Result<KustoIdentityToken> {
        // `?` wraps the failure in `ResourceManagerError::AuthorizationContextError`
        let token = self.authorization_context.get().await?;
        Ok(token)
    }
}

Просмотреть файл

@ -0,0 +1,103 @@
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;
use super::cache::ThreadSafeCachedValue;
use super::utils::get_column_index;
use super::RESOURCE_REFRESH_PERIOD;
/// The Kusto identity token, kept as a plain string
pub(crate) type KustoIdentityToken = String;
/// Name of the result column that holds the identity token
const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext";
/// Errors that can occur while obtaining the Kusto identity token
#[derive(thiserror::Error, Debug)]
pub enum KustoIdentityTokenError {
/// The response did not contain exactly one table; carries the number found
#[error("Kusto expected 1 table in results, found {0}")]
ExpectedOneTable(usize),
/// The table did not contain exactly one row; carries the number found
#[error("Kusto expected 1 row in table, found {0}")]
ExpectedOneRow(usize),
/// The named column (or its value in the row) could not be found
#[error("Column {0} not found in table")]
ColumnNotFound(String),
/// The token value was not a JSON string; carries the offending value
#[error("Invalid JSON response from Kusto: {0:?}")]
InvalidJSONResponse(Value),
/// The token consisted only of whitespace (or was empty)
#[error("Token is empty")]
EmptyToken,
/// Error returned by the underlying Kusto client
#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
}
// Module-local result alias with the error fixed to `KustoIdentityTokenError`
type Result<T> = std::result::Result<T, KustoIdentityTokenError>;
/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token
///
/// The token is cached and re-queried once the cache entry has expired.
#[derive(Debug, Clone)]
pub(crate) struct AuthorizationContext {
/// A client against a Kusto ingestion cluster
client: KustoClient,
/// Cache of the Kusto identity token
token_cache: ThreadSafeCachedValue<KustoIdentityToken>,
}
impl AuthorizationContext {
    /// Creates a context around the given client, starting with an empty token cache
    /// that uses [RESOURCE_REFRESH_PERIOD] as its refresh period
    pub fn new(client: KustoClient) -> Self {
        Self {
            client,
            token_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD),
        }
    }

    /// Executes a KQL query to get the Kusto identity token from the management endpoint
    ///
    /// Fails when the response does not consist of exactly one table with exactly one row, when the
    /// `AuthorizationContext` column is missing, or when its value is not a non-blank JSON string.
    async fn query_kusto_identity_token(&self) -> Result<KustoIdentityToken> {
        let results = self
            .client
            .execute_command("NetDefaultDB", ".get kusto identity token", None)
            .await?;

        // Check that there is only 1 table in the results returned by the query
        let table = match &results.tables[..] {
            [table] => table,
            tables => return Err(KustoIdentityTokenError::ExpectedOneTable(tables.len())),
        };

        // Check that a column in this table actually exists called `AuthorizationContext`.
        // Errors are built lazily so the successful path does not allocate for them.
        let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or_else(|| {
            KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into())
        })?;

        // Check that there is only 1 row in the table, and fetch the value at the column index
        let token = match &table.rows[..] {
            [row] => row.get(index).ok_or_else(|| {
                KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into())
            })?,
            rows => return Err(KustoIdentityTokenError::ExpectedOneRow(rows.len())),
        };

        // Convert the JSON string into a Rust string; the value is only cloned for the error case
        let token = token
            .as_str()
            .ok_or_else(|| KustoIdentityTokenError::InvalidJSONResponse(token.to_owned()))?;

        if token.chars().all(char::is_whitespace) {
            return Err(KustoIdentityTokenError::EmptyToken);
        }

        Ok(token.to_string())
    }

    /// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
    pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
        // The query future is passed to the cache, which only awaits it when a refresh is needed
        self.token_cache
            .get(self.query_kusto_identity_token())
            .await
    }
}

Просмотреть файл

@ -0,0 +1,183 @@
use std::{
error::Error,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use async_lock::RwLock;
/// Wrapper around a value that allows for storing when the value was last updated,
/// as well as the period after which it should be refreshed (i.e. expired)
#[derive(Debug, Clone)]
pub struct Cached<T> {
// The cached value itself
inner: T,
// Moment of construction or of the most recent `update`
last_updated: Instant,
// Age at which the value counts as expired
refresh_period: Duration,
}
impl<T> Cached<T> {
    /// Wraps `inner`, stamping it with the current instant
    pub fn new(inner: T, refresh_period: Duration) -> Self {
        let last_updated = Instant::now();
        Self {
            inner,
            last_updated,
            refresh_period,
        }
    }

    /// Borrows the stored value, regardless of whether it has expired
    pub fn get(&self) -> &T {
        let Self { inner, .. } = self;
        inner
    }

    /// Reports whether at least `refresh_period` has passed since the last update
    pub fn is_expired(&self) -> bool {
        let age = Instant::now().duration_since(self.last_updated);
        age >= self.refresh_period
    }

    /// Replaces the stored value and restarts the expiry clock
    pub fn update(&mut self, inner: T) {
        drop(std::mem::replace(&mut self.inner, inner));
        self.last_updated = Instant::now();
    }
}
/// A lazily populated, expiring cache for a single value, shareable between clones.
///
/// The value lives behind an `Arc<RwLock<..>>`, so every clone of this struct refers to the same
/// cache entry. The entry is `None` until the first successful fetch.
#[derive(Debug, Clone)]
pub struct ThreadSafeCachedValue<T>
where
T: Clone,
{
// Shared cache entry; `None` means nothing has been fetched yet
cache: Arc<RwLock<Cached<Option<T>>>>,
}
impl<T: Clone> ThreadSafeCachedValue<T> {
pub fn new(refresh_period: Duration) -> Self {
Self {
cache: Arc::new(RwLock::new(Cached::new(None, refresh_period))),
}
}
/// Fetches the latest value, either retrieving from cache if valid, or by executing the callback
pub async fn get<F, E: Error>(&self, callback: F) -> Result<T, E>
where
F: Future<Output = Result<T, E>>,
{
// First, try to get a value from the cache by obtaining a read lock
{
let cache = self.cache.read().await;
if !cache.is_expired() {
if let Some(cached_value) = cache.get() {
return Ok(cached_value.clone());
}
}
}
// Obtain a write lock to refresh the cached value
let mut cache = self.cache.write().await;
// Again attempt to return from cache, check is done in case another thread
// refreshed the cached value while we were waiting on the write lock and its now valid
if !cache.is_expired() {
if let Some(cached_value) = cache.get() {
return Ok(cached_value.clone());
}
}
// Fetch new value by executing the callback, update the cache, and return the value
let fetched_value = callback.await?;
cache.update(Some(fetched_value.clone()));
Ok(fetched_value)
}
}
// Unit tests for the plain `Cached` wrapper
#[cfg(test)]
mod cached_tests {
use super::*;
use std::time::Duration;
// `get` hands back the stored value
#[test]
fn test_cached_get() {
let value = "hello";
let cached_string = Cached::new(value.to_string(), Duration::from_secs(60));
assert_eq!(cached_string.get(), value);
}
// A fresh value is not expired; one stamped further back than the refresh period is
#[test]
fn test_cached_is_expired() {
let value = "hello";
let mut cached_string = Cached::new(value.to_string(), Duration::from_secs(60));
assert!(!cached_string.is_expired());
// Backdate the timestamp instead of sleeping
cached_string.last_updated = Instant::now() - Duration::from_secs(61);
assert!(cached_string.is_expired());
}
// `update` swaps the value and leaves the entry unexpired
#[test]
fn test_cached_update() {
let value = "hello";
let mut cached_string = Cached::new(value.to_string(), Duration::from_secs(60));
assert_eq!(cached_string.get(), value);
let new_value = "world";
cached_string.update(new_value.to_string());
assert!(!cached_string.is_expired());
assert_eq!(cached_string.get(), new_value);
}
}
// Unit tests for `ThreadSafeCachedValue`, driven by a mock token source that counts its calls
#[cfg(test)]
mod thread_safe_cached_value_tests {
use super::*;
use std::{fmt::Error, sync::Mutex};
// Mock token source; each executed fetch returns the running call count
#[derive(Debug)]
struct MockToken {
get_token_call_count: Mutex<usize>,
}
impl MockToken {
fn new() -> Self {
Self {
get_token_call_count: Mutex::new(0),
}
}
async fn get_new_token(&self) -> Result<usize, Error> {
// Include an incrementing counter in the token to track how many times the token has been refreshed
let mut call_count = self.get_token_call_count.lock().unwrap();
*call_count += 1;
Ok(call_count.clone())
}
}
// With a long refresh period the second `get` is served from cache, so the callback ran only once
#[tokio::test]
async fn returns_same_value_if_unexpired() -> Result<(), Error> {
let cache = ThreadSafeCachedValue::new(Duration::from_secs(300));
let mock_token = MockToken::new();
let token1 = cache.get(mock_token.get_new_token()).await?;
let token2 = cache.get(mock_token.get_new_token()).await?;
assert_eq!(token1, 1);
assert_eq!(token2, 1);
Ok(())
}
// With a 1 ms refresh period the value has expired after the sleep, so the callback runs again
#[tokio::test]
async fn returns_new_value_if_expired() -> Result<(), Error> {
let cache = ThreadSafeCachedValue::new(Duration::from_millis(1));
let mock_token = MockToken::new();
let token1 = cache.get(mock_token.get_new_token()).await?;
// Sleep to ensure the token expires
tokio::time::sleep(Duration::from_secs(1)).await;
let token2 = cache.get(mock_token.get_new_token()).await?;
assert_eq!(token1, 1);
assert_eq!(token2, 2);
Ok(())
}
}

Просмотреть файл

@ -0,0 +1,149 @@
use crate::client_options::QueuedIngestClientOptions;
use super::{
cache::ThreadSafeCachedValue,
resource_uri::{ClientFromResourceUri, ResourceUri},
utils, RESOURCE_REFRESH_PERIOD,
};
use azure_core::ClientOptions;
use azure_kusto_data::{models::TableV1, prelude::KustoClient};
use azure_storage_blobs::prelude::ContainerClient;
use azure_storage_queues::QueueClient;
use serde_json::Value;
/// Errors that can occur while fetching and parsing the ingestion resources
#[derive(Debug, thiserror::Error)]
pub enum IngestionResourceError {
/// A required column is absent from the resources table
#[error("{column_name} column is missing in the table")]
ColumnNotFoundError { column_name: String },
/// A cell expected to hold a string held something else; carries the offending value
#[error("Response returned from Kusto could not be parsed as a string: {0}")]
ParseAsStringError(Value),
/// No row of the requested resource type exists; carries the resource type name
#[error("No {0} resources found in the table")]
NoResourcesFound(String),
/// Error returned by the underlying Kusto client
#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
/// A storage root could not be turned into a resource URI
#[error(transparent)]
ResourceUriError(#[from] super::resource_uri::ResourceUriError),
/// The response contained no tables at all
#[error("Kusto expected a table containing ingestion resource results, found no tables")]
NoTablesFound,
}
// Module-local result alias with the error fixed to `IngestionResourceError`
type Result<T> = std::result::Result<T, IngestionResourceError>;
/// Looks up the index of `column_name` in `table`, turning a missing column into
/// [IngestionResourceError::ColumnNotFoundError]
fn get_column_index(table: &TableV1, column_name: &str) -> Result<usize> {
    // The error (and its owned column name) is only built when the lookup fails
    utils::get_column_index(table, column_name).ok_or_else(|| {
        IngestionResourceError::ColumnNotFoundError {
            column_name: column_name.to_string(),
        }
    })
}
/// Helper to get a resource URI from a table, erroring if there are no resources of the given name
fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result<Vec<ResourceUri>> {
let storage_root_index = get_column_index(table, "StorageRoot")?;
let resource_type_name_index = get_column_index(table, "ResourceTypeName")?;
let resource_uris: Vec<Result<ResourceUri>> = table
.rows
.iter()
.filter(|r| r[resource_type_name_index] == resource_name)
.map(|r| {
let x = r[storage_root_index].as_str().ok_or(
IngestionResourceError::ParseAsStringError(r[storage_root_index].clone()),
)?;
ResourceUri::try_from(x).map_err(IngestionResourceError::ResourceUriError)
})
.collect();
if resource_uris.is_empty() {
return Err(IngestionResourceError::NoResourcesFound(resource_name));
}
resource_uris.into_iter().collect()
}
/// Builds one Azure client of type `T` per resource URI, each configured with its own copy of the provided [ClientOptions]
fn create_clients_vec<T>(resource_uris: &[ResourceUri], client_options: &ClientOptions) -> Vec<T>
where
    T: ClientFromResourceUri,
{
    let mut clients = Vec::with_capacity(resource_uris.len());
    for resource_uri in resource_uris {
        // `create_client` takes ownership of both arguments, hence the clones
        let client = T::create_client(resource_uri.clone(), client_options.clone());
        clients.push(client);
    }
    clients
}
/// Storage of the clients required for ingestion
#[derive(Debug, Clone)]
pub struct InnerIngestClientResources {
/// Queue clients built from the `SecuredReadyForAggregationQueue` resources
pub ingestion_queues: Vec<QueueClient>,
/// Container clients built from the `TempStorage` resources
pub temp_storage_containers: Vec<ContainerClient>,
}
impl TryFrom<(&TableV1, &QueuedIngestClientOptions)> for InnerIngestClientResources {
type Error = IngestionResourceError;
/// Attempts to create a new InnerIngestClientResources from the given [TableV1] and [QueuedIngestClientOptions]
fn try_from(
(table, client_options): (&TableV1, &QueuedIngestClientOptions),
) -> std::result::Result<Self, Self::Error> {
let secured_ready_for_aggregation_queues =
get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string())?;
let temp_storage = get_resource_by_name(table, "TempStorage".to_string())?;
Ok(Self {
ingestion_queues: create_clients_vec(
&secured_ready_for_aggregation_queues,
&client_options.queue_service_options,
),
temp_storage_containers: create_clients_vec(
&temp_storage,
&client_options.blob_service_options,
),
})
}
}
/// Supplies the Azure clients used for ingestion, obtained by querying Kusto for its ingestion resources and held in a cache
pub struct IngestClientResources {
/// A client against a Kusto ingestion cluster
client: KustoClient,
/// Cache of the ingest client resources
resources_cache: ThreadSafeCachedValue<InnerIngestClientResources>,
/// Options to customise the storage clients
client_options: QueuedIngestClientOptions,
}
impl IngestClientResources {
    /// Creates a new instance from a Kusto client and the options to apply to the storage clients
    ///
    /// The resource cache is constructed with `RESOURCE_REFRESH_PERIOD`.
    pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self {
        let resources_cache = ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD);
        Self {
            client,
            resources_cache,
            client_options,
        }
    }

    /// Runs the `.get ingestion resources` management command and turns the first table of
    /// the response into ingestion clients
    async fn query_ingestion_resources(&self) -> Result<InnerIngestClientResources> {
        let response = self
            .client
            .execute_command("NetDefaultDB", ".get ingestion resources", None)
            .await?;

        // Only the first table of the response is consulted
        match response.tables.first() {
            Some(resources_table) => {
                InnerIngestClientResources::try_from((resources_table, &self.client_options))
            }
            None => Err(IngestionResourceError::NoTablesFound),
        }
    }

    /// Gets the latest resources, either from the cache or by fetching them from Kusto and updating the cache
    pub async fn get(&self) -> Result<InnerIngestClientResources> {
        // The query future is only handed to the cache here; it is not awaited directly
        let fetch_from_kusto = self.query_ingestion_resources();
        self.resources_cache.get(fetch_from_kusto).await
    }
}

Просмотреть файл

@ -0,0 +1,266 @@
use azure_core::ClientOptions;
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use azure_storage_queues::{QueueClient, QueueServiceClientBuilder};
use url::Url;
/// Errors that can occur when parsing a resource URI string into its components
#[derive(Debug, thiserror::Error)]
pub enum ResourceUriError {
/// The URI scheme was something other than `https`; carries the scheme that was found
#[error("URI scheme must be 'https', was '{0}'")]
InvalidScheme(String),
/// The URI host was not a domain name (for example, it was an IP address)
#[error("URI host must be a domain")]
InvalidHost,
/// The URI path did not consist of exactly one non-empty segment
#[error("Object name is missing in the URI")]
MissingObjectName,
/// The URI had no query string to use as the SAS token
#[error("SAS token is missing in the URI as a query parameter")]
MissingSasToken,
/// The URI host contained no `.` separating the account name from the rest of the host
#[error("Account name is missing in the URI")]
MissingAccountName,
/// The string could not be parsed as a URL
#[error(transparent)]
ParseError(#[from] url::ParseError),
/// An error surfaced by the Azure SDK
#[error(transparent)]
AzureError(#[from] azure_core::Error),
}
/// Parsing logic of resource URIs as returned by the Kusto management endpoint
#[derive(Debug, Clone)]
pub(crate) struct ResourceUri {
/// `https://` followed by the host of the URI
pub(crate) service_uri: String,
/// The single path segment of the URI
pub(crate) object_name: String,
/// The part of the host before its first `.`
pub(crate) account_name: String,
/// Credentials built from the query string of the URI
pub(crate) sas_token: StorageCredentials,
}
impl TryFrom<&str> for ResourceUri {
    type Error = ResourceUriError;

    /// Parses a URI of the form `https://<account>.<endpoint>/<object>?<sas token>`
    ///
    /// Checks run in this order: scheme, host, account name, object name, SAS token.
    /// The first check to fail determines the [ResourceUriError] that is returned.
    fn try_from(uri: &str) -> Result<Self, Self::Error> {
        let url = Url::parse(uri)?;

        let scheme = url.scheme();
        if scheme != "https" {
            return Err(ResourceUriError::InvalidScheme(scheme.to_string()));
        }

        // Only domain hosts are accepted; IP address hosts are rejected
        let Some(url::Host::Domain(domain)) = url.host() else {
            return Err(ResourceUriError::InvalidHost);
        };
        let service_uri = format!("https://{}", domain);

        // WIBNI: better parsing that this conforms to a storage resource URI,
        // perhaps then ResourceUri could take a type like ResourceUri<Queue> or ResourceUri<Container>
        let account_name = match domain.split_once('.') {
            Some((account_name, _service_endpoint)) => account_name,
            None => return Err(ResourceUriError::MissingAccountName),
        };

        let mut path_segments = url
            .path_segments()
            .ok_or(ResourceUriError::MissingObjectName)?;
        let object_name = path_segments
            .next()
            .filter(|segment| !segment.is_empty())
            .ok_or(ResourceUriError::MissingObjectName)?;
        // Ensure there is only one path segment (i.e. the object name)
        if path_segments.next().is_some() {
            return Err(ResourceUriError::MissingObjectName);
        }

        // The whole query string is handed over as the SAS token
        let query = url.query().ok_or(ResourceUriError::MissingSasToken)?;
        let sas_token = StorageCredentials::sas_token(query)?;

        Ok(Self {
            service_uri,
            object_name: object_name.to_owned(),
            account_name: account_name.to_owned(),
            sas_token,
        })
    }
}
/// Trait to be used to create an Azure client from a resource URI with configurability of ClientOptions
pub(crate) trait ClientFromResourceUri {
/// Builds the client, taking ownership of the parsed resource URI and of the options to apply
fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self;
}
impl ClientFromResourceUri for QueueClient {
fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self {
QueueServiceClientBuilder::with_location(
azure_storage::CloudLocation::Custom {
uri: resource_uri.service_uri,
account: resource_uri.account_name,
},
resource_uri.sas_token,
)
.client_options(client_options)
.build()
.queue_client(resource_uri.object_name)
}
}
impl ClientFromResourceUri for ContainerClient {
fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self {
ClientBuilder::with_location(
azure_storage::CloudLocation::Custom {
uri: resource_uri.service_uri,
account: resource_uri.account_name,
},
resource_uri.sas_token,
)
.client_options(client_options)
.container_client(resource_uri.object_name)
}
}
#[cfg(test)]
mod tests {
    use azure_storage::StorageCredentialsInner;

    use super::*;
    use std::convert::TryFrom;

    /// A well-formed URI is split into service URI, account name, object name and SAS token
    #[test]
    fn resource_uri_try_from() {
        let uri = "https://storageaccountname.blob.core.windows.com/containerobjectname?sas=token";
        let resource_uri = ResourceUri::try_from(uri).unwrap();

        assert_eq!(
            resource_uri.service_uri,
            "https://storageaccountname.blob.core.windows.com"
        );
        assert_eq!(resource_uri.account_name, "storageaccountname");
        assert_eq!(resource_uri.object_name, "containerobjectname");

        let storage_credential_inner = std::sync::Arc::into_inner(resource_uri.sas_token.0)
            .unwrap()
            .into_inner();
        assert!(matches!(
            storage_credential_inner,
            StorageCredentialsInner::SASToken(_)
        ));

        if let StorageCredentialsInner::SASToken(sas_vec) = storage_credential_inner {
            assert_eq!(sas_vec.len(), 1);
            assert_eq!(sas_vec[0].0, "sas");
            assert_eq!(sas_vec[0].1, "token");
        }
    }

    /// Any scheme other than `https` is rejected
    #[test]
    fn invalid_scheme() {
        let uri = "http://storageaccountname.blob.core.windows.com/containerobjectname?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::InvalidScheme(_))),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// A URI without a host fails at URL parsing
    #[test]
    fn missing_host_str() {
        let uri = "https:";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::ParseError(_))),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// An IPv4 address is not accepted as a host
    #[test]
    fn invalid_host_ipv4() {
        let uri = "https://127.0.0.1/containerobjectname?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::InvalidHost)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// An IPv6 address is not accepted as a host
    #[test]
    fn invalid_host_ipv6() {
        let uri = "https://[3FFE:FFFF:0::CD30]/containerobjectname?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::InvalidHost)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// A host with no `.` has no account name to split off
    #[test]
    fn missing_account_name() {
        let uri = "https://localhost/containerobjectname?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::MissingAccountName)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// An empty path has no object name
    #[test]
    fn missing_object_name() {
        let uri = "https://storageaccountname.blob.core.windows.com/?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::MissingObjectName)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// A path with more than one segment is rejected with the same error as a missing object name
    #[test]
    fn multiple_path_segments() {
        let uri =
            "https://storageaccountname.blob.core.windows.com/containerobjectname/blobname?sas=token";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::MissingObjectName)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// A URI without a query string has no SAS token
    #[test]
    fn missing_sas_token() {
        let uri = "https://storageaccountname.blob.core.windows.com/containerobjectname";
        let resource_uri = ResourceUri::try_from(uri);

        assert!(
            matches!(resource_uri, Err(ResourceUriError::MissingSasToken)),
            "unexpected result: {:?}",
            resource_uri
        );
    }

    /// The queue client is created against the queue named by the object name
    #[test]
    fn queue_client_from_resource_uri() {
        let resource_uri = ResourceUri {
            service_uri: "https://mystorageaccount.queue.core.windows.net".to_string(),
            object_name: "queuename".to_string(),
            account_name: "mystorageaccount".to_string(),
            sas_token: StorageCredentials::sas_token("sas=token").unwrap(),
        };

        let client_options = ClientOptions::default();
        let queue_client = QueueClient::create_client(resource_uri, client_options);

        assert_eq!(queue_client.queue_name(), "queuename");
    }

    /// The container client is created against the container named by the object name
    #[test]
    fn container_client_from_resource_uri() {
        let resource_uri = ResourceUri {
            service_uri: "https://mystorageaccount.blob.core.windows.net".to_string(),
            object_name: "containername".to_string(),
            account_name: "mystorageaccount".to_string(),
            sas_token: StorageCredentials::sas_token("sas=token").unwrap(),
        };

        let client_options = ClientOptions::default();
        let container_client = ContainerClient::create_client(resource_uri, client_options);

        assert_eq!(container_client.container_name(), "containername");
    }
}

Просмотреть файл

@ -0,0 +1,10 @@
use azure_kusto_data::models::TableV1;
/// Helper to find the position of the first column with the given name, or `None` if the table has no such column
// TODO: this could be moved upstream into Kusto Data
pub fn get_column_index(table: &TableV1, column_name: &str) -> Option<usize> {
    for (index, column) in table.columns.iter().enumerate() {
        if column.column_name == column_name {
            return Some(index);
        }
    }
    None
}