Added content to the readme for Rust Event Hubs (#1740)

* Added content to the readme for Rust Eventhubs - lost more to come

* Fixed readme.md inclusion

* Added content to the readme for Rust Eventhubs; added documentation for the Consumer module

* Added eventhubs to global cspell

* Cleaned up use of EventHubs service name (it's Event Hubs); moved StartPosition to consumer since it's a consumer construct, not a model construct; Cleaned up doccomments around the tree.
This commit is contained in:
Larry Osterman 2024-08-06 16:53:49 -07:00 коммит произвёл GitHub
Родитель 7fbd2b8783
Коммит 504d88806c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
18 изменённых файлов: 802 добавлений и 513 удалений

5
.vscode/cspell.json поставляемый
Просмотреть файл

@ -15,6 +15,8 @@
"downcasted",
"downcasting",
"etag",
"eventhub",
"eventhubs",
"hmac",
"iothub",
"keyvault",
@ -25,6 +27,7 @@
"posix",
"reqwest",
"seekable",
"servicebus",
"stylesheet",
"typespec",
"virtualmachine"
@ -84,4 +87,4 @@
]
}
]
}
}

Просмотреть файл

@ -1,5 +1,3 @@
<!-- cspell: words EVENTHUB eventhubs -->
# azure_core_amqp
Azure AMQP crate for consumption of AMQP based packages in the Azure SDK for C++.

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words amqp widnow eventhubs sasl
// cspell: words amqp sasl
use crate::connection::{AmqpConnectionApis, AmqpConnectionOptions};

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words thiserror eventhubs amqp
// cspell: words thiserror amqp
macro_rules! impl_from_external_error {
($(($amqp_error:ident, $foreign_error:ty)),*) => {

Просмотреть файл

@ -1,7 +1,300 @@
# azure_messaging_servicebus
<!-- cspell:words pwsh yourgroup westus servicebus checkpointing -->
# Azure Event Hubs Client Package for Rust
Azure Event Hubs crate for the Microsoft Azure SDK for Rust.
This crate is part of a collection of crates: for more information please refer to [https://github.com/azure/azure-sdk-for-rust](https://github.com/azure/azure-sdk-for-rust).
[Azure Event Hubs](https://azure.microsoft.com/services/event-hubs/) is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: [link](https://docs.microsoft.com/azure/event-hubs/event-hubs-about).
License: MIT
Use the client library `azure_messaging_eventhubs` in your application to:
- Send events to an event hub.
- Consume events from an event hub.
Key links:
- [Source code][source]
- [API Reference Documentation][rustdoc]
- [Product documentation](https://azure.microsoft.com/services/event-hubs/)
- [Samples][rustdoc_examples]
## Getting started
### Install the package
Add the Azure Event Hubs client package for rust to your `cargo.toml` file:
```bash
cargo add azure_messaging_eventhubs
```
### Prerequisites
- A Rust Compiler. See [here](https://www.rust-lang.org/tools/install) for installation instructions.
- An [Azure subscription](https://azure.microsoft.com/free/)
- An [Event Hub namespace](https://docs.microsoft.com/azure/event-hubs/).
- An Event Hub. You can create an event hub in your Event Hubs Namespace using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create), or the [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli).
#### Create a namespace using the Azure CLI
Login to the CLI:
```pwsh
az login
```
Create a resource group:
```pwsh
az group create --name <your group name> --location <your location> --subscription <your subscription>
```
This should output something like:
```json
{
"id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>",
"location": "<your location>",
"managedBy": null,
"name": "<yourgroup name>",
"properties": {
"provisioningState": "Succeeded"
},
"tags": null,
"type": "Microsoft.Resources/resourceGroups"
}
```
Create an Event Hubs namespace:
```pwsh
az eventhubs namespace create --resource-group <your group name> --name <your namespace name> --sku Standard --subscription <your subscription>
```
This should output something like:
```json
{
"createdAt": "2023-08-10T18:41:54.19Z",
"disableLocalAuth": false,
"id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace>",
"isAutoInflateEnabled": false,
"kafkaEnabled": true,
"location": "West US",
"maximumThroughputUnits": 0,
"metricId": "REDACTED",
"minimumTlsVersion": "1.2",
"name": "<your namespace name>",
"provisioningState": "Succeeded",
"publicNetworkAccess": "Enabled",
"resourceGroup": "<your resource group>",
"serviceBusEndpoint": "https://<your namespace name>.servicebus.windows.net:443/",
"sku": {
"capacity": 1,
"name": "Standard",
"tier": "Standard"
},
"status": "Active",
"tags": {},
"type": "Microsoft.EventHub/Namespaces",
"updatedAt": "2023-08-10T18:42:41.343Z",
"zoneRedundant": false
}
```
Create an EventHub:
```pwsh
az eventhubs eventhub create --resource-group <your resource group> --namespace-name <your namespace name> --name <your eventhub name>
```
That should output something like:
```json
{
"createdAt": "2023-08-10T21:02:07.62Z",
"id": "/subscriptions/<your subscription>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace name>/eventhubs/<your eventhub name>",
"location": "westus",
"messageRetentionInDays": 7,
"name": "<your eventhub name>",
"partitionCount": 4,
"partitionIds": ["0", "1", "2", "3"],
"resourceGroup": "<your group name>",
"retentionDescription": {
"cleanupPolicy": "Delete",
"retentionTimeInHours": 168
},
"status": "Active",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"updatedAt": "2023-08-10T21:02:16.29Z"
}
```
### Authenticate the client
Event Hub clients are created using a credential from the [Azure Identity package][azure_identity_pkg], like [DefaultAzureCredential][default_azure_credential].
# Key concepts
An Event Hub [**namespace**](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#namespace) can have multiple event hubs.
Each event hub, in turn, contains [**partitions**](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#partitions) which
store events.
<!-- NOTE: Fix dead links -->
Events are published to an event hub using an [event publisher](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-publishers). In this package, the event publisher is the [ProducerClient]()
Events can be consumed from an event hub using an [event consumer](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-consumers). In this package there are two types for consuming events:
- The basic event consumer is the PartitionClient, in the [ConsumerClient][consumer_client]. This consumer is useful if you already known which partitions you want to receive from.
- A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the [Processor](https://azure.github.io/azure-sdk-for-cpp/storage.html).
The Processor is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances.
More information about Event Hubs features and terminology can be found here: [link](https://docs.microsoft.com/azure/event-hubs/event-hubs-features)
# Examples
Examples for various scenarios can be found on in the samples directory in our GitHub repo for
[Event Hubs](https://github.com/Azure/azure-sdk-for-rust/tree/main/sdk/eventhubs/azure-messaging-eventhubs/samples).
## Send events
The following example shows how to send events to an event hub:
```rust
async fn send_events() {
let host = std::env::var("EVENTHUBS_HOST").unwrap();
let eventhub = std::env::var("EVENTHUB_NAME").unwrap();
let credential = azure_identity::DefaultAzureCredential::create(azure_identity::TokenCredentialOptions::default()).unwrap();
let client = azure_messaging_eventhubs::producer::ProducerClient::new(
host,
eventhub.clone(),
credential,
azure_messaging_eventhubs::producer::ProducerClientOptions::builder()
.with_application_id("test_create_batch")
.build(),
);
client.open().await.unwrap();
{
let mut batch = client.create_batch(None).await.unwrap();
assert_eq!(batch.len(), 0);
assert!(batch.try_add_event_data(vec![1, 2, 3, 4], None).unwrap());
let res = client.submit_batch(&batch).await;
assert!(res.is_ok());
}
}
```
## Receive events
The following example shows how to receive events from partition 1 on an event hub:
```rust no_run
async fn receive_events() {
use futures::pin_mut;
use async_std::stream::StreamExt;
let host = std::env::var("EVENTHUBS_HOST").unwrap();
let eventhub = std::env::var("EVENTHUB_NAME").unwrap();
let credential = azure_identity::DefaultAzureCredential::create(azure_identity::TokenCredentialOptions::default()).unwrap();
let client = azure_messaging_eventhubs::consumer::ConsumerClient::new(
host,
eventhub,
None,
credential,
Some(
azure_messaging_eventhubs::consumer::ConsumerClientOptions::builder()
.with_application_id("receive_lots_of_events")
.build(),
),
);
client.open().await.unwrap();
let event_stream = client
.receive_events_on_partition(
"0",
Some(
azure_messaging_eventhubs::consumer::ReceiveOptions::builder()
.with_start_position(azure_messaging_eventhubs::consumer::StartPosition::builder().with_earliest_location().build())
.build(),
),
)
.await;
pin_mut!(event_stream); // Needed for iteration.
while let Some(event_result) = event_stream.next().await {
match event_result {
Ok(event) => {
// Process the received event
println!("Received event: {:?}", event);
}
Err(err) => {
// Handle the error
eprintln!("Error receiving event: {:?}", err);
}
}
}
}
```
# Troubleshooting
## Logging
The Event Hubs SDK client uses the [tracing](https://docs.rs/tracing/latest/tracing/) package to
enable diagnostics.
## Contributing
For details on contributing to this repository, see the [contributing guide][azure_sdk_for_cpp_contributing].
This project welcomes contributions and suggestions. Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the rights to use your contribution. For details, visit [the contributor license agreement page](https://cla.microsoft.com).
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide
a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions
provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/).
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
### Additional Helpful Links for Contributors
Many people all over the world have helped make this project better. You'll want to check out:
- [What are some good first issues for new contributors to the repo?](https://github.com/azure/azure-sdk-for-rust/issues?q=is%3Aopen+is%3Aissue+label%3A%22up+for+grabs%22)
- [How to build and test your change][azure_sdk_for_cpp_contributing_developer_guide]
- [How you can make a change happen!][azure_sdk_for_cpp_contributing_pull_requests]
- Frequently Asked Questions (FAQ) and Conceptual Topics in the detailed [Azure SDK for C++ wiki](https://github.com/azure/azure-sdk-for-cpp/wiki).
<!-- ### Community-->
### Reporting security issues and security bugs
Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) <secure@microsoft.com>. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the [Security TechCenter](https://www.microsoft.com/msrc/faqs-report-an-issue).
### License
Azure SDK for C++ is licensed under the [MIT](https://github.com/Azure/azure-sdk-for-cpp/blob/main/LICENSE.txt) license.
<!-- LINKS -->
[azure_sdk_for_cpp_contributing]: https://github.com/Azure/azure-sdk-for-rust/blob/main/CONTRIBUTING.md
[azure_sdk_for_cpp_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-rust/blob/main/CONTRIBUTING.md#developer-guide
[azure_sdk_for_cpp_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-rust/blob/main/CONTRIBUTING.md#pull-requests
[consumer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/latest/class_azure_1_1_messaging_1_1_event_hubs_1_1_consumer_client.html
[producer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/1.0.0-beta.1/class_azure_1_1_messaging_1_1_event_hubs_1_1_producer_client.html
[source]: https://github.com/Azure/azure-sdk-for-rust/tree/feature/track2/sdk/eventhubs/azure_messaging_eventhubs
[azure_identity_pkg]: https://docs.rs/azure_identity/latest/azure_identity/
[default_azure_credential]: https://docs.rs/azure_identity/latest/azure_identity/struct.DefaultAzureCredential.html
[rustdoc]: https://docs.rs/azure_messaging_eventhubs/latest/azure_messaging_eventhubs
[rustdoc_examples]: https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs/azure-messaging-eventhubs/samples
![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-cpp%2Fsdk%2Feventhubs%2FREADME.png)

Просмотреть файл

@ -0,0 +1,108 @@
<!-- cspell: words -->
# consumer Module Overview
This module contains the `ConsumerClient` struct and related types, which are used for receiving events from an Event Hub.
The `ConsumerClient` provides functionality to establish a connection to an Event Hub, receive events from a specific partition,
and manage the lifecycle of the consumer client.
## Examples
### Creating a new `ConsumerClient` instance
```rust no_run
use azure_messaging_eventhubs::consumer::ConsumerClient;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
```
### Opening a connection to the Event Hub
```rust no_run
use azure_messaging_eventhubs::consumer::ConsumerClient;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
#[tokio::main]
async fn main() {
let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
let result = consumer.open().await;
match result {
Ok(()) => {
// Connection opened successfully
println!("Connection opened successfully");
}
Err(err) => {
// Handle the error
eprintln!("Error opening connection: {:?}", err);
}
}
}
```
### Closing the connection to the Event Hub
```rust no_run
use azure_messaging_eventhubs::consumer::ConsumerClient;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
#[tokio::main]
async fn main() {
let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
consumer.open().await.unwrap();
let result = consumer.close().await;
match result {
Ok(()) => {
// Connection closed successfully
println!("Connection closed successfully");
}
Err(err) => {
// Handle the error
eprintln!("Error closing connection: {:?}", err);
}
}
}
```
### Receiving events from a specific partition of the Event Hub
```rust no_run
use azure_messaging_eventhubs::consumer::ConsumerClient;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
use async_std::stream::StreamExt;
#[tokio::main]
async fn main() {
let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
let partition_id = "0";
let options = None;
consumer.open().await.unwrap();
let event_stream = consumer.receive_events_on_partition(partition_id, options).await;
tokio::pin!(event_stream);
while let Some(event_result) = event_stream.next().await {
match event_result {
Ok(event) => {
// Process the received event
println!("Received event: {:?}", event);
}
Err(err) => {
// Handle the error
eprintln!("Error receiving event: {:?}", err);
}
}
}
}
```

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words eventhub eventhubs
// cspell: words
use azure_core::error::Result;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
use azure_messaging_eventhubs::producer::{ProducerClient, ProducerClientOptions};

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words eventhub eventhubs
// cspell: words
use azure_core::error::Result;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};

Просмотреть файл

@ -1,2 +0,0 @@
[toolchain]
channel = "1.78.0"

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words eventhub eventhubs amqp
// cspell: words amqp
use crate::{
error::ErrorKind,
models::{EventHubPartitionProperties, EventHubProperties},

Просмотреть файл

@ -1,120 +1,15 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
//cspell: words amqp eventhub eventhubs mgmt amqps
//cspell: words amqp mgmt amqps
/// This module contains the `ConsumerClient` struct and related types, which are used for receiving events from an Event Hub.
///
/// The `ConsumerClient` provides functionality to establish a connection to an Event Hub, receive events from a specific partition,
/// and manage the lifecycle of the consumer client.
///
/// # Examples
///
/// Creating a new `ConsumerClient` instance:
///
/// ``` no_run
/// use azure_messaging_eventhubs::consumer::ConsumerClient;
/// use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
///
/// let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
/// let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
/// ```
///
/// Opening a connection to the Event Hub:
///
/// ``` no_run
/// use azure_messaging_eventhubs::consumer::ConsumerClient;
/// use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
///
/// #[tokio::main]
/// async fn main() {
/// let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
/// let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
///
/// let result = consumer.open().await;
///
/// match result {
/// Ok(()) => {
/// // Connection opened successfully
/// println!("Connection opened successfully");
/// }
/// Err(err) => {
/// // Handle the error
/// eprintln!("Error opening connection: {:?}", err);
/// }
/// }
/// }
/// ```
///
/// Closing the connection to the Event Hub:
///
/// ``` no_run
/// use azure_messaging_eventhubs::consumer::ConsumerClient;
/// use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
///
/// #[tokio::main]
/// async fn main() {
/// let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
/// let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
///
/// consumer.open().await.unwrap();
///
/// let result = consumer.close().await;
///
/// match result {
/// Ok(()) => {
/// // Connection closed successfully
/// println!("Connection closed successfully");
/// }
/// Err(err) => {
/// // Handle the error
/// eprintln!("Error closing connection: {:?}", err);
/// }
/// }
/// }
/// ```
///
/// Receiving events from a specific partition of the Event Hub:
///
/// ``` no_run
/// use azure_messaging_eventhubs::consumer::ConsumerClient;
/// use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
/// use async_std::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let my_credential = DefaultAzureCredential::create(TokenCredentialOptions::default()).unwrap();
/// let consumer = ConsumerClient::new("my_namespace", "my_eventhub", None, my_credential, None);
/// let partition_id = "0";
/// let options = None;
///
/// consumer.open().await.unwrap();
///
/// let event_stream = consumer.receive_events_on_partition(partition_id, options).await;
///
/// tokio::pin!(event_stream);
/// while let Some(event_result) = event_stream.next().await {
/// match event_result {
/// Ok(event) => {
/// // Process the received event
/// println!("Received event: {:?}", event);
/// }
/// Err(err) => {
/// // Handle the error
/// eprintln!("Error receiving event: {:?}", err);
/// }
/// }
/// }
/// }
/// ```
///
use super::{
common::{
user_agent::{get_package_name, get_package_version, get_platform_info, get_user_agent},
ManagementInstance,
},
error::ErrorKind,
models::{EventHubPartitionProperties, EventHubProperties, ReceivedEventData, StartPosition},
models::{EventHubPartitionProperties, EventHubProperties, ReceivedEventData},
};
use async_std::sync::Mutex;
@ -648,6 +543,139 @@ impl ReceiveOptions {
}
}
#[derive(Debug, Default, PartialEq, Clone)]
pub(crate) enum StartLocation {
Offset(String),
SequenceNumber(i64),
EnqueuedTime(std::time::SystemTime),
Earliest,
#[default]
Latest,
}
const ENQUEUED_TIME_ANNOTATION: &str = "amqp.annotation.x-opt-enqueued-time";
const OFFSET_ANNOTATION: &str = "amqp.annotation.x-opt-offset";
const SEQUENCE_NUMBER_ANNOTATION: &str = "amqp.annotation.x-opt-sequence-number";
/// Represents the starting position of a consumer when receiving events from an Event Hub.
///
/// This enum provides different ways to specify the starting position of a consumer when receiving events from an Event Hub.
/// The starting position can be specified using an offset, a sequence number, an enqueued time, or the earliest or latest event in the partition.
///
/// The default starting position is the latest event in the partition (always receive new events).
///
/// # Examples
///
/// Basic usage:
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_sequence_number(12345)
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_sequence_number(12345)
/// .inclusive()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_enqueued_time(std::time::SystemTime::now())
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_offset("12345".to_string())
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_earliest_location()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_latest_location()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::consumer::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .build();
/// ```
///
#[derive(Debug, PartialEq, Clone, Default)]
pub struct StartPosition {
location: StartLocation,
inclusive: bool,
}
impl StartPosition {
/// Creates a new builder to build a `StartPosition`.
///
/// # Returns
///
/// A builder which can be used to create a StartPosition.
///
pub fn builder() -> builders::StartPositionBuilder {
builders::StartPositionBuilder::new()
}
pub(crate) fn start_expression(position: &Option<StartPosition>) -> String {
if let Some(position) = position {
let mut greater_than: &str = ">";
if position.inclusive {
greater_than = ">=";
}
match &position.location {
StartLocation::Offset(offset) => {
format!("{} {}'{}'", OFFSET_ANNOTATION, greater_than, offset)
}
StartLocation::SequenceNumber(sequence_number) => {
format!(
"{} {}'{}'",
SEQUENCE_NUMBER_ANNOTATION, greater_than, sequence_number
)
}
StartLocation::EnqueuedTime(enqueued_time) => {
let enqueued_time = enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
format!(
"{} {}'{}'",
ENQUEUED_TIME_ANNOTATION, greater_than, enqueued_time
)
}
StartLocation::Earliest => "amqp.annotation.x-opt-offset > '-1'".to_string(),
StartLocation::Latest => "amqp.annotation.x-opt-offset > '@latest'".to_string(),
}
} else {
"amqp.annotation.x-opt-offset > '@latest'".to_string()
}
}
}
mod builders {
use super::*;
@ -757,11 +785,143 @@ mod builders {
self.options
}
}
/// A builder for the `StartPosition` struct.
pub struct StartPositionBuilder {
position: StartPosition,
}
impl StartPositionBuilder {
pub(super) fn new() -> Self {
Self {
position: Default::default(),
}
}
/// Sets the starting position to the earliest event in the partition.
///
/// # Returns
///
/// A reference to the updated builder.
///
pub fn with_earliest_location(mut self) -> Self {
self.position.location = StartLocation::Earliest;
self
}
/// Sets the starting position to the latest event in the partition.
///
/// # Returns
///
/// A reference to the updated builder.
///
pub fn with_latest_location(mut self) -> Self {
self.position.location = StartLocation::Latest;
self
}
/// Sets the starting position to the event with the specified sequence number.
///
/// # Parameters
///
/// - `sequence_number`: The sequence number to start receiving events.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks:
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified sequence number.
/// If the "inclusive" method is called, the message at the starting sequence number will be included.
///
pub fn with_sequence_number(mut self, sequence_number: i64) -> Self {
self.position.location = StartLocation::SequenceNumber(sequence_number);
self
}
/// Sets the starting position to the event enqueued at the specified time.
///
/// # Parameters
///
/// - `enqueued_time`: The time when the event was enqueued.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified enqueued time.
/// If the "inclusive" method is called, the message enqueued at the specified time will be included.
///
pub fn with_enqueued_time(mut self, enqueued_time: std::time::SystemTime) -> Self {
self.position.location = StartLocation::EnqueuedTime(enqueued_time);
self
}
/// Sets the starting position to the event with the specified offset.
///
/// # Parameters
///
/// - `offset`: The offset of the event.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified offset.
/// If the "inclusive" method is called, the message at the specified offset will be included.
///
pub fn with_offset(mut self, offset: String) -> Self {
self.position.location = StartLocation::Offset(offset);
self
}
/// Sets the starting position to be inclusive.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If this method is called, the message at the starting position will be included.
///
pub fn inclusive(mut self) -> Self {
self.position.inclusive = true;
self
}
/// Builds the `StartPosition`.
///
/// # Returns
///
/// The built `StartPosition`.
///
pub fn build(self) -> StartPosition {
self.position
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tracing::info;
static INIT_LOGGING: std::sync::Once = std::sync::Once::new();
#[test]
fn setup() {
INIT_LOGGING.call_once(|| {
println!("Setting up test logger...");
tracing_subscriber::fmt::init();
});
}
#[test]
fn test_default_consumer_options() {
@ -792,4 +952,112 @@ mod tests {
assert_eq!(options.instance_id, Some("test_instance_id".to_string()));
assert!(options.retry_options.is_some());
}
#[test]
fn test_start_position_builder_with_sequence_number() {
setup();
let sequence_number = 12345i64;
let start_position = StartPosition::builder()
.with_sequence_number(sequence_number)
.build();
assert_eq!(
start_position.location,
StartLocation::SequenceNumber(sequence_number)
);
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
"amqp.annotation.x-opt-sequence-number >'12345'"
);
let start_position = StartPosition::builder()
.with_sequence_number(sequence_number)
.inclusive()
.build();
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
"amqp.annotation.x-opt-sequence-number >='12345'"
);
}
#[test]
fn test_start_position_builder_with_enqueued_time() {
setup();
let enqueued_time = std::time::SystemTime::now();
let start_position = StartPosition::builder()
.with_enqueued_time(enqueued_time)
.build();
info!("enqueued_time: {:?}", enqueued_time);
info!(
"enqueued_time: {:?}",
enqueued_time.duration_since(std::time::UNIX_EPOCH)
);
info!(
"enqueued_time: {:?}",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
assert_eq!(
start_position.location,
StartLocation::EnqueuedTime(enqueued_time)
);
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
format!(
"amqp.annotation.x-opt-enqueued-time >'{}'",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
)
);
let start_position = StartPosition::builder()
.with_enqueued_time(enqueued_time)
.inclusive()
.build();
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
format!(
"amqp.annotation.x-opt-enqueued-time >='{}'",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
)
);
}
#[test]
fn test_start_position_builder_with_offset() {
setup();
let offset = "12345".to_string();
let start_position = StartPosition::builder().with_offset(offset.clone()).build();
assert_eq!(
start_position.location,
StartLocation::Offset(offset.clone())
);
assert_eq!(
"amqp.annotation.x-opt-offset >'12345'",
StartPosition::start_expression(&Some(start_position)),
);
let start_position = StartPosition::builder()
.with_offset(offset.clone())
.inclusive()
.build();
assert_eq!(
"amqp.annotation.x-opt-offset >='12345'",
StartPosition::start_expression(&Some(start_position)),
);
}
#[test]
fn test_start_position_builder_inclusive() {
setup();
let start_position = StartPosition::builder().inclusive().build();
assert!(start_position.inclusive);
let start_position = StartPosition::builder().build();
assert!(!start_position.inclusive);
}
}

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights Reserved.
// Licensed under the MIT License.
// cspell: words amqp eventhubs
// cspell: words amqp
use azure_core_amqp::error::Error;
/// Represents the different kinds of errors that can occur in the Eventhubs module.

Просмотреть файл

@ -3,9 +3,8 @@
#![recursion_limit = "128"]
#![warn(missing_docs)]
// cspell: words amqp eventhub eventhubs eventdata
//! #[doc = include_str!("../README.md")]
// cspell: words amqp eventdata
#![doc = include_str!("../README.md")]
/// This module contains the implementation of the Azure Messaging Event Hubs SDK for Rust.
///
@ -33,7 +32,8 @@
///
pub(crate) mod common;
/// Types to consume events from an Event Hub
/// Types related to consuming events from an Event Hub.
#[doc = include_str!("../docs/eh_consumer.md")]
pub mod consumer;
/// Types related to errors processing events.
@ -269,139 +269,6 @@ pub mod models {
}
}
#[derive(Debug, Default, PartialEq, Clone)]
pub(crate) enum StartLocation {
Offset(String),
SequenceNumber(i64),
EnqueuedTime(std::time::SystemTime),
Earliest,
#[default]
Latest,
}
const ENQUEUED_TIME_ANNOTATION: &str = "amqp.annotation.x-opt-enqueued-time";
const OFFSET_ANNOTATION: &str = "amqp.annotation.x-opt-offset";
const SEQUENCE_NUMBER_ANNOTATION: &str = "amqp.annotation.x-opt-sequence-number";
/// Represents the starting position of a consumer when receiving events from an Event Hub.
///
/// This enum provides different ways to specify the starting position of a consumer when receiving events from an Event Hub.
/// The starting position can be specified using an offset, a sequence number, an enqueued time, or the earliest or latest event in the partition.
///
/// The default starting position is the latest event in the partition (always receive new events).
///
/// # Examples
///
/// Basic usage:
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_sequence_number(12345)
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_sequence_number(12345)
/// .inclusive()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_enqueued_time(std::time::SystemTime::now())
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_offset("12345".to_string())
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_earliest_location()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .with_latest_location()
/// .build();
/// ```
///
/// ```no_run
/// use azure_messaging_eventhubs::models::StartPosition;
///
/// let start_position = StartPosition::builder()
/// .build();
/// ```
///
#[derive(Debug, PartialEq, Clone, Default)]
pub struct StartPosition {
location: StartLocation,
inclusive: bool,
}
impl StartPosition {
/// Creates a new builder to build a `StartPosition`.
///
/// # Returns
///
/// A builder which can be used to create a StartPosition.
///
pub fn builder() -> builders::StartPositionBuilder {
builders::StartPositionBuilder::new()
}
pub(crate) fn start_expression(position: &Option<StartPosition>) -> String {
if let Some(position) = position {
let mut greater_than: &str = ">";
if position.inclusive {
greater_than = ">=";
}
match &position.location {
StartLocation::Offset(offset) => {
format!("{} {}'{}'", OFFSET_ANNOTATION, greater_than, offset)
}
StartLocation::SequenceNumber(sequence_number) => {
format!(
"{} {}'{}'",
SEQUENCE_NUMBER_ANNOTATION, greater_than, sequence_number
)
}
StartLocation::EnqueuedTime(enqueued_time) => {
let enqueued_time = enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
format!(
"{} {}'{}'",
ENQUEUED_TIME_ANNOTATION, greater_than, enqueued_time
)
}
StartLocation::Earliest => "amqp.annotation.x-opt-offset > '-1'".to_string(),
StartLocation::Latest => "amqp.annotation.x-opt-offset > '@latest'".to_string(),
}
} else {
"amqp.annotation.x-opt-offset > '@latest'".to_string()
}
}
}
/// The EventData struct represents the data associated with an event in an Event Hub.
///
/// This struct provides the body, content type, correlation identifier, message identifier, and properties of an event.
@ -806,251 +673,6 @@ pub mod models {
self.event_data
}
}
/// A builder for the `StartPosition` struct.
pub struct StartPositionBuilder {
position: StartPosition,
}
impl StartPositionBuilder {
pub(super) fn new() -> Self {
Self {
position: Default::default(),
}
}
/// Sets the starting position to the earliest event in the partition.
///
/// # Returns
///
/// A reference to the updated builder.
///
pub fn with_earliest_location(mut self) -> Self {
self.position.location = StartLocation::Earliest;
self
}
/// Sets the starting position to the latest event in the partition.
///
/// # Returns
///
/// A reference to the updated builder.
///
pub fn with_latest_location(mut self) -> Self {
self.position.location = StartLocation::Latest;
self
}
/// Sets the starting position to the event with the specified sequence number.
///
/// # Parameters
///
/// - `sequence_number`: The sequence number to start receiving events.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks:
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified sequence number.
/// If the "inclusive" method is called, the message at the starting sequence number will be included.
///
pub fn with_sequence_number(mut self, sequence_number: i64) -> Self {
self.position.location = StartLocation::SequenceNumber(sequence_number);
self
}
/// Sets the starting position to the event enqueued at the specified time.
///
/// # Parameters
///
/// - `enqueued_time`: The time when the event was enqueued.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified enqueued time.
/// If the "inclusive" method is called, the message enqueued at the specified time will be included.
///
pub fn with_enqueued_time(mut self, enqueued_time: std::time::SystemTime) -> Self {
self.position.location = StartLocation::EnqueuedTime(enqueued_time);
self
}
/// Sets the starting position to the event with the specified offset.
///
/// # Parameters
///
/// - `offset`: The offset of the event.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If the "inclusive" method is not called, the starting position will be greater than the specified offset.
/// If the "inclusive" method is called, the message at the specified offset will be included.
///
pub fn with_offset(mut self, offset: String) -> Self {
self.position.location = StartLocation::Offset(offset);
self
}
/// Sets the starting position to be inclusive.
///
/// # Returns
///
/// A reference to the updated builder.
///
/// # Remarks
///
/// If this method is called, the message at the starting position will be included.
///
pub fn inclusive(mut self) -> Self {
self.position.inclusive = true;
self
}
/// Builds the `StartPosition`.
///
/// # Returns
///
/// The built `StartPosition`.
///
pub fn build(self) -> StartPosition {
self.position
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tracing::info;
static INIT_LOGGING: std::sync::Once = std::sync::Once::new();
#[test]
fn setup() {
INIT_LOGGING.call_once(|| {
println!("Setting up test logger...");
tracing_subscriber::fmt::init();
});
}
#[test]
fn test_start_position_builder_with_sequence_number() {
setup();
let sequence_number = 12345i64;
let start_position = StartPosition::builder()
.with_sequence_number(sequence_number)
.build();
assert_eq!(
start_position.location,
StartLocation::SequenceNumber(sequence_number)
);
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
"amqp.annotation.x-opt-sequence-number >'12345'"
);
let start_position = StartPosition::builder()
.with_sequence_number(sequence_number)
.inclusive()
.build();
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
"amqp.annotation.x-opt-sequence-number >='12345'"
);
}
#[test]
fn test_start_position_builder_with_enqueued_time() {
setup();
let enqueued_time = std::time::SystemTime::now();
let start_position = StartPosition::builder()
.with_enqueued_time(enqueued_time)
.build();
info!("enqueued_time: {:?}", enqueued_time);
info!(
"enqueued_time: {:?}",
enqueued_time.duration_since(std::time::UNIX_EPOCH)
);
info!(
"enqueued_time: {:?}",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
);
assert_eq!(
start_position.location,
StartLocation::EnqueuedTime(enqueued_time)
);
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
format!(
"amqp.annotation.x-opt-enqueued-time >'{}'",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
)
);
let start_position = StartPosition::builder()
.with_enqueued_time(enqueued_time)
.inclusive()
.build();
assert_eq!(
StartPosition::start_expression(&Some(start_position)),
format!(
"amqp.annotation.x-opt-enqueued-time >='{}'",
enqueued_time
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
)
);
}
#[test]
fn test_start_position_builder_with_offset() {
setup();
let offset = "12345".to_string();
let start_position = StartPosition::builder().with_offset(offset.clone()).build();
assert_eq!(
start_position.location,
StartLocation::Offset(offset.clone())
);
assert_eq!(
"amqp.annotation.x-opt-offset >'12345'",
StartPosition::start_expression(&Some(start_position)),
);
let start_position = StartPosition::builder()
.with_offset(offset.clone())
.inclusive()
.build();
assert_eq!(
"amqp.annotation.x-opt-offset >='12345'",
StartPosition::start_expression(&Some(start_position)),
);
}
#[test]
fn test_start_position_builder_inclusive() {
setup();
let start_position = StartPosition::builder().inclusive().build();
assert!(start_position.inclusive);
let start_position = StartPosition::builder().build();
assert!(!start_position.inclusive);
}
}
}

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
// cspell: words amqp eventhubs
// cspell: words amqp
use std::sync::Mutex;

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
//cspell: words amqp eventhub amqps servicebus eventhubs mgmt
//cspell: words amqp amqps servicebus mgmt
use azure_core_amqp::{
cbs::{AmqpClaimsBasedSecurity, AmqpClaimsBasedSecurityApis},

Просмотреть файл

@ -1,15 +1,14 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
//cspell: words eventhubs eventhub eventdata
//cspell: words eventdata
#![cfg(all(test, feature = "test_e2e"))] // to run this, do: `cargo test --features test_e2e`
use async_std::future::timeout;
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
use azure_messaging_eventhubs::{
consumer::{ConsumerClient, ConsumerClientOptions, ReceiveOptions},
models::StartPosition,
use azure_messaging_eventhubs::consumer::{
ConsumerClient, ConsumerClientOptions, ReceiveOptions, StartPosition,
};
use futures::{pin_mut, StreamExt};
use std::{env, time::Duration};

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
//cspell: words eventhubs eventhub eventdata amqp
//cspell: words eventdata amqp
#![cfg(all(test, feature = "test_e2e"))] // to run this, do: `cargo test --features test_e2e`

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.
//cspell: words eventhubs eventhub eventdata amqp
//cspell: words eventdata amqp
#![cfg(all(test, feature = "test_e2e"))] // to run this, do: `cargo test --features test_e2e`
@ -12,8 +12,8 @@ use azure_core_amqp::{
};
use azure_identity::{DefaultAzureCredential, TokenCredentialOptions};
use azure_messaging_eventhubs::{
consumer::{ConsumerClient, ConsumerClientOptions, ReceiveOptions},
models::{EventData, MessageId, StartPosition},
consumer::{ConsumerClient, ConsumerClientOptions, ReceiveOptions, StartPosition},
models::{EventData, MessageId},
producer::{batch::EventDataBatchOptions, ProducerClient, ProducerClientOptions},
};
use futures::pin_mut;