azure-sdk-for-rust/sdk/eventhubs/azure_messaging_eventhubs
Larry Osterman 54db2ca594
Removed builders from AMQP (#1906)
* Removed builders from AMQP

* Removed dead code

* Removed buiolder from MessageHeader and MessageProperties
2024-11-08 14:03:55 -08:00
..
examples Removed builders from eventhubs; removed impl Into<String> from eventhubs and AMQP; Fixed time conversion issue with 1-1-0001 (#1892) 2024-11-08 10:06:26 -08:00
src Removed builders from AMQP (#1906) 2024-11-08 14:03:55 -08:00
tests Removed builders from AMQP (#1906) 2024-11-08 14:03:55 -08:00
Cargo.toml Better centralize and remove unused dependencies (#1798) 2024-09-12 09:19:02 -07:00
README.md Removed builders from eventhubs; removed impl Into<String> from eventhubs and AMQP; Fixed time conversion issue with 1-1-0001 (#1892) 2024-11-08 10:06:26 -08:00
build.rs
test-resources-post.ps1
test-resources.json

README.md

Azure Event Hubs Client Package for Rust

Azure Event Hubs crate for the Microsoft Azure SDK for Rust.

Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: link.

Use the client library azure_messaging_eventhubs in your application to:

  • Send events to an event hub.
  • Consume events from an event hub.

Key links:

Getting started

Install the package

Add the Azure Event Hubs client package for rust to your cargo.toml file:

cargo add azure_messaging_eventhubs

Prerequisites

Create a namespace using the Azure CLI

Login to the CLI:

az login

Create a resource group:

az group create --name <your group name> --location <your location> --subscription <your subscription>

This should output something like:

{
    "id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>",
    "location": "<your location>",
    "managedBy": null,
    "name": "<yourgroup name>",
    "properties": {
        "provisioningState": "Succeeded"
    },
    "tags": null,
    "type": "Microsoft.Resources/resourceGroups"
}

Create an Event Hubs namespace:

 az eventhubs namespace create --resource-group <your group name> --name <your namespace name> --sku Standard  --subscription <your subscription>

This should output something like:

{
    "createdAt": "2023-08-10T18:41:54.19Z",
    "disableLocalAuth": false,
    "id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace>",
    "isAutoInflateEnabled": false,
    "kafkaEnabled": true,
    "location": "West US",
    "maximumThroughputUnits": 0,
    "metricId": "REDACTED",
    "minimumTlsVersion": "1.2",
    "name": "<your namespace name>",
    "provisioningState": "Succeeded",
    "publicNetworkAccess": "Enabled",
    "resourceGroup": "<your resource group>",
    "serviceBusEndpoint": "https://<your namespace name>.servicebus.windows.net:443/",
    "sku": {
        "capacity": 1,
        "name": "Standard",
        "tier": "Standard"
    },
    "status": "Active",
    "tags": {},
    "type": "Microsoft.EventHub/Namespaces",
    "updatedAt": "2023-08-10T18:42:41.343Z",
    "zoneRedundant": false
}

Create an EventHub:

az eventhubs eventhub create --resource-group <your resource group> --namespace-name <your namespace name> --name <your eventhub name>

That should output something like:

{
    "createdAt": "2023-08-10T21:02:07.62Z",
    "id": "/subscriptions/<your subscription>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace name>/eventhubs/<your eventhub name>",
    "location": "westus",
    "messageRetentionInDays": 7,
    "name": "<your eventhub name>",
    "partitionCount": 4,
    "partitionIds": ["0", "1", "2", "3"],
    "resourceGroup": "<your group name>",
    "retentionDescription": {
        "cleanupPolicy": "Delete",
        "retentionTimeInHours": 168
    },
    "status": "Active",
    "type": "Microsoft.EventHub/namespaces/eventhubs",
    "updatedAt": "2023-08-10T21:02:16.29Z"
}

Authenticate the client

Event Hub clients are created using a credential from the Azure Identity package, like DefaultAzureCredential.

Key concepts

An Event Hub namespace can have multiple event hubs. Each event hub, in turn, contains partitions which store events.

Events are published to an event hub using an event publisher. In this package, the event publisher is the ProducerClient

Events can be consumed from an event hub using an event consumer. In this package there are two types for consuming events:

  • The basic event consumer is the PartitionClient, in the ConsumerClient. This consumer is useful if you already known which partitions you want to receive from.
  • A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the Processor. The Processor is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances.

More information about Event Hubs features and terminology can be found here: link

Examples

Examples for various scenarios can be found on in the samples directory in our GitHub repo for Event Hubs.

Send events

The following example shows how to send events to an event hub:

async fn send_events() {
    let host = std::env::var("EVENTHUBS_HOST").unwrap();
    let eventhub = std::env::var("EVENTHUB_NAME").unwrap();

    let credential = azure_identity::DefaultAzureCredential::new().unwrap();

    let client = azure_messaging_eventhubs::producer::ProducerClient::new(
        host,
        eventhub.clone(),
        credential,
        Some(azure_messaging_eventhubs::producer::ProducerClientOptions{
          application_id: Some("test_create_batch".to_string()),
          ..Default::default()
        }),
      );
    client.open().await.unwrap();
    {
        let mut batch = client.create_batch(None).await.unwrap();
        assert_eq!(batch.len(), 0);
        assert!(batch.try_add_event_data(vec![1, 2, 3, 4], None).unwrap());

        let res = client.submit_batch(&batch).await;
        assert!(res.is_ok());
    }
}

Receive events

The following example shows how to receive events from partition 1 on an event hub:

async fn receive_events() {
    use futures::pin_mut;
    use async_std::stream::StreamExt;

    let host = std::env::var("EVENTHUBS_HOST").unwrap();
    let eventhub = std::env::var("EVENTHUB_NAME").unwrap();

    let credential = azure_identity::DefaultAzureCredential::new().unwrap();
    let client = azure_messaging_eventhubs::consumer::ConsumerClient::new(
        host,
        eventhub,
        None,
        credential,
        Some(
            azure_messaging_eventhubs::consumer::ConsumerClientOptions{
                application_id: Some("receive_lots_of_events".to_string()),
                ..Default::default()}
        ),
    );

    client.open().await.unwrap();

    let event_stream = client
        .receive_events_on_partition(
            "0".to_string(),
            Some(
                azure_messaging_eventhubs::consumer::ReceiveOptions{
                    start_position: Some(azure_messaging_eventhubs::consumer::StartPosition{
                        location: azure_messaging_eventhubs::consumer::StartLocation::Earliest,
                        ..Default::default()
                    }),
                    ..Default::default()
                },
            ))
        .await;

    pin_mut!(event_stream); // Needed for iteration.
    while let Some(event_result) = event_stream.next().await {
        match event_result {
            Ok(event) => {
                // Process the received event
                println!("Received event: {:?}", event);
            }
            Err(err) => {
                // Handle the error
                eprintln!("Error receiving event: {:?}", err);
            }
        }
    }
}

Troubleshooting

Logging

The Event Hubs SDK client uses the tracing package to enable diagnostics.

Contributing

For details on contributing to this repository, see the contributing guide.

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit the contributor license agreement page.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Many people all over the world have helped make this project better. You'll want to check out:

Reporting security issues and security bugs

Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) secure@microsoft.com. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the Security TechCenter.

License

Azure SDK for C++ is licensed under the MIT license.

Impressions