initial markups
This commit is contained in:
Родитель
b72d2a9aba
Коммит
e00f9438c7
|
@ -7,7 +7,7 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
azure-kusto-data = {path = "../azure-kusto-data"}
|
||||
# Azure SDK for Rust crates versions should be kept in sync
|
||||
# Azure SDK for Rust crates versions must be kept in sync
|
||||
azure_core = "0.14"
|
||||
azure_storage = "0.14"
|
||||
azure_storage_blobs = "0.14"
|
||||
|
|
|
@ -18,7 +18,6 @@ async fn main() -> Result<()> {
|
|||
let user_mi_object_id = "<managed-identity-object-id>";
|
||||
|
||||
// Create a Kusto client with managed identity authentication via the user assigned identity
|
||||
// Note that this requires
|
||||
let kusto_client = KustoClient::new(
|
||||
ConnectionString::with_managed_identity_auth(
|
||||
cluster_uri,
|
||||
|
@ -47,7 +46,7 @@ async fn main() -> Result<()> {
|
|||
// Define the size of the blob if known, this improves ingestion performance as Kusto does not need to access the blob to determine the size
|
||||
let blob_size = 123;
|
||||
// Create the blob descriptor, also specifying that the blob should be accessed using the system assigned managed identity of the Kusto cluster
|
||||
let blob_descriptor = BlobDescriptor::new(blob_uri.to_string(), Some(blob_size), None)
|
||||
let blob_descriptor = BlobDescriptor::new(blob_uri, Some(blob_size), None)
|
||||
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
|
||||
|
||||
queued_ingest_client
|
||||
|
|
|
@ -11,14 +11,14 @@ pub struct BlobDescriptor {
|
|||
}
|
||||
|
||||
impl BlobDescriptor {
|
||||
pub fn new(uri: String, size: Option<u64>, source_id: Option<Uuid>) -> Self {
|
||||
pub fn new(uri: impl Into<String>, size: Option<u64>, source_id: Option<Uuid>) -> Self {
|
||||
let source_id = match source_id {
|
||||
Some(source_id) => source_id,
|
||||
None => Uuid::new_v4(),
|
||||
};
|
||||
|
||||
Self {
|
||||
uri,
|
||||
uri: uri.into(),
|
||||
size,
|
||||
source_id,
|
||||
blob_auth: None,
|
||||
|
@ -81,7 +81,7 @@ mod tests {
|
|||
#[test]
|
||||
fn blob_descriptor_with_no_auth_modification() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), None, None);
|
||||
let blob_descriptor = BlobDescriptor::new(uri, None, None);
|
||||
|
||||
assert_eq!(blob_descriptor.uri(), uri);
|
||||
}
|
||||
|
@ -90,34 +90,34 @@ mod tests {
|
|||
fn blob_descriptor_with_sas_token() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let sas_token = "my_sas_token";
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), None, None)
|
||||
let blob_descriptor = BlobDescriptor::new(uri, None, None)
|
||||
.with_blob_auth(BlobAuth::SASToken(sas_token.to_string()));
|
||||
|
||||
assert_eq!(blob_descriptor.uri(), format!("{}?{}", uri, sas_token));
|
||||
assert_eq!(blob_descriptor.uri(), format!("{uri}?{sas_token}"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_descriptor_with_user_assigned_managed_identity() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let object_id = "my_object_id";
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), None, None)
|
||||
let blob_descriptor = BlobDescriptor::new(uri, None, None)
|
||||
.with_blob_auth(BlobAuth::UserAssignedManagedIdentity(object_id.to_string()));
|
||||
|
||||
assert_eq!(
|
||||
blob_descriptor.uri(),
|
||||
format!("{};managed_identity={}", uri, object_id)
|
||||
format!("{uri};managed_identity={object_id}")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn blob_descriptor_with_system_assigned_managed_identity() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), None, None)
|
||||
let blob_descriptor = BlobDescriptor::new(uri, None, None)
|
||||
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);
|
||||
|
||||
assert_eq!(
|
||||
blob_descriptor.uri(),
|
||||
format!("{};managed_identity=system", uri)
|
||||
format!("{uri};managed_identity=system")
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -125,7 +125,7 @@ mod tests {
|
|||
fn blob_descriptor_with_size() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let size = 123;
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), Some(size), None);
|
||||
let blob_descriptor = BlobDescriptor::new(uri, Some(size), None);
|
||||
|
||||
assert_eq!(blob_descriptor.size, Some(size));
|
||||
}
|
||||
|
@ -134,7 +134,7 @@ mod tests {
|
|||
fn blob_descriptor_with_source_id() {
|
||||
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
|
||||
let source_id = Uuid::new_v4();
|
||||
let blob_descriptor = BlobDescriptor::new(uri.to_string(), None, Some(source_id));
|
||||
let blob_descriptor = BlobDescriptor::new(uri, None, Some(source_id));
|
||||
|
||||
assert_eq!(blob_descriptor.source_id, source_id);
|
||||
}
|
||||
|
|
|
@ -68,11 +68,9 @@ impl AuthorizationContext {
|
|||
};
|
||||
|
||||
// Convert the JSON string into a Rust string
|
||||
let kusto_identity_token = kusto_identity_token
|
||||
.as_str()
|
||||
.ok_or(anyhow::anyhow!(
|
||||
"Kusto response did not contain a string value"
|
||||
))?;
|
||||
let kusto_identity_token = kusto_identity_token.as_str().ok_or(anyhow::anyhow!(
|
||||
"Kusto response did not contain a string value"
|
||||
))?;
|
||||
|
||||
if kusto_identity_token.chars().all(char::is_whitespace) {
|
||||
return Err(anyhow::anyhow!("Kusto identity token is empty"));
|
||||
|
@ -83,7 +81,7 @@ impl AuthorizationContext {
|
|||
|
||||
/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
|
||||
pub async fn get(&self) -> Result<KustoIdentityToken> {
|
||||
// First, attempt to get the return the token from the cache
|
||||
// Attempt to get the token from the cache
|
||||
let auth_context_cache = self.auth_context_cache.read().await;
|
||||
if !auth_context_cache.is_expired() {
|
||||
if let Some(inner_value) = auth_context_cache.get() {
|
||||
|
|
|
@ -28,7 +28,7 @@ impl<T> Cached<T> {
|
|||
}
|
||||
|
||||
pub fn is_expired(&self) -> bool {
|
||||
self.last_updated.elapsed() > self.refresh_period
|
||||
self.last_updated.elapsed() >= self.refresh_period
|
||||
}
|
||||
|
||||
pub fn update(&mut self, inner: T) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче