This commit is contained in:
Ashley Stanton-Nurse 2024-11-19 14:45:59 -08:00 коммит произвёл GitHub
Родитель 5669d9b1a5
Коммит 53083ea842
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
16 изменённых файлов: 710 добавлений и 129 удалений

Просмотреть файл

@ -5,3 +5,6 @@ euclidian
pkranges
sprocs
udfs
# Cosmos' docs all use "Autoscale" as a single word, rather than a compound "AutoScale" or "Auto Scale"
autoscale

Просмотреть файл

@ -16,17 +16,17 @@ categories = ["api-bindings"]
[dependencies]
async-trait.workspace = true
azure_core.workspace = true
typespec_client_core = { workspace = true, features = ["derive"] }
tracing.workspace = true
url.workspace = true
serde.workspace = true
futures.workspace = true
serde_json.workspace = true
serde.workspace = true
tracing.workspace = true
typespec_client_core = { workspace = true, features = ["derive"] }
url.workspace = true
[dev-dependencies]
azure_identity.workspace = true
clap.workspace = true
time.workspace = true
futures.workspace = true
tracing-subscriber = { workspace = true, features = [ "env-filter", "fmt" ] }
tokio = { workspace = true, default-features = false, features = [
"rt-multi-thread",

Просмотреть файл

@ -1,11 +1,13 @@
use std::error::Error;
use azure_data_cosmos::{
models::{ContainerProperties, PartitionKeyDefinition},
CosmosClient, PartitionKey,
models::{ContainerProperties, PartitionKeyDefinition, ThroughputProperties},
CosmosClient, CreateContainerOptions, CreateDatabaseOptions, PartitionKey,
};
use clap::{Args, Subcommand};
use crate::utils::ThroughputOptions;
/// Creates a new item, database, or container.
#[derive(Clone, Args)]
pub struct CreateCommand {
@ -36,6 +38,9 @@ pub enum Subcommands {
Database {
/// The ID of the new database to create.
id: String,
#[clap(flatten)]
throughput_options: ThroughputOptions,
},
/// Create a container (does not support Entra ID).
@ -43,6 +48,9 @@ pub enum Subcommands {
/// The ID of the database to create the container in.
database: String,
#[clap(flatten)]
throughput_options: ThroughputOptions,
/// The ID of the new container to create.
#[clap(long, short)]
id: Option<String>,
@ -83,9 +91,19 @@ impl CreateCommand {
Ok(())
}
Subcommands::Database { id } => {
Subcommands::Database {
id,
throughput_options,
} => {
let throughput_properties: Option<ThroughputProperties> =
throughput_options.try_into()?;
let options = throughput_properties.map(|p| CreateDatabaseOptions {
throughput: Some(p),
..Default::default()
});
let db = client
.create_database(&id, None)
.create_database(&id, options)
.await?
.deserialize_body()
.await?
@ -97,21 +115,39 @@ impl CreateCommand {
Subcommands::Container {
database,
throughput_options,
id,
partition_key,
json,
} => {
let throughput_properties: Option<ThroughputProperties> =
throughput_options.try_into()?;
let options = throughput_properties.map(|p| CreateContainerOptions {
throughput: Some(p),
..Default::default()
});
let properties = match json {
Some(j) => serde_json::from_str(&j).unwrap(),
None => ContainerProperties {
id: id.expect("the ID is required when not using '--json'"),
partition_key: PartitionKeyDefinition::new(partition_key),
..Default::default()
},
None => {
if partition_key.is_empty() {
panic!("the partition key is required when not using '--json'");
}
if partition_key.len() > 3 {
panic!("only up to 3 partition key paths are supported");
}
ContainerProperties {
id: id.expect("the ID is required when not using '--json'"),
partition_key: PartitionKeyDefinition::new(partition_key),
..Default::default()
}
}
};
let container = client
.database_client(&database)
.create_container(properties, None)
.create_container(properties, options)
.await?
.deserialize_body()
.await?

Просмотреть файл

@ -14,6 +14,7 @@ mod query;
mod read;
mod replace;
mod upsert;
mod utils;
/// A set of basic examples for interacting with Cosmos.
///

Просмотреть файл

@ -2,43 +2,117 @@ use std::error::Error;
use azure_core::StatusCode;
use azure_data_cosmos::CosmosClient;
use clap::Args;
use clap::{Args, Subcommand};
/// Reads a specific item.
#[derive(Clone, Args)]
pub struct ReadCommand {
/// The database containing the item.
database: String,
#[command(subcommand)]
subcommands: Subcommands,
}
/// The container containing the item.
container: String,
#[derive(Clone, Subcommand)]
enum Subcommands {
Database {
/// The database to read metadata for.
database: String,
},
Container {
/// The database containing the container.
database: String,
/// The ID of the item.
#[clap(long, short)]
item_id: String,
/// The container to read metadata for.
container: String,
},
Item {
/// The database containing the item.
database: String,
/// The partition key of the item.
#[clap(long, short)]
partition_key: String,
/// The container containing the item.
container: String,
/// The ID of the item.
#[clap(long, short)]
item_id: String,
/// The partition key of the item.
#[clap(long, short)]
partition_key: String,
},
}
impl ReadCommand {
pub async fn run(self, client: CosmosClient) -> Result<(), Box<dyn Error>> {
let db_client = client.database_client(&self.database);
let container_client = db_client.container_client(&self.container);
match self.subcommands {
Subcommands::Item {
database,
container,
item_id,
partition_key,
} => {
let db_client = client.database_client(&database);
let container_client = db_client.container_client(&container);
let response = container_client
.read_item(&self.partition_key, &self.item_id, None)
.await;
match response {
Err(e) if e.http_status() == Some(StatusCode::NotFound) => println!("Item not found!"),
Ok(r) => {
let item: serde_json::Value = r.deserialize_body().await?.unwrap();
println!("Found item:");
println!("{:#?}", item);
let response = container_client
.read_item(&partition_key, &item_id, None)
.await;
match response {
Err(e) if e.http_status() == Some(StatusCode::NotFound) => {
println!("Item not found!")
}
Ok(r) => {
let item: serde_json::Value = r.deserialize_body().await?.unwrap();
println!("Found item:");
println!("{:#?}", item);
}
Err(e) => return Err(e.into()),
};
Ok(())
}
Err(e) => return Err(e.into()),
};
Ok(())
Subcommands::Database { database } => {
let db_client = client.database_client(&database);
let response = db_client.read(None).await?.deserialize_body().await?;
println!("Database:");
println!(" {:#?}", response);
let resp = db_client.read_throughput(None).await?;
match resp {
None => println!("Database does not have provisioned throughput"),
Some(r) => {
let throughput = r.deserialize_body().await?;
println!("Throughput:");
crate::utils::print_throughput(throughput);
}
}
Ok(())
}
Subcommands::Container {
database,
container,
} => {
let db_client = client.database_client(&database);
let container_client = db_client.container_client(&container);
let response = container_client
.read(None)
.await?
.deserialize_body()
.await?;
println!("Container:");
println!(" {:#?}", response);
let resp = container_client.read_throughput(None).await?;
match resp {
None => println!("Container does not have provisioned throughput"),
Some(r) => {
let throughput = r.deserialize_body().await?;
println!("Throughput:");
crate::utils::print_throughput(throughput);
}
}
Ok(())
}
}
}
}

Просмотреть файл

@ -2,50 +2,121 @@ use std::error::Error;
use azure_core::StatusCode;
use azure_data_cosmos::{CosmosClient, PartitionKey};
use clap::Args;
use clap::{Args, Subcommand};
use crate::utils::ThroughputOptions;
/// Creates a new item.
#[derive(Clone, Args)]
pub struct ReplaceCommand {
/// The database in which to create the item.
database: String,
#[command(subcommand)]
subcommand: Subcommands,
}
/// The container in which to create the item.
container: String,
#[derive(Clone, Subcommand)]
pub enum Subcommands {
Item {
/// The database in which to create the item.
database: String,
/// The ID of the item.
#[clap(long, short)]
item_id: String,
/// The container in which to create the item.
container: String,
/// The partition key of the new item.
#[clap(long, short)]
partition_key: String,
/// The ID of the item.
#[clap(long, short)]
item_id: String,
/// The JSON of the new item.
#[clap(long, short)]
json: String,
/// The partition key of the new item.
#[clap(long, short)]
partition_key: String,
/// The JSON of the new item.
#[clap(long, short)]
json: String,
},
DatabaseThroughput {
/// The database to update throughput for.
database: String,
#[clap(flatten)]
throughput_options: ThroughputOptions,
},
ContainerThroughput {
/// The database containing the container.
database: String,
/// The container to update throughput for.
container: String,
#[clap(flatten)]
throughput_options: ThroughputOptions,
},
}
impl ReplaceCommand {
pub async fn run(self, client: CosmosClient) -> Result<(), Box<dyn Error>> {
let db_client = client.database_client(&self.database);
let container_client = db_client.container_client(&self.container);
match self.subcommand {
Subcommands::Item {
database,
container,
item_id,
partition_key,
json,
} => {
let db_client = client.database_client(&database);
let container_client = db_client.container_client(&container);
let pk = PartitionKey::from(&self.partition_key);
let item: serde_json::Value = serde_json::from_str(&self.json)?;
let pk = PartitionKey::from(&partition_key);
let item: serde_json::Value = serde_json::from_str(&json)?;
let response = container_client
.replace_item(pk, &self.item_id, item, None)
.await;
match response {
Err(e) if e.http_status() == Some(StatusCode::NotFound) => println!("Item not found!"),
Ok(r) => {
let item: serde_json::Value = r.deserialize_body().await?.unwrap();
println!("Replaced item:");
println!("{:#?}", item);
let response = container_client
.replace_item(pk, &item_id, item, None)
.await;
match response {
Err(e) if e.http_status() == Some(StatusCode::NotFound) => {
println!("Item not found!")
}
Ok(r) => {
let item: serde_json::Value = r.deserialize_body().await?.unwrap();
println!("Replaced item:");
println!("{:#?}", item);
}
Err(e) => return Err(e.into()),
};
Ok(())
}
Err(e) => return Err(e.into()),
};
Ok(())
Subcommands::DatabaseThroughput {
database,
throughput_options,
} => {
let throughput_properties = throughput_options.try_into()?;
let db_client = client.database_client(&database);
let new_throughput = db_client
.replace_throughput(throughput_properties, None)
.await?
.deserialize_body()
.await?;
println!("New Throughput:");
crate::utils::print_throughput(new_throughput);
Ok(())
}
Subcommands::ContainerThroughput {
database,
container,
throughput_options,
} => {
let throughput_properties = throughput_options.try_into()?;
let db_client = client.database_client(&database);
let container_client = db_client.container_client(&container);
let new_throughput = container_client
.replace_throughput(throughput_properties, None)
.await?
.deserialize_body()
.await?;
println!("New Throughput:");
crate::utils::print_throughput(new_throughput);
Ok(())
}
}
}
}

Просмотреть файл

@ -0,0 +1,56 @@
use azure_data_cosmos::models::ThroughputProperties;
use clap::Args;
#[derive(Args, Clone)]
pub struct ThroughputOptions {
/// Enables autoscaling and sets the maximum RUs to support. Cannot be used if `--manual` is set.
#[clap(long)]
autoscale: Option<usize>,
/// Sets the increment percentage for autoscale. Ignored unless `--autoscale` is set.
#[clap(long)]
autoscale_increment: Option<usize>,
/// Provisions manual throughput, specifying the number of RUs.
#[clap(long)]
manual: Option<usize>,
}
impl TryFrom<ThroughputOptions> for Option<ThroughputProperties> {
type Error = Box<dyn std::error::Error>;
fn try_from(v: ThroughputOptions) -> Result<Self, Box<dyn std::error::Error>> {
match (v.autoscale, v.manual) {
(Some(_), Some(_)) => Err("cannot set both '--autoscale' and '--manual'".into()),
(Some(max), None) => Ok(Some(ThroughputProperties::autoscale(
max,
v.autoscale_increment,
))),
(None, Some(rus)) => Ok(Some(ThroughputProperties::manual(rus))),
(None, None) => Ok(None),
}
}
}
impl TryFrom<ThroughputOptions> for ThroughputProperties {
type Error = Box<dyn std::error::Error>;
fn try_from(v: ThroughputOptions) -> Result<Self, Box<dyn std::error::Error>> {
let opt: Option<ThroughputProperties> = v.try_into()?;
opt.ok_or("must specify either '--autoscale' or '--manual'".into())
}
}
pub fn print_throughput(throughput: ThroughputProperties) {
if let Some(tp) = throughput.throughput() {
println!(" Throughput: {}RU/s", tp);
} else {
println!(" Throughput: Unlimited");
}
if let Some(autoscale_max) = throughput.autoscale_maximum() {
println!(" Autoscale max: {}RU/s", autoscale_max);
}
if let Some(autoscale_incr) = throughput.autoscale_increment() {
println!(" Autoscale increment: {}%", autoscale_incr);
}
}

Просмотреть файл

@ -3,11 +3,12 @@
use crate::{
constants,
models::{ContainerProperties, Item, PatchDocument, QueryResults},
models::{ContainerProperties, Item, PatchDocument, QueryResults, ThroughputProperties},
options::{QueryOptions, ReadContainerOptions},
pipeline::CosmosPipeline,
resource_context::{ResourceLink, ResourceType},
DeleteContainerOptions, ItemOptions, PartitionKey, Query, QueryPartitionStrategy,
ThroughputOptions,
};
use azure_core::{Method, Pager, Request, Response};
@ -62,14 +63,59 @@ impl ContainerClient {
&self,
options: Option<ReadContainerOptions<'_>>,
) -> azure_core::Result<Response<ContainerProperties>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.link);
let mut req = Request::new(url, Method::Get);
self.pipeline
.send(
options.map(|o| o.method_options.context),
&mut req,
self.link.clone(),
)
.send(options.method_options.context, &mut req, self.link.clone())
.await
}
/// Reads container throughput properties, if any.
///
/// This will return `None` if the database does not have a throughput offer configured.
///
/// # Arguments
/// * `options` - Optional parameters for the request.
pub async fn read_throughput(
&self,
options: Option<ThroughputOptions<'_>>,
) -> azure_core::Result<Option<Response<ThroughputProperties>>> {
let options = options.unwrap_or_default();
// We need to get the RID for the database.
let db = self.read(None).await?.deserialize_body().await?;
let resource_id = db
.system_properties
.resource_id
.expect("service should always return a '_rid' for a container");
self.pipeline
.read_throughput_offer(options.method_options.context, &resource_id)
.await
}
/// Replaces the container throughput properties.
///
/// # Arguments
/// * `throughput` - The new throughput properties to set.
/// * `options` - Optional parameters for the request.
pub async fn replace_throughput(
&self,
throughput: ThroughputProperties,
options: Option<ThroughputOptions<'_>>,
) -> azure_core::Result<Response<ThroughputProperties>> {
let options = options.unwrap_or_default();
// We need to get the RID for the database.
let db = self.read(None).await?.deserialize_body().await?;
let resource_id = db
.system_properties
.resource_id
.expect("service should always return a '_rid' for a container");
self.pipeline
.replace_throughput_offer(options.method_options.context, &resource_id, throughput)
.await
}
@ -83,14 +129,11 @@ impl ContainerClient {
&self,
options: Option<DeleteContainerOptions<'_>>,
) -> azure_core::Result<Response> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.link);
let mut req = Request::new(url, Method::Delete);
self.pipeline
.send(
options.map(|o| o.method_options.context),
&mut req,
self.link.clone(),
)
.send(options.method_options.context, &mut req, self.link.clone())
.await
}
@ -135,13 +178,14 @@ impl ContainerClient {
item: T,
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response<Item<T>>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.items_link);
let mut req = Request::new(url, Method::Post);
req.insert_headers(&partition_key.into())?;
req.set_json(&item)?;
self.pipeline
.send(
options.map(|o| o.method_options.context),
options.method_options.context,
&mut req,
self.items_link.clone(),
)
@ -194,13 +238,14 @@ impl ContainerClient {
// REASON: This is a documented public API so prefixing with '_' is undesirable.
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response<Item<T>>> {
let options = options.unwrap_or_default();
let link = self.items_link.item(item_id);
let url = self.pipeline.url(&link);
let mut req = Request::new(url, Method::Put);
req.insert_headers(&partition_key.into())?;
req.set_json(&item)?;
self.pipeline
.send(options.map(|o| o.method_options.context), &mut req, link)
.send(options.method_options.context, &mut req, link)
.await
}
@ -248,6 +293,7 @@ impl ContainerClient {
item: T,
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response<Item<T>>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.items_link);
let mut req = Request::new(url, Method::Post);
req.insert_header(constants::IS_UPSERT, "true");
@ -255,7 +301,7 @@ impl ContainerClient {
req.set_json(&item)?;
self.pipeline
.send(
options.map(|o| o.method_options.context),
options.method_options.context,
&mut req,
self.items_link.clone(),
)
@ -298,12 +344,13 @@ impl ContainerClient {
item_id: &str,
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response<Item<T>>> {
let options = options.unwrap_or_default();
let link = self.items_link.item(item_id);
let url = self.pipeline.url(&link);
let mut req = Request::new(url, Method::Get);
req.insert_headers(&partition_key.into())?;
self.pipeline
.send(options.map(|o| o.method_options.context), &mut req, link)
.send(options.method_options.context, &mut req, link)
.await
}
@ -332,12 +379,13 @@ impl ContainerClient {
item_id: &str,
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response> {
let options = options.unwrap_or_default();
let link = self.items_link.item(item_id);
let url = self.pipeline.url(&link);
let mut req = Request::new(url, Method::Delete);
req.insert_headers(&partition_key.into())?;
self.pipeline
.send(options.map(|o| o.method_options.context), &mut req, link)
.send(options.method_options.context, &mut req, link)
.await
}
@ -387,6 +435,7 @@ impl ContainerClient {
patch: PatchDocument,
options: Option<ItemOptions<'_>>,
) -> azure_core::Result<Response> {
let options = options.unwrap_or_default();
let link = self.items_link.item(item_id);
let url = self.pipeline.url(&link);
let mut req = Request::new(url, Method::Patch);
@ -394,7 +443,7 @@ impl ContainerClient {
req.set_json(&patch)?;
self.pipeline
.send(options.map(|o| o.method_options.context), &mut req, link)
.send(options.method_options.context, &mut req, link)
.await
}
@ -458,13 +507,14 @@ impl ContainerClient {
partition_key: impl Into<QueryPartitionStrategy>,
options: Option<QueryOptions<'_>>,
) -> azure_core::Result<Pager<QueryResults<T>>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.items_link);
let mut base_request = Request::new(url, Method::Post);
let QueryPartitionStrategy::SinglePartition(partition_key) = partition_key.into();
base_request.insert_headers(&partition_key)?;
self.pipeline.send_query_request(
options.map(|o| o.method_options.context),
options.method_options.context,
query.into(),
base_request,
self.items_link.clone(),

Просмотреть файл

@ -129,11 +129,12 @@ impl CosmosClient {
query: impl Into<Query>,
options: Option<QueryDatabasesOptions<'_>>,
) -> azure_core::Result<azure_core::Pager<DatabaseQueryResults>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.databases_link);
let base_request = Request::new(url, azure_core::Method::Post);
self.pipeline.send_query_request(
options.map(|o| o.method_options.context),
options.method_options.context,
query.into(),
base_request,
self.databases_link.clone(),
@ -152,6 +153,8 @@ impl CosmosClient {
id: &str,
options: Option<CreateDatabaseOptions<'_>>,
) -> azure_core::Result<Response<Item<DatabaseProperties>>> {
let options = options.unwrap_or_default();
#[derive(Serialize)]
struct RequestBody<'a> {
id: &'a str,
@ -159,11 +162,12 @@ impl CosmosClient {
let url = self.pipeline.url(&self.databases_link);
let mut req = Request::new(url, Method::Post);
req.insert_headers(&options.throughput)?;
req.set_json(&RequestBody { id })?;
self.pipeline
.send(
options.map(|o| o.method_options.context),
options.method_options.context,
&mut req,
self.databases_link.clone(),
)

Просмотреть файл

@ -3,11 +3,14 @@
use crate::{
clients::ContainerClient,
models::{ContainerProperties, ContainerQueryResults, DatabaseProperties, Item},
models::{
ContainerProperties, ContainerQueryResults, DatabaseProperties, Item, ThroughputProperties,
},
options::ReadDatabaseOptions,
pipeline::CosmosPipeline,
resource_context::{ResourceLink, ResourceType},
CreateContainerOptions, DeleteDatabaseOptions, Query, QueryContainersOptions,
ThroughputOptions,
};
use azure_core::{Method, Pager, Request, Response};
@ -71,14 +74,11 @@ impl DatabaseClient {
&self,
options: Option<ReadDatabaseOptions<'_>>,
) -> azure_core::Result<Response<DatabaseProperties>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.link);
let mut req = Request::new(url, Method::Get);
self.pipeline
.send(
options.map(|o| o.method_options.context),
&mut req,
self.link.clone(),
)
.send(options.method_options.context, &mut req, self.link.clone())
.await
}
@ -110,11 +110,12 @@ impl DatabaseClient {
query: impl Into<Query>,
options: Option<QueryContainersOptions<'_>>,
) -> azure_core::Result<Pager<ContainerQueryResults>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.containers_link);
let base_request = Request::new(url, Method::Post);
self.pipeline.send_query_request(
options.map(|o| o.method_options.context),
options.method_options.context,
query.into(),
base_request,
self.containers_link.clone(),
@ -133,13 +134,15 @@ impl DatabaseClient {
properties: ContainerProperties,
options: Option<CreateContainerOptions<'_>>,
) -> azure_core::Result<Response<Item<ContainerProperties>>> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.containers_link);
let mut req = Request::new(url, Method::Post);
req.insert_headers(&options.throughput)?;
req.set_json(&properties)?;
self.pipeline
.send(
options.map(|o| o.method_options.context),
options.method_options.context,
&mut req,
self.containers_link.clone(),
)
@ -156,14 +159,59 @@ impl DatabaseClient {
&self,
options: Option<DeleteDatabaseOptions<'_>>,
) -> azure_core::Result<Response> {
let options = options.unwrap_or_default();
let url = self.pipeline.url(&self.link);
let mut req = Request::new(url, Method::Delete);
self.pipeline
.send(
options.map(|o| o.method_options.context),
&mut req,
self.link.clone(),
)
.send(options.method_options.context, &mut req, self.link.clone())
.await
}
/// Reads database throughput properties, if any.
///
/// This will return `None` if the database does not have a throughput offer configured.
///
/// # Arguments
/// * `options` - Optional parameters for the request.
pub async fn read_throughput(
&self,
options: Option<ThroughputOptions<'_>>,
) -> azure_core::Result<Option<Response<ThroughputProperties>>> {
let options = options.unwrap_or_default();
// We need to get the RID for the database.
let db = self.read(None).await?.deserialize_body().await?;
let resource_id = db
.system_properties
.resource_id
.expect("service should always return a '_rid' for a database");
self.pipeline
.read_throughput_offer(options.method_options.context, &resource_id)
.await
}
/// Replaces the database throughput properties.
///
/// # Arguments
/// * `throughput` - The new throughput properties to set.
/// * `options` - Optional parameters for the request.
pub async fn replace_throughput(
&self,
throughput: ThroughputProperties,
options: Option<ThroughputOptions<'_>>,
) -> azure_core::Result<Response<ThroughputProperties>> {
let options = options.unwrap_or_default();
// We need to get the RID for the database.
let db = self.read(None).await?.deserialize_body().await?;
let resource_id = db
.system_properties
.resource_id
.expect("service should always return a '_rid' for a database");
self.pipeline
.replace_throughput_offer(options.method_options.context, &resource_id, throughput)
.await
}
}

Просмотреть файл

@ -14,5 +14,8 @@ pub const CONTINUATION: HeaderName = HeaderName::from_static("x-ms-continuation"
pub const INDEX_METRICS: HeaderName = HeaderName::from_static("x-ms-cosmos-index-utilization");
pub const QUERY_METRICS: HeaderName = HeaderName::from_static("x-ms-documentdb-query-metrics");
pub const IS_UPSERT: HeaderName = HeaderName::from_static("x-ms-documentdb-is-upsert");
pub const OFFER_THROUGHPUT: HeaderName = HeaderName::from_static("x-ms-offer-throughput");
pub const OFFER_AUTOPILOT_SETTINGS: HeaderName =
HeaderName::from_static("x-ms-cosmos-offer-autopilot-settings");
pub const QUERY_CONTENT_TYPE: ContentType = ContentType::from_static("application/query+json");

Просмотреть файл

@ -11,12 +11,14 @@ mod indexing_policy;
mod item;
mod partition_key_definition;
mod patch_operations;
mod throughput_properties;
pub use container_properties::*;
pub use indexing_policy::*;
pub use item::*;
pub use partition_key_definition::*;
pub use patch_operations::*;
pub use throughput_properties::*;
fn deserialize_cosmos_timestamp<'de, D>(deserializer: D) -> Result<Option<OffsetDateTime>, D::Error>
where
@ -87,7 +89,8 @@ pub struct SystemProperties {
/// The system-generated unique identifier associated with the resource.
#[serde(default)]
#[serde(skip_serializing)]
// Some APIs do expect the "_rid" to be provided (Replace Offer, for example), so we do want to serialize it if it's provided.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "_rid")]
pub resource_id: Option<String>,

Просмотреть файл

@ -0,0 +1,135 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
use std::borrow::Cow;
use azure_core::{
headers::{AsHeaders, HeaderName, HeaderValue},
Model,
};
use serde::{Deserialize, Serialize};
use crate::{constants, models::SystemProperties};
const OFFER_VERSION_2: &str = "V2";
#[derive(Model, Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ThroughputProperties {
resource: String,
#[serde(rename = "content")]
pub(crate) offer: Offer,
#[serde(rename = "id")]
pub(crate) offer_id: String,
offer_resource_id: String,
offer_type: String,
offer_version: Cow<'static, str>, // When we serialize, this is always going to be a constant.
#[serde(flatten)]
pub(crate) system_properties: SystemProperties,
}
impl ThroughputProperties {
pub fn manual(throughput: usize) -> ThroughputProperties {
ThroughputProperties {
offer_version: OFFER_VERSION_2.into(),
offer: Offer {
offer_throughput: Some(throughput),
..Default::default()
},
..Default::default()
}
}
pub fn autoscale(
starting_maximum_throughput: usize,
increment_percent: Option<usize>,
) -> ThroughputProperties {
ThroughputProperties {
offer_version: OFFER_VERSION_2.into(),
offer: Offer {
offer_autopilot_settings: Some(OfferAutoscaleSettings {
max_throughput: starting_maximum_throughput,
auto_upgrade_policy: increment_percent.map(|p| AutoscaleAutoUpgradePolicy {
throughput_policy: Some(AutoscaleThroughputPolicy {
increment_percent: p,
}),
}),
}),
..Default::default()
},
..Default::default()
}
}
pub fn throughput(&self) -> Option<usize> {
self.offer.offer_throughput
}
pub fn autoscale_maximum(&self) -> Option<usize> {
Some(self.offer.offer_autopilot_settings.as_ref()?.max_throughput)
}
pub fn autoscale_increment(&self) -> Option<usize> {
Some(
self.offer
.offer_autopilot_settings
.as_ref()?
.auto_upgrade_policy
.as_ref()?
.throughput_policy
.as_ref()?
.increment_percent,
)
}
}
impl AsHeaders for ThroughputProperties {
type Error = azure_core::Error;
type Iter = std::vec::IntoIter<(HeaderName, HeaderValue)>;
fn as_headers(&self) -> Result<Self::Iter, Self::Error> {
let vec = match (
self.offer.offer_throughput,
self.offer.offer_autopilot_settings.as_ref(),
) {
(Some(t), _) => vec![(constants::OFFER_THROUGHPUT, t.to_string().into())],
(_, Some(ap)) => vec![(
constants::OFFER_AUTOPILOT_SETTINGS,
serde_json::to_string(&ap)?.into(),
)],
(None, None) => vec![],
};
Ok(vec.into_iter())
}
}
#[derive(Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Offer {
#[serde(skip_serializing_if = "Option::is_none")]
pub offer_throughput: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub offer_autopilot_settings: Option<OfferAutoscaleSettings>,
}
#[derive(Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct OfferAutoscaleSettings {
pub max_throughput: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub auto_upgrade_policy: Option<AutoscaleAutoUpgradePolicy>,
}
#[derive(Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct AutoscaleAutoUpgradePolicy {
#[serde(skip_serializing_if = "Option::is_none")]
pub throughput_policy: Option<AutoscaleThroughputPolicy>,
}
#[derive(Clone, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct AutoscaleThroughputPolicy {
pub increment_percent: usize,
}

Просмотреть файл

@ -3,68 +3,78 @@
use azure_core::{ClientMethodOptions, ClientOptions};
use crate::models::ThroughputProperties;
/// Options used when creating a [`CosmosClient`](crate::CosmosClient).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct CosmosClientOptions {
pub client_options: ClientOptions,
}
/// Options to be passed to [`DatabaseClient::create_container()`](crate::clients::DatabaseClient::create_container()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct CreateContainerOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
pub throughput: Option<ThroughputProperties>,
}
/// Options to be passed to [`CosmosClient::create_database()`](crate::CosmosClient::create_database()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct CreateDatabaseOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
pub throughput: Option<ThroughputProperties>,
}
/// Options to be passed to [`ContainerClient::delete()`](crate::clients::ContainerClient::delete()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct DeleteContainerOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`DatabaseClient::delete()`](crate::clients::DatabaseClient::delete()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct DeleteDatabaseOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to APIs that manipulate items.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct ItemOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`DatabaseClient::query_containers()`](crate::clients::DatabaseClient::query_containers())
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct QueryContainersOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`CosmosClient::query_databases()`](crate::CosmosClient::query_databases())
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct QueryDatabasesOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`ContainerClient::query_items()`](crate::clients::ContainerClient::query_items()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct QueryOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`ContainerClient::read()`](crate::clients::ContainerClient::read()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct ReadContainerOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to [`DatabaseClient::read()`](crate::clients::DatabaseClient::read()).
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct ReadDatabaseOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}
/// Options to be passed to operations related to Throughput offers.
#[derive(Clone, Default)]
pub struct ThroughputOptions<'a> {
pub method_options: ClientMethodOptions<'a>,
}

Просмотреть файл

@ -7,12 +7,18 @@ mod signature_target;
use std::sync::Arc;
pub use authorization_policy::AuthorizationPolicy;
use azure_core::{ClientOptions, Context, Pager, Request};
use serde::de::DeserializeOwned;
use azure_core::{ClientOptions, Context, Method, Model, Pager, Request, Response};
use futures::StreamExt;
use serde::{de::DeserializeOwned, Deserialize};
use typespec_client_core::http::PagerResult;
use url::Url;
use crate::{constants, resource_context::ResourceLink, Query};
use crate::{
constants,
models::ThroughputProperties,
resource_context::{ResourceLink, ResourceType},
Query,
};
/// Newtype that wraps an Azure Core pipeline to provide a Cosmos-specific pipeline which configures our authorization policy and enforces that a [`ResourceType`] is set on the context.
#[derive(Debug, Clone)]
@ -50,17 +56,17 @@ impl CosmosPipeline {
pub async fn send<T>(
&self,
ctx: Option<Context<'_>>,
ctx: Context<'_>,
request: &mut Request,
resource_link: ResourceLink,
) -> azure_core::Result<azure_core::Response<T>> {
let ctx = ctx.unwrap_or_default().with_value(resource_link);
) -> azure_core::Result<Response<T>> {
let ctx = ctx.with_value(resource_link);
self.pipeline.send(&ctx, request).await
}
pub fn send_query_request<T: DeserializeOwned>(
&self,
ctx: Option<Context<'_>>,
ctx: Context<'_>,
query: Query,
mut base_request: Request,
resource_link: ResourceLink,
@ -72,10 +78,7 @@ impl CosmosPipeline {
// We have to double-clone here.
// First we clone the pipeline to pass it in to the closure
let pipeline = self.pipeline.clone();
let ctx = ctx
.unwrap_or_default()
.with_value(resource_link)
.into_owned();
let ctx = ctx.with_value(resource_link).into_owned();
Ok(Pager::from_callback(move |continuation| {
// Then we have to clone it again to pass it in to the async block.
// This is because Pageable can't borrow any data, it has to own it all.
@ -97,4 +100,86 @@ impl CosmosPipeline {
}
}))
}
/// Helper function to read a throughput offer given a resource ID.
///
/// ## Arguments
/// * `context` - The context for the request.
/// * `resource_id` - The resource ID to read the throughput offer for.
pub async fn read_throughput_offer(
&self,
context: Context<'_>,
resource_id: &str,
) -> azure_core::Result<Option<Response<ThroughputProperties>>> {
#[derive(Model, Deserialize)]
struct OfferResults {
#[serde(rename = "Offers")]
pub offers: Vec<ThroughputProperties>,
}
// We only have to into_owned here in order to call send_query_request below,
// since it returns `Pager` which must own it's data.
// See https://github.com/Azure/azure-sdk-for-rust/issues/1911 for further discussion
let context = context.into_owned();
// Now, query for the offer for this resource.
let query = Query::from("SELECT * FROM c WHERE c.offerResourceId = @rid")
.with_parameter("@rid", resource_id)?;
let offers_link = ResourceLink::root(ResourceType::Offers);
let mut results: Pager<OfferResults> = self.send_query_request(
context.clone(),
query,
Request::new(self.url(&offers_link), Method::Post),
offers_link.clone(),
)?;
let offers = results
.next()
.await
.expect("the first pager result should always be Some, even when there's an error")?
.deserialize_body()
.await?
.offers;
if offers.is_empty() {
// No offers found for this resource.
return Ok(None);
}
let offer_link = offers_link.item(&offers[0].offer_id);
let offer_url = self.url(&offer_link);
// Now we can read the offer itself
let mut req = Request::new(offer_url, Method::Get);
self.send(context, &mut req, offer_link).await.map(Some)
}
/// Helper function to update a throughput offer given a resource ID.
///
/// ## Arguments
/// * `context` - The context for the request.
/// * `resource_id` - The resource ID to update the throughput offer for.
/// * `throughput` - The new throughput to set.
pub async fn replace_throughput_offer(
&self,
context: Context<'_>,
resource_id: &str,
throughput: ThroughputProperties,
) -> azure_core::Result<Response<ThroughputProperties>> {
let response = self
.read_throughput_offer(context.clone(), resource_id)
.await?;
let mut current_throughput = match response {
Some(r) => r.deserialize_body().await?,
None => Default::default(),
};
current_throughput.offer = throughput.offer;
// NOTE: Offers API doesn't allow Enable Content Response On Write to be false, so once we support that option, we'll need to ignore it here.
let offer_link =
ResourceLink::root(ResourceType::Offers).item(&current_throughput.offer_id);
let mut req = Request::new(self.url(&offer_link), Method::Put);
req.set_json(&current_throughput)?;
self.send(context, &mut req, offer_link).await
}
}

Просмотреть файл

@ -82,9 +82,11 @@ impl ResourceLink {
/// See https://learn.microsoft.com/en-us/rest/api/cosmos-db/access-control-on-cosmosdb-resources#constructkeytoken for more details.
#[cfg_attr(not(feature = "key_auth"), allow(dead_code))] // REASON: Currently only used in key_auth feature but we don't want to conditional-compile it.
pub fn resource_link(&self) -> String {
match self.item_id {
Some(_) => self.path(),
None => self.parent.clone().unwrap_or_default(),
match (self.resource_type, self.item_id.as_ref()) {
// Offers have a particular resource link format expected when requesting the offer itself.
(ResourceType::Offers, Some(i)) => i.to_lowercase(),
(_, Some(_)) => self.path(),
(_, None) => self.parent.clone().unwrap_or_default(),
}
}