* Added link credit support to message sender and receiver; reworked message receiver creation to move to the partition client object

* Added round trip test, including filters; fixed a hang with the consumer client caused by no messages; fixed checkpoint store storage names.

* EventDataBatch is created from EventProducer. Fixes #4868

* SendEventDataBatch renamed to Send; pass in Context on methods which require a context

* Added producer client send APIs without requiring an explicit event data batch

* close message receiver in destructor if it is open

* Consumer client and producer client only create message sender and receiver when needed
This commit is contained in:
Larry Osterman 2023-08-17 09:52:54 -07:00 коммит произвёл GitHub
Родитель 0a4d7009f4
Коммит b108bf6235
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
39 изменённых файлов: 1081 добавлений и 452 удалений

Просмотреть файл

@ -370,6 +370,11 @@
"inherits": [ "linux-basic-g++", "debug-build", "enable-tests" ],
"displayName": "Linux c++ Debug+Tests"
},
{
"name": "linux-g++-debug-tests-samples",
"inherits": [ "linux-basic-g++", "debug-build", "enable-tests", "enable-samples" ],
"displayName": "Linux c++ Debug+Tests, samples"
},
{
"name": "generate-doxygen",
"displayName": "zz-Generate Doxygen",

Просмотреть файл

@ -8,6 +8,8 @@
### Bugs Fixed
- When a message sender is destroyed, close the underlying AMQP link if it hasn't been closed already.
### Other Changes
## 1.0.0-beta.2 (2023-08-04)

Просмотреть файл

@ -74,6 +74,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
/** @brief The Maximum message size for the link associated with the message receiver. */
Nullable<uint64_t> MaxMessageSize;
/** @brief The default link credit used when communicating with the service. The link credit
* defines the maximum number of messages which can be outstanding between the service and the
* client. */
uint32_t MaxLinkCredit{};
/** @brief Attach properties for the link associated with the message receiver. */
Models::AmqpMap Properties;

Просмотреть файл

@ -90,6 +90,17 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
/** @brief The Maximum message size for the link associated with the message sender. */
Nullable<uint64_t> MaxMessageSize;
/** @brief The link maximum credits.
*
* Each message sent over a link reduces the link-credit by one. When the link-credit reaches
* zero, no more messages can be sent until the sender receives a disposition indicating that at
* least one message has been settled. The sender MAY send as many messages as it likes before
* receiving a disposition, but it MUST NOT send more messages than the link-credit. The sender
* MUST NOT send any messages after sending a disposition that indicates an error.
*
*/
uint32_t MaxLinkCredits{};
/** @brief The initial delivery count for the link associated with the message.
*
* The delivery-count is initialized by the sender when a link endpoint is created, and is
@ -130,6 +141,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
void Close();
/** @brief Returns the link negotiated maximum message size
*
* @return The negotiated maximum message size.
*/
std::uint64_t GetMaxMessageSize() const;
/** @brief Send a message synchronously to the target of the message sender.
*
* @param message The message to send.

Просмотреть файл

@ -133,6 +133,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
m_link->SetMaxMessageSize(std::numeric_limits<uint64_t>::max());
}
if (m_options.MaxLinkCredit != 0)
{
m_link->SetMaxLinkCredit(m_options.MaxLinkCredit);
}
m_link->SetAttachProperties(static_cast<Models::AmqpValue>(m_options.Properties));
}
@ -194,6 +198,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
m_eventHandler = nullptr;
}
if (m_receiverOpen)
{
Close();
}
}
MessageReceiverState MessageReceiverStateFromLowLevel(MESSAGE_RECEIVER_STATE lowLevel)
@ -304,6 +312,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
"Could not open message receiver. errno=" + std::to_string(err) + ", \"" + buf + "\".");
// LCOV_EXCL_STOP
}
m_receiverOpen = true;
}
void MessageReceiverImpl::Close()
@ -312,6 +321,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
throw std::runtime_error("Could not close message receiver"); // LCOV_EXCL_LINE
}
m_receiverOpen = false;
}
std::string MessageReceiverImpl::GetLinkName() const

Просмотреть файл

@ -51,6 +51,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
return m_impl->QueueSend(message, onSendComplete, context);
}
std::uint64_t MessageSender::GetMaxMessageSize() const { return m_impl->GetMaxMessageSize(); }
MessageSender::~MessageSender() noexcept {}
}}}} // namespace Azure::Core::Amqp::_internal
@ -107,6 +109,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
m_target);
PopulateLinkProperties();
}
void MessageSenderImpl::CreateLink()
{
m_link = std::make_shared<_detail::LinkImpl>(
@ -149,9 +152,15 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
{
m_link->SetMaxMessageSize(std::numeric_limits<uint64_t>::max());
}
if (m_options.MaxLinkCredits != 0)
{
m_link->SetMaxLinkCredit(m_options.MaxLinkCredits);
}
m_link->SetSenderSettleMode(m_options.SettleMode);
}
std::uint64_t MessageSenderImpl::GetMaxMessageSize() const { return m_link->GetMaxMessageSize(); }
_internal::MessageSenderState MessageSenderStateFromLowLevel(MESSAGE_SENDER_STATE lowLevel)
{
switch (lowLevel)

Просмотреть файл

@ -67,13 +67,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
void Close();
std::string GetLinkName() const;
std::string GetSourceName() const { return static_cast<std::string>(m_source.GetAddress()); }
uint32_t GetReceivedMessageId();
std::pair<Azure::Nullable<Models::AmqpMessage>, Models::_internal::AmqpError>
WaitForIncomingMessage(Context const& context);
private:
UniqueMessageReceiver m_messageReceiver{};
bool m_receiverOpen{false};
std::shared_ptr<_detail::LinkImpl> m_link;
_internal::MessageReceiverOptions m_options;
Models::_internal::MessageSource m_source;

Просмотреть файл

@ -62,6 +62,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Azure::Core::Amqp::_internal::MessageSender::MessageSendCompleteCallback onSendComplete,
Context const& context);
std::uint64_t GetMaxMessageSize() const;
private:
static void OnMessageSenderStateChangedFn(
void* context,

Просмотреть файл

@ -1,7 +0,0 @@
# Azure Messaging Librares for C++
The Azure Messaging EventHubs Client Library for C++ allows you to build applications against Microsoft Azure Messaging EventHubs service. For an overview of Azure Messaging, see [Introduction to Microsoft Azure Messaging](https://learn.microsoft.com/azure/messaging-services/).
## Latest release
Find the latest Messaging clients for C++ releases [here](https://azure.github.io/azure-sdk/releases/latest/cpp.html).

Просмотреть файл

@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "cpp",
"TagPrefix": "cpp/eventhubs",
"Tag": "cpp/eventhubs_3d146641d9"
"Tag": "cpp/eventhubs_ea4655bf2e"
}

Просмотреть файл

@ -4,12 +4,25 @@
### Features Added
- `ProducerClient` now has convenience methods for sending events without batching.
- Added `std::ostream` insertion operators for model types to simplify debugging.
### Breaking Changes
- Storage names used for checkpoint store have been normalized to match behavior of other Azure SDK eventhubs packages.
- `EventDataBatch` object can no longer be directly created but instead must be created via `ProducerClient::CreateEventDataBatch`.
- `EventDataBatch::AddMessage` method has been renamed to `EventDataBatch::TryAddMessage` and it now returns false if the message will not fit.
- `SendEventDataBatch` method has been renamed to `Send` and it now returns a void (throwing an exception of the send fails).
### Bugs Fixed
- Setting `PartitionClientOptions::StartPosition::EnqueuedTime` now works as expected.
- Internally restructured how AMQP senders and receivers are configured to simplify code and significantly improve reliability.
### Other Changes
- Azure CLI examples added to README.md file.
## 1.0.0-beta.1 (2023-08-08)
### Features Added

Просмотреть файл

@ -31,6 +31,103 @@ vcpkg install azure-messaging-eventhubs-cpp
- An [Event Hub namespace](https://docs.microsoft.com/azure/event-hubs/).
- An Event Hub. You can create an event hub in your Event Hubs Namespace using the [Azure Portal](https://docs.microsoft.com/azure/event-hubs/event-hubs-create), or the [Azure CLI](https://docs.microsoft.com/azure/event-hubs/event-hubs-quickstart-cli).
#### Create a namespace using the Azure CLI
Login to the CLI:
```pwsh
az login
```
Create a resource group:
```pwsh
az group create --name <your group name> --location <your location> --subscription <your subscription>
```
This should output something like:
```json
{
"id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>",
"location": "<your location>",
"managedBy": null,
"name": "<yourgroup name>",
"properties": {
"provisioningState": "Succeeded"
},
"tags": null,
"type": "Microsoft.Resources/resourceGroups"
}
```
Create an EventHubs instance:
```pwsh
az eventhubs namespace create --resource-group <your group name> --name <your namespace name> --sku Standard --subscription <your subscription>
```
This should output something like:
```json
{
"createdAt": "2023-08-10T18:41:54.19Z",
"disableLocalAuth": false,
"id": "/subscriptions/<your subscription ID>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace>",
"isAutoInflateEnabled": false,
"kafkaEnabled": true,
"location": "West US",
"maximumThroughputUnits": 0,
"metricId": "REDACTED",
"minimumTlsVersion": "1.2",
"name": "<your namespace name>",
"provisioningState": "Succeeded",
"publicNetworkAccess": "Enabled",
"resourceGroup": "<your resource group>",
"serviceBusEndpoint": "https://<your namespace name>.servicebus.windows.net:443/",
"sku": {
"capacity": 1,
"name": "Standard",
"tier": "Standard"
},
"status": "Active",
"tags": {},
"type": "Microsoft.EventHub/Namespaces",
"updatedAt": "2023-08-10T18:42:41.343Z",
"zoneRedundant": false
}
```
Create an EventHub:
```pwsh
az eventhubs eventhub create --resource-group <your resource group> --namespace-name <your namespace name> --name <your eventhub name>
```
That should output something like:
```json
{
"createdAt": "2023-08-10T21:02:07.62Z",
"id": "/subscriptions/<your subscription>/resourceGroups/<your group name>/providers/Microsoft.EventHub/namespaces/<your namespace name>/eventhubs/<your eventhub name>",
"location": "westus",
"messageRetentionInDays": 7,
"name": "<your eventhub name>",
"partitionCount": 4,
"partitionIds": [
"0",
"1",
"2",
"3"
],
"resourceGroup": "<your group name>",
"retentionDescription": {
"cleanupPolicy": "Delete",
"retentionTimeInHours": 168
},
"status": "Active",
"type": "Microsoft.EventHub/namespaces/eventhubs",
"updatedAt": "2023-08-10T21:02:16.29Z"
}
```
### Authenticate the client
Event Hub clients are created using a credential from the [Azure Identity package][azure_identity_pkg], like [DefaultAzureCredential][default_azure_credential].
@ -38,8 +135,8 @@ Alternatively, you can create a client using a connection string.
<!-- NOTE: Fix dead Links -->
#### Using a service principal
- ConsumerClient: [link](https://azure.github.io/azure-sdk-for-cpp/storage.html)
- ProducerClient: [link](https://azure.github.io/azure-sdk-for-cpp/storage.html)
- ConsumerClient: [link][consumer_client]
- ProducerClient: [link][producer_client]
<!-- NOTE: Fix dead links -->
#### Using a connection string
@ -56,7 +153,7 @@ store events.
Events are published to an event hub using an [event publisher](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-publishers). In this package, the event publisher is the [ProducerClient](https://azure.github.io/azure-sdk-for-cpp/storage.html)
Events can be consumed from an event hub using an [event consumer](https://docs.microsoft.com/azure/event-hubs/event-hubs-features#event-consumers). In this package there are two types for consuming events:
- The basic event consumer is the PartitionClient, in the [ConsumerClient](https://azure.github.io/azure-sdk-for-cpp/storage.html). This consumer is useful if you already known which partitions you want to receive from.
- The basic event consumer is the PartitionClient, in the [ConsumerClient][consumer_client]. This consumer is useful if you already known which partitions you want to receive from.
- A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the [Processor](https://azure.github.io/azure-sdk-for-cpp/storage.html).
The Processor is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances.
@ -65,15 +162,16 @@ More information about Event Hubs features and terminology can be found here: [l
# Examples
<!-- NOTE: Fix dead links -->
Examples for various scenarios can be found on [azure.github.io](https://azure.github.io/azure-sdk-for-cpp/storage.html) or in the samples directory in our GitHub repo for
[EventHubs](https://github.com/Azure/azure-sdk-for-cpp/blob/main/sdk/eventhubs).
Examples for various scenarios can be found on [azure.github.io](https://azure.github.io/azure-sdk-for-cpp/eventhubs.html) or in the samples directory in our GitHub repo for
[EventHubs](https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs/azure-messaging-eventhubs/samples).
## Send events
The following example shows how to send events to an event hub:
```cpp
#include <azure/messaging/eventhubs.hpp>
// Your Event Hubs namespace connection string is available in the Azure portal.
std::string connectionString = "<connection_string>";
std::string eventHubName = "<event_hub_name>";
@ -101,6 +199,9 @@ auto result = client.SendEventDataBatch(eventBatch);
The following example shows how to receive events from partition 1 on an event hub:
```cpp
#include <azure/messaging/eventhubs.hpp>
// Your Event Hubs namespace connection string is available in the Azure portal.
std::string connectionString = "<connection_string>";
std::string eventHubName = "<event_hub_name>";
@ -160,6 +261,9 @@ Azure SDK for C++ is licensed under the [MIT](https://github.com/Azure/azure-sdk
[azure_sdk_for_cpp_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#developer-guide
[azure_sdk_for_cpp_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-cpp/blob/main/CONTRIBUTING.md#pull-requests
[consumer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/latest/class_azure_1_1_messaging_1_1_event_hubs_1_1_consumer_client.html
[producer_client]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-messaging-eventhubs/1.0.0-beta.1/class_azure_1_1_messaging_1_1_event_hubs_1_1_producer_client.html
[source]: https://github.com/Azure/azure-sdk-for-cpp/tree/main/sdk/eventhubs
[azure_identity_pkg]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html
[default_azure_credential]: https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-identity/latest/index.html#defaultazurecredential

Просмотреть файл

@ -32,9 +32,6 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*/
Azure::Core::Http::Policies::RetryOptions RetryOptions{};
/** @brief Maximum message size for messages being sent. */
Azure::Nullable<std::uint64_t> MaxMessageSize;
/** @brief Name of the consumer client. */
std::string Name{};
};
@ -133,10 +130,12 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*
* @param partitionId targeted partition
* @param options client options
* @param context The context for the operation can be used for request cancellation.
*/
PartitionClient CreatePartitionClient(
std::string partitionId,
PartitionClientOptions const& options = {});
std::string const& partitionId,
PartitionClientOptions const& options = {},
Azure::Core::Context const& context = {});
/**@brief GetEventHubProperties gets properties of an eventHub. This includes data
* like name, and partitions.
@ -156,6 +155,9 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Core::Context const& context = {});
private:
void EnsureSession(std::string const& partitionId = {});
Azure::Core::Amqp::_internal::Session GetSession(std::string const& partitionId = {});
/// The connection string for the Event Hubs namespace
std::string m_connectionString;
@ -181,7 +183,5 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/// @brief The options used to configure the consumer client.
ConsumerClientOptions m_consumerClientOptions;
std::string GetStartExpression(Models::StartPosition const& startPosition);
};
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -13,6 +13,10 @@
// cspell: words vbin
namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail {
class EventDataBatchFactory;
}}}} // namespace Azure::Messaging::EventHubs::_detail
namespace Azure { namespace Messaging { namespace EventHubs {
/** @brief EventDataBatchOptions contains optional parameters for the
@ -27,7 +31,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/** @brief MaxBytes overrides the max size (in bytes) for a batch.
* By default CreateEventDataBatch will use the max message size provided by the service.
*/
uint32_t MaxBytes = std::numeric_limits<int32_t>::max();
Azure::Nullable<std::uint64_t> MaxBytes;
/** @brief PartitionKey is hashed to calculate the partition assignment.Messages and message
* batches with the same PartitionKey are guaranteed to end up in the same partition.
@ -54,7 +58,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
std::mutex m_rwMutex;
std::string m_partitionId;
std::string m_partitionKey;
uint64_t m_maxBytes;
Azure::Nullable<std::uint64_t> m_maxBytes;
std::vector<std::vector<uint8_t>> m_marshalledMessages;
// Annotation properties
const uint32_t BatchedMessageFormat = 0x80013700;
@ -89,26 +93,6 @@ namespace Azure { namespace Messaging { namespace EventHubs {
return *this;
}
/** @brief Event Data Batch constructor
*
* @param options Options settings for creating the data batch
*/
EventDataBatch(EventDataBatchOptions options = {})
: m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey},
m_maxBytes{options.MaxBytes ? options.MaxBytes : std::numeric_limits<uint16_t>::max()},
m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0}
{
if (!options.PartitionId.empty() && !options.PartitionKey.empty())
{
throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both.");
}
if (options.PartitionId.empty())
{
m_partitionId = anyPartitionId;
}
};
/** @brief Gets the partition ID for the data batch
*
* @return std::string
@ -124,19 +108,26 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*
* @return uint64_t
*/
uint64_t GetMaxBytes() const { return m_maxBytes; }
uint64_t GetMaxBytes() const { return m_maxBytes.Value(); }
/** @brief Adds a message to the data batch
/** @brief Attempts to add a raw AMQP message to the data batch
*
* @param message The AMQP message to add to the batch
*
* @returns true if the message was added to the batch, false otherwise.
*/
bool TryAddMessage(Azure::Core::Amqp::Models::AmqpMessage const& message)
{
return TryAddAmqpMessage(message);
}
/** @brief Attempts to add a message to the data batch
*
* @param message The message to add to the batch
*/
void AddMessage(Azure::Core::Amqp::Models::AmqpMessage message) { AddAmqpMessage(message); }
/** @brief Adds a message to the data batch
*
* @param message The message to add to the batch
* @returns true if the message was added to the batch, false otherwise.
*/
void AddMessage(Azure::Messaging::EventHubs::Models::EventData& message);
bool TryAddMessage(Azure::Messaging::EventHubs::Models::EventData const& message);
/** @brief Gets the number of messages in the batch
*
@ -155,7 +146,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Azure::Core::Amqp::Models::AmqpMessage ToAmqpMessage() const;
private:
void AddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message);
bool TryAddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message);
size_t CalculateActualSizeForPayload(std::vector<uint8_t> const& payload)
{
@ -179,5 +170,26 @@ namespace Azure { namespace Messaging { namespace EventHubs {
batchEnvelope.MessageFormat = BatchedMessageFormat;
return batchEnvelope;
}
/** @brief Event Data Batch constructor
*
* @param options Options settings for creating the data batch
*/
EventDataBatch(EventDataBatchOptions options = {})
: m_partitionId{options.PartitionId}, m_partitionKey{options.PartitionKey},
m_maxBytes{options.MaxBytes}, m_marshalledMessages{}, m_batchEnvelope{}, m_currentSize{0}
{
if (!options.PartitionId.empty() && !options.PartitionKey.empty())
{
throw std::runtime_error("Either PartionID or PartitionKey can be set, but not both.");
}
if (options.PartitionId.empty())
{
m_partitionId = anyPartitionId;
}
};
friend class _detail::EventDataBatchFactory;
};
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -62,7 +62,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
* If this field is set to true, then retrying the operation may succeed at a later time.
*
*/
bool IsTransient;
bool IsTransient{};
friend _detail::EventHubsExceptionFactory;
};

Просмотреть файл

@ -41,7 +41,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Models {
/** The sequence number of the last observed event to be enqueued in the partition. */
int64_t LastEnqueuedSequenceNumber{};
/** The offset of the last observed event to be enqueued in the partition */
std::string LastEnqueuedOffset;
int64_t LastEnqueuedOffset{};
/** The date and time, in UTC, that the last observed event was enqueued in the partition. */
Azure::DateTime LastEnqueuedTimeUtc;

Просмотреть файл

@ -9,7 +9,11 @@
#include <azure/core/datetime.hpp>
#include <azure/core/http/policies/policy.hpp>
#include <azure/core/nullable.hpp>
namespace Azure { namespace Messaging { namespace EventHubs {
namespace _detail {
class PartitionClientFactory;
}
/**brief PartitionClientOptions provides options for the ConsumerClient::CreatePartitionClient
* function.
*/
@ -50,24 +54,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
* This type is instantiated from the [ConsumerClient] type, using
* [ConsumerClient.CreatePartitionClient].
*/
class PartitionClient final {
/// The message receivers used to receive events from the partition.
std::vector<Azure::Core::Amqp::_internal::MessageReceiver> m_receivers{};
/// The name of the offset to start receiving events from.
std::string m_offsetExpression;
/// The options used to create the PartitionClient.
PartitionClientOptions m_partitionOptions;
/// The name of the partition.
std::string m_partitionId;
/** @brief RetryOptions controls how many times we should retry an operation in
* response to being throttled or encountering a transient error.
*/
Azure::Core::Http::Policies::RetryOptions RetryOptions{};
class PartitionClient final : private Azure::Core::Amqp::_internal::MessageReceiverEvents {
public:
/// Create a PartitionClient from another PartitionClient
@ -76,6 +63,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/// Assign a PartitionClient to another PartitionClient
PartitionClient& operator=(PartitionClient const& other) = default;
/** Destroy this partition client.
*/
virtual ~PartitionClient();
/** Receive events from the partition.
*
* @param maxMessages The maximum number of messages to receive.
@ -89,32 +80,54 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/** @brief Closes the connection to the Event Hub service.
*/
void Close()
{
for (size_t i = 0; i < m_receivers.size(); i++)
{
m_receivers[i].Close();
}
}
void Close() { m_receiver.Close(); }
private:
friend class _detail::PartitionClientFactory;
/// The message receiver used to receive events from the partition.
Azure::Core::Amqp::_internal::MessageReceiver m_receiver;
/// The name of the offset to start receiving events from.
// std::string m_offsetExpression;
/// The options used to create the PartitionClient.
PartitionClientOptions m_partitionOptions;
/// The name of the partition.
// std::string m_partitionId;
/** @brief RetryOptions controls how many times we should retry an operation in
* response to being throttled or encountering a transient error.
*/
Azure::Core::Http::Policies::RetryOptions m_retryOptions{};
// Azure::Core::Amqp::Common::_internal::AsyncOperationQueue<
// Azure::Core::Amqp::Models::AmqpMessage,
// Azure::Core::Amqp::Models::_internal::AmqpError>
// m_receivedMessageQueue;
/** Creates a new PartitionClient
*
* @param options The options used to create the PartitionClient.
* @param retryOptions The retry options used to create the PartitionClient.
*
* @param messageReceiver Message Receiver for the partition client.
* @param options options used to create the PartitionClient.
* @param retryOptions controls how many times we should retry an operation in response to being
* throttled or encountering a transient error.
*/
PartitionClient(
Azure::Core::Amqp::_internal::MessageReceiver const& messageReceiver,
PartitionClientOptions options,
Azure::Core::Http::Policies::RetryOptions retryOptions)
{
m_partitionOptions = options;
RetryOptions = retryOptions;
}
Azure::Core::Http::Policies::RetryOptions retryOptions);
/// @brief Push the message receiver back to the vector of receivers.
void PushBackReceiver(Azure::Core::Amqp::_internal::MessageReceiver& receiver)
{
m_receivers.push_back(std::move(receiver));
}
std::string GetStartExpression(Models::StartPosition const& startPosition);
virtual void OnMessageReceiverStateChanged(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
Azure::Core::Amqp::_internal::MessageReceiverState newState,
Azure::Core::Amqp::_internal::MessageReceiverState oldState);
virtual Azure::Core::Amqp::Models::AmqpValue OnMessageReceived(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
Azure::Core::Amqp::Models::AmqpMessage const& message);
virtual void OnMessageReceiverDisconnected(
Azure::Core::Amqp::Models::_internal::AmqpError const& error);
};
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -63,7 +63,6 @@ namespace Azure { namespace Messaging { namespace EventHubs {
*/
class Processor final {
#ifdef TESTING_BUILD_AMQP
friend class Test::ProcessorTest_LoadBalancing_Test;
#endif

Просмотреть файл

@ -113,14 +113,43 @@ namespace Azure { namespace Messaging { namespace EventHubs {
m_senders.clear();
}
/**@brief Proceeds to send and EventDataBatch
/** @brief Create a new EventDataBatch to be sent to the Event Hub.
*
* @param options Optional batch options
* @param context Context for the operation can be used for request cancellation.
*
* @return newly created EventDataBatch object.
*/
EventDataBatch CreateBatch(
EventDataBatchOptions const& options = {},
Azure::Core::Context const& context = {});
/**@brief Send an EventDataBatch to the remote Event Hub.
*
* @param eventDataBatch Batch to send
* @param context Request context
*/
bool SendEventDataBatch(
EventDataBatch const& eventDataBatch,
Core::Context const& context = {});
void Send(EventDataBatch const& eventDataBatch, Core::Context const& context = {});
/**@brief Send an EventData to the remote Event Hub.
*
* @remark This method will create a new EventDataBatch and add the event to it. If the event
* exceeds the maximum size allowed by the Event Hubs service, an exception will be thrown.
*
* @param eventData event to send
* @param context Request context
*/
void Send(Models::EventData const& eventData, Core::Context const& context = {});
/**@brief Send a vector of EventData items to the remote Event Hub.
*
* @remark This method will create a new EventDataBatch and add the events to it. If the events
* exceeds the maximum size allowed by the Event Hubs service, an exception will be thrown.
*
* @param eventData events to send
* @param context Request context
*/
void Send(std::vector<Models::EventData> const& eventData, Core::Context const& context = {});
/**@brief GetEventHubProperties gets properties of an eventHub. This includes data
* like name, and partitions.
@ -141,7 +170,12 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Core::Context const& context = {});
private:
void EnsureSender(
std::string const& partitionId = "",
Azure::Core::Context const& context = {});
Azure::Core::Amqp::_internal::MessageSender GetSender(std::string const& partitionId = "");
void CreateSender(std::string const& partitionId = "");
void EnsureSession(std::string const& partitionId);
Azure::Core::Amqp::_internal::Session GetSession(std::string const& partitionId = "");
};
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -20,6 +20,26 @@ granted access to the eventhubs service instance.
The tests also assume that the currently logged on user is authorized to call
into the Event Hubs service instance because they use [Azure::Core::Credentials::TokenCredential](https://azuresdkdocs.blob.core.windows.net/$web/cpp/azure-core/1.3.1/class_azure_1_1_core_1_1_credentials_1_1_token_credential.html) for authorization.
### Setting Environment Variables
For the samples which use a connection string, the connection string can be retrieved using the Azure CLI with the following:
```pwsh
az eventhubs namespace authorization-rule keys list --resource-group <your resource group> --namespace-name <your namespace name> --name RootManageSharedAccessKey
```
```json
{
"keyName": "RootManageSharedAccessKey",
"primaryConnectionString": "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED",
"primaryKey": "REDACTED",
"secondaryConnectionString": "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=REDACTED",
"secondaryKey": "REDACTED"
}
```
The value of the `primaryConnectionString` property should be used as the `EVENTHUBS_CONNECTION_STRING` environment variable.
## Samples

Просмотреть файл

@ -11,8 +11,13 @@
// Both of these should be available from the Azure portal.
//
#include <azure/core/diagnostics/logger.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/messaging/eventhubs.hpp>
using namespace Azure::Core::Diagnostics;
using namespace Azure::Core::Diagnostics::_internal;
#include <iostream>
int main()
@ -38,6 +43,7 @@ int main()
// Retrieve properties about the EventHubs instance just created.
auto eventhubProperties{consumerClient.GetEventHubProperties()};
std::cout << "Created event hub, properties: " << eventhubProperties << std::endl;
Log::Stream(Logger::Level::Verbose) << "Created event hub, properties: " << eventhubProperties;
// Retrieve properties about the EventHubs instance just created.
auto partitionProperties{
@ -50,6 +56,20 @@ int main()
Azure::Messaging::EventHubs::PartitionClientOptions partitionClientOptions;
partitionClientOptions.StartPosition.Earliest = true;
partitionClientOptions.StartPosition.Inclusive = true;
Log::Stream(Logger::Level::Verbose)
<< "Creating partition client. Start position: " << partitionClientOptions.StartPosition;
Log::Stream(Logger::Level::Verbose)
<< "Creating partition client. Start position: " << partitionClientOptions.StartPosition;
Log::Stream(Logger::Level::Verbose) << "earliest: HasValue: " << std::boolalpha
<< partitionClientOptions.StartPosition.Earliest.HasValue();
if (partitionClientOptions.StartPosition.Earliest.HasValue())
{
std::cerr << "earliest: Value: " << std::boolalpha
<< partitionClientOptions.StartPosition.Earliest.Value() << std::endl;
}
Azure::Messaging::EventHubs::PartitionClient partitionClient{consumerClient.CreatePartitionClient(
eventhubProperties.PartitionIds[0], partitionClientOptions)};

Просмотреть файл

@ -44,42 +44,35 @@ int main()
// configure this batch processor to send to that partition.
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = eventhubProperties.PartitionIds[0];
Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions);
Azure::Messaging::EventHubs::EventDataBatch batch(producerClient.CreateBatch(batchOptions));
// Send an event with a simple binary body.
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = {1, 3, 5, 7};
event.MessageId = "test-message-id";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = {2, 4, 6, 8, 10};
batch.AddMessage(event);
batch.TryAddMessage(event);
}
// Send an event with a body initialized at EventData constructor time.
{
Azure::Messaging::EventHubs::Models::EventData event{1, 1, 2, 3, 5, 8};
event.MessageId = "test-message-id-fibonacci";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
// Send an event with a UTF-8 encoded string body.
{
Azure::Messaging::EventHubs::Models::EventData event{"Hello Eventhubs!"};
event.MessageId = "test-message-id-hellowworld";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
if (!producerClient.SendEventDataBatch(batch))
{
std::cerr << "Failed to send message to the Event Hub instance." << std::endl;
}
else
{
std::cout << "Sent message to the Event Hub instance." << std::endl;
}
producerClient.Send(batch);
}

Просмотреть файл

@ -54,38 +54,31 @@ int main()
// configure this batch processor to send to that partition.
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = eventhubProperties.PartitionIds[0];
Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions);
Azure::Messaging::EventHubs::EventDataBatch batch{producerClient.CreateBatch(batchOptions)};
// Send an event with a simple binary body.
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = {1, 3, 5, 7};
event.MessageId = "test-message-id";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
{
Azure::Messaging::EventHubs::Models::EventData event;
event.Body = {2, 4, 6, 8, 10};
event.MessageId = "test-message-id-2";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
{
Azure::Messaging::EventHubs::Models::EventData event{1, 1, 2, 3, 5, 8};
event.MessageId = "test-message-id5";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
{
Azure::Messaging::EventHubs::Models::EventData event{"Hello Eventhubs via AAD!"};
event.MessageId = "test-message-id4";
batch.AddMessage(event);
batch.TryAddMessage(event);
}
if (!producerClient.SendEventDataBatch(batch))
{
std::cerr << "Failed to send message to the Event Hub instance." << std::endl;
}
else
{
std::cout << "Sent message to the Event Hub instance." << std::endl;
}
producerClient.Send(batch);
}

Просмотреть файл

@ -26,7 +26,9 @@ std::string Azure::Messaging::EventHubs::Models::Ownership::GetOwnershipPrefixNa
throw std::runtime_error("missing ownership fields");
}
std::stringstream strstr;
strstr << FullyQualifiedNamespace << "/" << EventHubName << "/" << ConsumerGroup << "/ownership/";
strstr << Azure::Core::_internal::StringExtensions::ToLower(FullyQualifiedNamespace) << "/"
<< Azure::Core::_internal::StringExtensions::ToLower(EventHubName) << "/"
<< Azure::Core::_internal::StringExtensions::ToLower(ConsumerGroup) << "/ownership/";
return strstr.str();
}
@ -38,8 +40,9 @@ std::string Azure::Messaging::EventHubs::Models::Checkpoint::GetCheckpointBlobPr
throw std::runtime_error("missing checkpoint fields");
}
std::stringstream strstr;
strstr << FullyQualifiedNamespaceName << "/" << EventHubName << "/" << ConsumerGroup
<< "/checkpoint/";
strstr << Azure::Core::_internal::StringExtensions::ToLower(FullyQualifiedNamespaceName) << "/"
<< Azure::Core::_internal::StringExtensions::ToLower(EventHubName) << "/"
<< Azure::Core::_internal::StringExtensions::ToLower(ConsumerGroup) << "/checkpoint/";
return strstr.str();
}

Просмотреть файл

@ -48,179 +48,69 @@ namespace Azure { namespace Messaging { namespace EventHubs {
+ m_consumerGroup;
}
namespace {
struct FilterDescription
void ConsumerClient::EnsureSession(std::string const& partitionId)
{
if (m_sessions.find(partitionId) == m_sessions.end())
{
std::string Name;
std::uint64_t Code;
};
void AddFilterElementToSourceOptions(
Azure::Core::Amqp::Models::_internal::MessageSourceOptions& sourceOptions,
FilterDescription description,
Azure::Core::Amqp::Models::AmqpValue const& filterValue)
{
Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue};
sourceOptions.Filter.emplace(description.Name, value);
}
ConnectionOptions connectOptions;
connectOptions.ContainerId = m_consumerClientOptions.ApplicationID;
connectOptions.EnableTrace = true;
connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"};
FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004};
} // namespace
// Set the user agent related properties in the connectOptions based on the package
// information and application ID.
_detail::EventHubsUtilities::SetUserAgent(
connectOptions, m_consumerClientOptions.ApplicationID);
Connection connection(m_fullyQualifiedNamespace, m_credential, connectOptions);
SessionOptions sessionOptions;
sessionOptions.InitialIncomingWindowSize
= static_cast<uint32_t>(std::numeric_limits<int32_t>::max());
Session session{connection.CreateSession(sessionOptions)};
m_sessions.emplace(partitionId, session);
}
}
Azure::Core::Amqp::_internal::Session ConsumerClient::GetSession(std::string const& partitionId)
{
return m_sessions.at(partitionId);
}
PartitionClient ConsumerClient::CreatePartitionClient(
std::string partitionId,
PartitionClientOptions const& options)
std::string const& partitionId,
PartitionClientOptions const& options,
Azure::Core::Context const& context)
{
PartitionClient partitionClient(options, m_consumerClientOptions.RetryOptions);
std::string suffix = !partitionId.empty() ? "/Partitions/" + partitionId : "";
std::string hostUrl = m_hostUrl + suffix;
ConnectionOptions connectOptions;
connectOptions.ContainerId = m_consumerClientOptions.ApplicationID;
connectOptions.EnableTrace = true;
connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"};
EnsureSession(partitionId);
// Set the user agent related properties in the connectOptions based on the package information
// and application ID.
_detail::EventHubsUtilities::SetUserAgent(
connectOptions, m_consumerClientOptions.ApplicationID);
Connection connection(m_fullyQualifiedNamespace, m_credential, connectOptions);
SessionOptions sessionOptions;
sessionOptions.InitialIncomingWindowSize = static_cast<uint32_t>(
m_consumerClientOptions.MaxMessageSize.ValueOr(std::numeric_limits<int32_t>::max()));
Session session{connection.CreateSession(sessionOptions)};
Azure::Core::Amqp::Models::_internal::MessageSourceOptions sourceOptions;
sourceOptions.Address = static_cast<Azure::Core::Amqp::Models::AmqpValue>(hostUrl);
AddFilterElementToSourceOptions(
sourceOptions,
SelectorFilter,
static_cast<Azure::Core::Amqp::Models::AmqpValue>(
GetStartExpression(options.StartPosition)));
Azure::Core::Amqp::Models::_internal::MessageSource messageSource(sourceOptions);
Azure::Core::Amqp::_internal::MessageReceiverOptions receiverOptions;
if (m_consumerClientOptions.MaxMessageSize)
{
receiverOptions.MaxMessageSize = m_consumerClientOptions.MaxMessageSize.Value();
}
receiverOptions.EnableTrace = true;
// receiverOptions.MessageTarget = m_consumerClientOptions.MessageTarget;
receiverOptions.Name = m_consumerClientOptions.Name;
receiverOptions.Properties.emplace("com.microsoft:receiver-name", m_consumerClientOptions.Name);
if (options.OwnerLevel.HasValue())
{
receiverOptions.Properties.emplace("com.microsoft:epoch", options.OwnerLevel.Value());
}
MessageReceiver receiver = session.CreateMessageReceiver(messageSource, receiverOptions);
// Open the connection to the remote.
receiver.Open();
m_sessions.emplace(partitionId, session);
partitionClient.PushBackReceiver(receiver);
return partitionClient;
}
std::string ConsumerClient::GetStartExpression(Models::StartPosition const& startPosition)
{
std::string greaterThan = ">";
if (startPosition.Inclusive)
{
greaterThan = ">=";
}
constexpr const char* expressionErrorText
= "Only a single start point can be set: Earliest, EnqueuedTime, "
"Latest, Offset, or SequenceNumber";
std::string returnValue;
if (startPosition.EnqueuedTime.HasValue())
{
returnValue = "amqp.annotation.x--opt-enqueued-time " + greaterThan + "'"
+ std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
startPosition.EnqueuedTime.Value().time_since_epoch())
.count())
+ "'";
}
if (startPosition.Offset.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'"
+ std::to_string(startPosition.Offset.Value()) + "'";
}
if (startPosition.SequenceNumber.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-sequence-number " + greaterThan + "'"
+ std::to_string(startPosition.SequenceNumber.Value()) + "'";
}
if (startPosition.Latest.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset > '@latest'";
}
if (startPosition.Earliest.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset > '-1'";
}
// If we don't have a filter value, then default to the start.
if (returnValue.empty())
{
return "amqp.annotation.x-opt-offset > '@latest'";
}
else
{
return returnValue;
}
return _detail::PartitionClientFactory::CreatePartitionClient(
GetSession(partitionId),
hostUrl,
m_consumerClientOptions.Name,
options,
m_consumerClientOptions.RetryOptions,
context);
}
Models::EventHubProperties ConsumerClient::GetEventHubProperties(Core::Context const& context)
{
// We need to capture the partition client here, because we need to keep it alive across the
// call to GetEventHubsProperties.
//
// If we don't keep the PartitionClient alive, the message receiver inside the partition client
// will be disconnected AFTER the outgoing ATTACH frame is sent. When the response for the
// ATTACH frame is received, it creates a new link_endpoint which is in the half attached state.
// This runs into a uAMQP bug where an incoming link detach frame will cause a crash if the
// corresponding link_endpoint is in the half attached state.
std::shared_ptr<PartitionClient> client;
if (m_sessions.find("0") == m_sessions.end())
{
client = std::make_shared<PartitionClient>(CreatePartitionClient("0"));
}
// Since EventHub properties are not tied to a partition, we don't specify a partition ID.
EnsureSession();
return _detail::EventHubsUtilities::GetEventHubsProperties(
m_sessions.at("0"), m_eventHub, context);
return _detail::EventHubsUtilities::GetEventHubsProperties(GetSession(), m_eventHub, context);
}
Models::EventHubPartitionProperties ConsumerClient::GetPartitionProperties(
std::string const& partitionId,
Core::Context const& context)
{
if (m_sessions.find(partitionId) == m_sessions.end())
{
CreatePartitionClient(partitionId);
}
EnsureSession(partitionId);
return _detail::EventHubsUtilities::GetEventHubsPartitionProperties(
m_sessions.at(partitionId), m_eventHub, partitionId, context);
GetSession(partitionId), m_eventHub, partitionId, context);
}
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -14,9 +14,9 @@ using namespace Azure::Core::Diagnostics;
namespace Azure { namespace Messaging { namespace EventHubs {
void EventDataBatch::AddMessage(Azure::Messaging::EventHubs::Models::EventData& message)
bool EventDataBatch::TryAddMessage(Azure::Messaging::EventHubs::Models::EventData const& message)
{
AddAmqpMessage(message.GetRawAmqpMessage());
return TryAddAmqpMessage(message.GetRawAmqpMessage());
}
Azure::Core::Amqp::Models::AmqpMessage EventDataBatch::ToAmqpMessage() const
@ -48,7 +48,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
return returnValue;
}
void EventDataBatch::AddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message)
bool EventDataBatch::TryAddAmqpMessage(Azure::Core::Amqp::Models::AmqpMessage message)
{
std::lock_guard<std::mutex> lock(m_rwMutex);
@ -75,16 +75,23 @@ namespace Azure { namespace Messaging { namespace EventHubs {
m_currentSize = serializedMessage.size();
}
auto actualPayloadSize = CalculateActualSizeForPayload(serializedMessage);
if (m_currentSize + actualPayloadSize > m_maxBytes)
if (m_currentSize + actualPayloadSize > m_maxBytes.Value())
{
m_currentSize = 0;
m_batchEnvelope = nullptr;
throw std::runtime_error("EventDataBatch size is too large.");
return false;
}
m_currentSize += actualPayloadSize;
m_marshalledMessages.push_back(serializedMessage);
return true;
}
}}} // namespace Azure::Messaging::EventHubs
namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail {
EventDataBatch EventDataBatchFactory::CreateEventDataBatch(EventDataBatchOptions const& options)
{
return EventDataBatch{options};
}
}}}} // namespace Azure::Messaging::EventHubs::_detail

Просмотреть файл

@ -4,12 +4,167 @@
#include "azure/messaging/eventhubs/partition_client.hpp"
#include "azure/messaging/eventhubs/eventhubs_exception.hpp"
#include "private/eventhubs_constants.hpp"
#include "private/eventhubs_utilities.hpp"
#include "private/retry_operation.hpp"
#include <azure/core/amqp.hpp>
using namespace Azure::Core::Diagnostics::_internal;
using namespace Azure::Core::Diagnostics;
namespace Azure { namespace Messaging { namespace EventHubs {
namespace {
struct FilterDescription
{
std::string Name;
std::uint64_t Code;
};
void AddFilterElementToSourceOptions(
Azure::Core::Amqp::Models::_internal::MessageSourceOptions& sourceOptions,
FilterDescription description,
Azure::Core::Amqp::Models::AmqpValue const& filterValue)
{
Azure::Core::Amqp::Models::AmqpDescribed value{description.Code, filterValue};
sourceOptions.Filter.emplace(description.Name, value);
}
FilterDescription SelectorFilter{"apache.org:selector-filter:string", 0x0000468c00000004};
std::string GetStartExpression(Models::StartPosition const& startPosition)
{
Log::Stream(Logger::Level::Verbose)
<< "Get Start Expression, startPosition: " << startPosition;
std::string greaterThan = ">";
if (startPosition.Inclusive)
{
greaterThan = ">=";
}
constexpr const char* expressionErrorText
= "Only a single start point can be set: Earliest, EnqueuedTime, "
"Latest, Offset, or SequenceNumber";
std::string returnValue;
if (startPosition.EnqueuedTime.HasValue())
{
returnValue = "amqp.annotation.x-opt-enqueued-time " + greaterThan + "'"
+ std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
static_cast<std::chrono::system_clock::time_point>(
startPosition.EnqueuedTime.Value())
.time_since_epoch())
.count())
+ "'";
}
if (startPosition.Offset.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset " + greaterThan + "'"
+ std::to_string(startPosition.Offset.Value()) + "'";
}
if (startPosition.SequenceNumber.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-sequence-number " + greaterThan + "'"
+ std::to_string(startPosition.SequenceNumber.Value()) + "'";
}
if (startPosition.Latest.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset > '@latest'";
}
if (startPosition.Earliest.HasValue())
{
if (!returnValue.empty())
{
throw std::runtime_error(expressionErrorText);
}
returnValue = "amqp.annotation.x-opt-offset > '-1'";
}
// If we don't have a filter value, then default to the start.
if (returnValue.empty())
{
Log::Stream(Logger::Level::Verbose) << "No return value, use default.";
return "amqp.annotation.x-opt-offset > '@latest'";
}
else
{
Log::Stream(Logger::Level::Verbose) << "Get Start Expression, returnValue: " << returnValue;
return returnValue;
}
}
// Helper function to create a message receiver.
Azure::Core::Amqp::_internal::MessageReceiver CreateMessageReceiver(
Azure::Core::Amqp::_internal::Session const& session,
std::string const& partitionUrl,
std::string const& receiverName,
PartitionClientOptions const& options,
Azure::Core::Amqp::_internal::MessageReceiverEvents* events = nullptr)
{
Azure::Core::Amqp::Models::_internal::MessageSourceOptions sourceOptions;
sourceOptions.Address = static_cast<Azure::Core::Amqp::Models::AmqpValue>(partitionUrl);
AddFilterElementToSourceOptions(
sourceOptions,
SelectorFilter,
static_cast<Azure::Core::Amqp::Models::AmqpValue>(
GetStartExpression(options.StartPosition)));
Azure::Core::Amqp::Models::_internal::MessageSource messageSource(sourceOptions);
Azure::Core::Amqp::_internal::MessageReceiverOptions receiverOptions;
receiverOptions.EnableTrace = true;
// Set the link credit to the prefetch count. If the user has not set a prefetch count, then
// we will use the default value.
if (options.Prefetch >= 0)
{
receiverOptions.MaxLinkCredit = options.Prefetch;
}
receiverOptions.Name = receiverName;
receiverOptions.Properties.emplace("com.microsoft:receiver-name", receiverName);
if (options.OwnerLevel.HasValue())
{
receiverOptions.Properties.emplace("com.microsoft:epoch", options.OwnerLevel.Value());
}
return session.CreateMessageReceiver(messageSource, receiverOptions, events);
}
} // namespace
PartitionClient _detail::PartitionClientFactory::CreatePartitionClient(
Azure::Core::Amqp::_internal::Session const& session,
std::string const& partitionUrl,
std::string const& receiverName,
PartitionClientOptions options,
Azure::Core::Http::Policies::RetryOptions retryOptions,
Azure::Core::Context const& context)
{
Azure::Core::Amqp::_internal::MessageReceiver messageReceiver{
CreateMessageReceiver(session, partitionUrl, receiverName, options)};
messageReceiver.Open(context);
return PartitionClient(std::move(messageReceiver), std::move(options), std::move(retryOptions));
}
PartitionClient::PartitionClient(
Azure::Core::Amqp::_internal::MessageReceiver const& messageReceiver,
PartitionClientOptions options,
Azure::Core::Http::Policies::RetryOptions retryOptions)
: m_receiver{messageReceiver}, m_partitionOptions{options}, m_retryOptions{retryOptions}
{
}
PartitionClient::~PartitionClient() {}
/** Receive events from the partition.
*
* @param maxMessages The maximum number of messages to receive.
@ -22,11 +177,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
Core::Context const& context)
{
std::vector<Models::ReceivedEventData> messages;
// bool prefetchDisabled = m_prefetchCount < 0;
while (messages.size() < maxMessages && !context.IsCancelled())
{
auto message = m_receivers[0].WaitForIncomingMessage(context);
auto message = m_receiver.WaitForIncomingMessage(context);
if (message.first.HasValue())
{
messages.push_back(Models::ReceivedEventData{message.first.Value()});
@ -38,4 +192,26 @@ namespace Azure { namespace Messaging { namespace EventHubs {
}
return messages;
}
void PartitionClient::OnMessageReceiverStateChanged(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
Azure::Core::Amqp::_internal::MessageReceiverState newState,
Azure::Core::Amqp::_internal::MessageReceiverState oldState)
{
(void)receiver;
(void)newState;
(void)oldState;
}
Azure::Core::Amqp::Models::AmqpValue PartitionClient::OnMessageReceived(
Azure::Core::Amqp::_internal::MessageReceiver const& receiver,
Azure::Core::Amqp::Models::AmqpMessage const& message)
{
(void)receiver;
(void)message;
return Azure::Core::Amqp::Models::_internal::Messaging::DeliveryAccepted();
}
void PartitionClient::OnMessageReceiverDisconnected(
Azure::Core::Amqp::Models::_internal::AmqpError const& error)
{
(void)error;
}
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -4,13 +4,16 @@
// Useful utilities for the Event Hubs Clients.
#pragma once
#include "azure/messaging/eventhubs/event_data_batch.hpp"
#include "azure/messaging/eventhubs/eventhubs_exception.hpp"
#include "azure/messaging/eventhubs/models/management_models.hpp"
#include "azure/messaging/eventhubs/partition_client.hpp"
#include "package_version.hpp"
#include <azure/core/amqp/management.hpp>
#include <azure/core/amqp/session.hpp>
#include <azure/core/context.hpp>
#include <azure/core/internal/diagnostics/log.hpp>
#include <azure/core/internal/http/user_agent.hpp>
#include <chrono>
@ -60,6 +63,24 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
Azure::Core::Amqp::Models::_internal::AmqpErrorCondition const& condition);
};
class EventDataBatchFactory final {
public:
static EventDataBatch CreateEventDataBatch(EventDataBatchOptions const& options);
EventDataBatchFactory() = delete;
};
class PartitionClientFactory final {
public:
static PartitionClient CreatePartitionClient(
Azure::Core::Amqp::_internal::Session const& session,
std::string const& partitionUrl,
std::string const& receiverName,
PartitionClientOptions options,
Azure::Core::Http::Policies::RetryOptions retryOptions,
Azure::Core::Context const& context);
PartitionClientFactory() = delete;
};
class EventHubsUtilities {
public:
@ -91,12 +112,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
// Create a management client off the session.
// Eventhubs management APIs return a status code in the "status-code" application properties.
Azure::Core::Amqp::_internal::ManagementClientOptions managementClientOptions;
managementClientOptions.EnableTrace = false;
managementClientOptions.EnableTrace = true;
managementClientOptions.ExpectedStatusCodeKeyName = "status-code";
Azure::Core::Amqp::_internal::ManagementClient managementClient{
session.CreateManagementClient(eventHubName, managementClientOptions)};
managementClient.Open();
managementClient.Open(context);
// Send a message to the management endpoint to retrieve the properties of the eventhub.
Azure::Core::Amqp::Models::AmqpMessage message;
@ -113,6 +134,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
Models::EventHubProperties properties;
if (result.Status != Azure::Core::Amqp::_internal::ManagementOperationStatus::Ok)
{
Azure::Core::Diagnostics::_internal::Log::Stream(
Azure::Core::Diagnostics::Logger::Level::Error)
<< "Management operation failed. StatusCode: " << result.StatusCode
<< " Error: " << result.Error;
throw _detail::EventHubsExceptionFactory::CreateEventHubsException(
result.Error, result.StatusCode);
}
@ -154,12 +179,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
// Create a management client off the session.
// Eventhubs management APIs return a status code in the "status-code" application properties.
Azure::Core::Amqp::_internal::ManagementClientOptions managementClientOptions;
managementClientOptions.EnableTrace = false;
managementClientOptions.EnableTrace = true;
managementClientOptions.ExpectedStatusCodeKeyName = "status-code";
Azure::Core::Amqp::_internal::ManagementClient managementClient{
session.CreateManagementClient(eventHubName, managementClientOptions)};
managementClient.Open();
managementClient.Open(context);
// Send a message to the management endpoint to retrieve the properties of the eventhub.
Azure::Core::Amqp::Models::AmqpMessage message;
@ -175,6 +200,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
message,
context);
Azure::Core::Diagnostics::_internal::Log::Stream(
Azure::Core::Diagnostics::Logger::Level::Informational)
<< "Received partition properties: " << result.Message;
Models::EventHubPartitionProperties properties;
if (result.Status != Azure::Core::Amqp::_internal::ManagementOperationStatus::Ok)
{
@ -198,7 +227,29 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _detail
properties.PartitionId = static_cast<std::string>(bodyMap["partition"]);
properties.BeginningSequenceNumber = bodyMap["begin_sequence_number"];
properties.LastEnqueuedSequenceNumber = bodyMap["last_enqueued_sequence_number"];
properties.LastEnqueuedOffset = static_cast<std::string>(bodyMap["last_enqueued_offset"]);
// For <reasons> the last enqueued offset is returned as a string. Convert to an int64.
properties.LastEnqueuedOffset = std::strtoull(
static_cast<std::string>(bodyMap["last_enqueued_offset"]).c_str(), nullptr, 10);
Azure::Core::Diagnostics::_internal::Log::Stream(
Azure::Core::Diagnostics::Logger::Level::Informational)
<< "last enqueued time utc: " << bodyMap["last_enqueued_time_utc"];
Azure::Core::Diagnostics::_internal::Log::Stream(
Azure::Core::Diagnostics::Logger::Level::Informational)
<< "last enqueued time utc: "
<< static_cast<std::chrono::milliseconds>(
bodyMap["last_enqueued_time_utc"].AsTimestamp())
.count()
<< " ms";
Azure::Core::Diagnostics::_internal::Log::Stream(
Azure::Core::Diagnostics::Logger::Level::Informational)
<< "last enqueued time utc: "
<< std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::milliseconds>(
bodyMap["last_enqueued_time_utc"].AsTimestamp()))
.count()
<< " s";
properties.LastEnqueuedTimeUtc = Azure::DateTime(std::chrono::system_clock::from_time_t(
std::chrono::duration_cast<std::chrono::seconds>(
static_cast<std::chrono::milliseconds>(

Просмотреть файл

@ -3,6 +3,7 @@
#include "azure/messaging/eventhubs/producer_client.hpp"
#include "azure/messaging/eventhubs/event_data_batch.hpp"
#include "azure/messaging/eventhubs/eventhubs_exception.hpp"
#include "private/eventhubs_utilities.hpp"
#include "private/retry_operation.hpp"
@ -13,127 +14,177 @@ namespace {
const std::string DefaultAuthScope = "https://eventhubs.azure.net/.default";
}
Azure::Messaging::EventHubs::ProducerClient::ProducerClient(
std::string const& connectionString,
std::string const& eventHub,
Azure::Messaging::EventHubs::ProducerClientOptions options)
: m_connectionString{connectionString}, m_eventHub{eventHub}, m_producerClientOptions(options)
{
auto sasCredential
= std::make_shared<Azure::Core::Amqp::_internal::ServiceBusSasConnectionStringCredential>(
connectionString);
namespace Azure { namespace Messaging { namespace EventHubs {
m_credential = sasCredential;
m_eventHub = (sasCredential->GetEntityPath().empty() ? eventHub : sasCredential->GetEntityPath());
m_fullyQualifiedNamespace = sasCredential->GetHostName();
}
Azure::Messaging::EventHubs::ProducerClient::ProducerClient(
std::string const& fullyQualifiedNamespace,
std::string const& eventHub,
std::shared_ptr<Azure::Core::Credentials::TokenCredential> credential,
Azure::Messaging::EventHubs::ProducerClientOptions options)
: m_fullyQualifiedNamespace{fullyQualifiedNamespace}, m_eventHub{eventHub},
m_credential{credential}, m_producerClientOptions(options)
{
}
Azure::Core::Amqp::_internal::MessageSender Azure::Messaging::EventHubs::ProducerClient::GetSender(
std::string const& partitionId)
{
if (m_senders.find(partitionId) == m_senders.end())
ProducerClient::ProducerClient(
std::string const& connectionString,
std::string const& eventHub,
Azure::Messaging::EventHubs::ProducerClientOptions options)
: m_connectionString{connectionString}, m_eventHub{eventHub}, m_producerClientOptions(options)
{
CreateSender(partitionId);
auto sasCredential
= std::make_shared<Azure::Core::Amqp::_internal::ServiceBusSasConnectionStringCredential>(
connectionString);
m_credential = sasCredential;
m_eventHub
= (sasCredential->GetEntityPath().empty() ? eventHub : sasCredential->GetEntityPath());
m_fullyQualifiedNamespace = sasCredential->GetHostName();
}
auto& sender = m_senders.at(partitionId);
return sender;
}
void Azure::Messaging::EventHubs::ProducerClient::CreateSender(std::string const& partitionId)
{
m_targetUrl = "amqps://" + m_fullyQualifiedNamespace + "/" + m_eventHub;
Azure::Core::Amqp::_internal::ConnectionOptions connectOptions;
connectOptions.ContainerId = m_producerClientOptions.ApplicationID;
connectOptions.EnableTrace = true;
connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"};
// Set the UserAgent related properties on this message sender.
_detail::EventHubsUtilities::SetUserAgent(connectOptions, m_producerClientOptions.ApplicationID);
std::string fullyQualifiedNamespace{m_fullyQualifiedNamespace};
std::string targetUrl = m_targetUrl;
if (!partitionId.empty())
ProducerClient::ProducerClient(
std::string const& fullyQualifiedNamespace,
std::string const& eventHub,
std::shared_ptr<Azure::Core::Credentials::TokenCredential> credential,
Azure::Messaging::EventHubs::ProducerClientOptions options)
: m_fullyQualifiedNamespace{fullyQualifiedNamespace}, m_eventHub{eventHub},
m_credential{credential}, m_producerClientOptions(options)
{
targetUrl += "/Partitions/" + partitionId;
}
Azure::Core::Amqp::_internal::Connection connection(
fullyQualifiedNamespace, m_credential, connectOptions);
Azure::Core::Amqp::_internal::SessionOptions sessionOptions;
sessionOptions.InitialIncomingWindowSize = std::numeric_limits<int32_t>::max();
sessionOptions.InitialOutgoingWindowSize = std::numeric_limits<uint16_t>::max();
Azure::Core::Amqp::_internal::Session session{connection.CreateSession(sessionOptions)};
m_sessions.emplace(partitionId, session);
Azure::Core::Amqp::_internal::MessageSenderOptions senderOptions;
senderOptions.Name = m_producerClientOptions.Name;
senderOptions.EnableTrace = true;
senderOptions.MaxMessageSize = m_producerClientOptions.MaxMessageSize;
Azure::Core::Amqp::_internal::MessageSender sender
= session.CreateMessageSender(targetUrl, senderOptions, nullptr);
sender.Open();
m_senders.emplace(partitionId, sender);
}
bool Azure::Messaging::EventHubs::ProducerClient::SendEventDataBatch(
EventDataBatch const& eventDataBatch,
Core::Context const& context)
{
auto message = eventDataBatch.ToAmqpMessage();
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(
m_producerClientOptions.RetryOptions);
return retryOp.Execute([&]() -> bool {
auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context);
auto sendStatus = std::get<0>(result);
if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok)
void ProducerClient::EnsureSession(std::string const& partitionId = {})
{
if (m_sessions.find(partitionId) == m_sessions.end())
{
return true;
Azure::Core::Amqp::_internal::ConnectionOptions connectOptions;
connectOptions.ContainerId = m_producerClientOptions.ApplicationID;
connectOptions.EnableTrace = true;
connectOptions.AuthenticationScopes = {"https://eventhubs.azure.net/.default"};
// Set the UserAgent related properties on this message sender.
_detail::EventHubsUtilities::SetUserAgent(
connectOptions, m_producerClientOptions.ApplicationID);
std::string fullyQualifiedNamespace{m_fullyQualifiedNamespace};
Azure::Core::Amqp::_internal::Connection connection(
fullyQualifiedNamespace, m_credential, connectOptions);
Azure::Core::Amqp::_internal::SessionOptions sessionOptions;
sessionOptions.InitialIncomingWindowSize = std::numeric_limits<int32_t>::max();
sessionOptions.InitialOutgoingWindowSize = std::numeric_limits<uint16_t>::max();
Azure::Core::Amqp::_internal::Session session{connection.CreateSession(sessionOptions)};
m_sessions.emplace(partitionId, session);
}
// Throw an exception about the error we just received.
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::CreateEventHubsException(
std::get<1>(result));
});
}
Azure::Messaging::EventHubs::Models::EventHubProperties
Azure::Messaging::EventHubs::ProducerClient::GetEventHubProperties(Core::Context const& context)
{
if (m_senders.find("") == m_senders.end())
{
CreateSender("");
}
return _detail::EventHubsUtilities::GetEventHubsProperties(
m_sessions.at(""), m_eventHub, context);
}
Azure::Messaging::EventHubs::Models::EventHubPartitionProperties
Azure::Messaging::EventHubs::ProducerClient::GetPartitionProperties(
std::string const& partitionId,
Core::Context const& context)
{
if (m_senders.find(partitionId) == m_senders.end())
Azure::Core::Amqp::_internal::Session ProducerClient::GetSession(std::string const& partitionId)
{
CreateSender(partitionId);
return m_sessions.at(partitionId);
}
return _detail::EventHubsUtilities::GetEventHubsPartitionProperties(
m_sessions.at(partitionId), m_eventHub, partitionId, context);
}
void ProducerClient::EnsureSender(
std::string const& partitionId,
Azure::Core::Context const& context)
{
if (m_senders.find(partitionId) == m_senders.end())
{
m_targetUrl = "amqps://" + m_fullyQualifiedNamespace + "/" + m_eventHub;
EnsureSession(partitionId);
std::string targetUrl = m_targetUrl;
if (!partitionId.empty())
{
targetUrl += "/Partitions/" + partitionId;
}
Azure::Core::Amqp::_internal::MessageSenderOptions senderOptions;
senderOptions.Name = m_producerClientOptions.Name;
senderOptions.EnableTrace = true;
senderOptions.MaxMessageSize = m_producerClientOptions.MaxMessageSize;
Azure::Core::Amqp::_internal::MessageSender sender
= GetSession(partitionId).CreateMessageSender(targetUrl, senderOptions, nullptr);
sender.Open(context);
m_senders.emplace(partitionId, sender);
}
}
Azure::Core::Amqp::_internal::MessageSender ProducerClient::GetSender(
std::string const& partitionId)
{
return m_senders.at(partitionId);
}
EventDataBatch ProducerClient::CreateBatch(
EventDataBatchOptions const& options,
Core::Context const& context)
{
EnsureSender(options.PartitionId, context);
auto messageSender = GetSender(options.PartitionId);
EventDataBatchOptions optionsToUse{options};
if (!options.MaxBytes.HasValue())
{
optionsToUse.MaxBytes = messageSender.GetMaxMessageSize();
}
return _detail::EventDataBatchFactory::CreateEventDataBatch(optionsToUse);
}
void ProducerClient::Send(EventDataBatch const& eventDataBatch, Core::Context const& context)
{
auto message = eventDataBatch.ToAmqpMessage();
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(
m_producerClientOptions.RetryOptions);
retryOp.Execute([&]() -> bool {
auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context);
auto sendStatus = std::get<0>(result);
if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok)
{
return true;
}
// Throw an exception about the error we just received.
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::
CreateEventHubsException(std::get<1>(result));
});
}
void ProducerClient::Send(Models::EventData const& eventData, Core::Context const& context)
{
auto batch = CreateBatch(EventDataBatchOptions{}, context);
if (!batch.TryAddMessage(eventData))
{
throw std::runtime_error("Could not add message to batch.");
}
Send(batch, context);
}
void ProducerClient::Send(
std::vector<Models::EventData> const& eventData,
Core::Context const& context)
{
auto batch = CreateBatch(EventDataBatchOptions{}, context);
for (const auto& data : eventData)
{
if (!batch.TryAddMessage(data))
{
throw std::runtime_error("Could not add message to batch.");
}
}
Send(batch, context);
}
Models::EventHubProperties ProducerClient::GetEventHubProperties(Core::Context const& context)
{
// EventHub properties are not associated with a particular partition, so create a message
// sender on the empty partition.
EnsureSession();
return _detail::EventHubsUtilities::GetEventHubsProperties(GetSession(), m_eventHub, context);
}
Models::EventHubPartitionProperties ProducerClient::GetPartitionProperties(
std::string const& partitionId,
Core::Context const& context)
{
EnsureSession(partitionId);
return _detail::EventHubsUtilities::GetEventHubsPartitionProperties(
GetSession(partitionId), m_eventHub, partitionId, context);
}
}}} // namespace Azure::Messaging::EventHubs

Просмотреть файл

@ -65,7 +65,8 @@
},
"variables": {
"apiVersion": "2017-04-01",
"eventHubName": "eventHub",
"eventHubName": "testEventHub",
"consumerGroup": "defaultGroup",
"contributorRoleId": "b24988ac-6180-42a0-ab88-20f7382dd24c",
"eventHubsDataOwnerRoleId": "f526a384-b230-433a-b45c-95f59c4a2dec",
"storageDataOwnerRoleId": "b7e6dc6d-f1e8-4753-8033-0f276bb0955b",
@ -124,7 +125,7 @@
{
"type": "Microsoft.EventHub/Namespaces/EventHubs/ConsumerGroups",
"apiVersion": "[variables('apiVersion')]",
"name": "[concat(variables('eventHubsNamespace'), '/', variables('eventHubName'), '/$Default')]",
"name": "[concat(variables('eventHubsNamespace'), '/', variables('eventHubName'), '/', variables('consumerGroup'))]",
"location": "[parameters('location')]",
"dependsOn": [
"[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubsNamespace'), variables('eventHubName'))]",
@ -244,6 +245,10 @@
"type": "string",
"value": "[listkeys(variables('eventHubsAuthRuleResourceId'), '2015-08-01').primaryConnectionString]"
},
"EVENTHUB_CONSUMER_GROUP": {
"type": "string",
"value": "[variables('consumerGroup')]"
},
"CHECKPOINTSTORE_STORAGE_CONNECTION_STRING": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"

Просмотреть файл

@ -107,7 +107,7 @@ private:
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = m_partitionId;
Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions);
Azure::Messaging::EventHubs::EventDataBatch batch(m_client->CreateBatch(batchOptions));
for (uint32_t j = 0; j < m_numberToSend; ++j)
{
@ -117,9 +117,9 @@ private:
event.Properties["PartitionId"]
= static_cast<Azure::Core::Amqp::Models::AmqpValue>(m_partitionId);
AddEndProperty(event, m_numberToSend);
batch.AddMessage(event);
batch.TryAddMessage(event);
}
m_client->SendEventDataBatch(batch, context);
m_client->Send(batch, context);
auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context);

Просмотреть файл

@ -39,8 +39,8 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest
uint32_t m_batchSize;
uint32_t m_prefetchCount;
uint64_t m_rounds;
uint32_t m_paddingBytes;
uint32_t m_maxDeadlineExceeded;
uint32_t m_paddingBytes{};
uint32_t m_maxDeadlineExceeded{};
std::shared_ptr<Azure::Identity::ClientSecretCredential> m_credential;
std::unique_ptr<Azure::Messaging::EventHubs::ProducerClient> m_client;
@ -148,7 +148,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = m_partitionId;
Azure::Messaging::EventHubs::EventDataBatch batch(batchOptions);
Azure::Messaging::EventHubs::EventDataBatch batch{m_client->CreateBatch(batchOptions)};
for (uint32_t j = 0; j < m_numberToSend; ++j)
{
@ -158,9 +158,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace PerfTest
event.Properties["PartitionId"]
= static_cast<Azure::Core::Amqp::Models::AmqpValue>(m_partitionId);
AddEndProperty(event, m_numberToSend);
batch.AddMessage(event);
batch.TryAddMessage(event);
}
m_client->SendEventDataBatch(batch, context);
m_client->Send(batch, context);
auto afterSendProps = m_client->GetPartitionProperties(m_partitionId, context);

Просмотреть файл

@ -17,14 +17,16 @@ SetUpTestProxy("sdk/eventhubs")
################## Unit Tests ##########################
add_executable (
azure-messaging-eventhubs-test
checkpoint_store_test.cpp
consumer_client_test.cpp
event_data_test.cpp
processor_load_balancer_test.cpp
processor_test.cpp
producer_client_test.cpp
retry_operation_test.cpp
event_data_test.cpp
consumer_client_test.cpp
checkpoint_store_test.cpp
processor_load_balancer_test.cpp
round_trip_test.cpp
test_checkpoint_store.hpp
processor_test.cpp)
)
create_per_service_target_build(eventhubs azure-messaging-eventhubs-test)
create_map_file(azure-messaging-eventhubs-test azure-messaging-eventhubs-test.map)

Просмотреть файл

@ -4,7 +4,6 @@
#include "eventhubs_test_base.hpp"
#include <azure/core/context.hpp>
#include <azure/core/internal/environment.hpp>
#include <azure/identity.hpp>
#include <azure/messaging/eventhubs.hpp>
@ -40,11 +39,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
TEST_F(CheckpointStoreTest, TestCheckpoints)
{
std::string const testName = GetRandomName();
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
Azure::Core::_internal::Environment::GetVariable(
"CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"),
testName,
m_blobClientOptions)};
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
auto checkpoints = checkpointStore.ListCheckpoints(
@ -53,7 +50,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ(0ul, checkpoints.size());
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
"$Default",
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
@ -62,9 +59,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
});
checkpoints = checkpointStore.ListCheckpoints(
"ns.servicebus.windows.net", "event-hub-name", "$Default");
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
EXPECT_EQ(checkpoints.size(), 1ul);
EXPECT_EQ("$Default", checkpoints[0].ConsumerGroup);
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
@ -72,7 +69,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
EXPECT_EQ(101, checkpoints[0].Offset.Value());
checkpointStore.UpdateCheckpoint(Azure::Messaging::EventHubs::Models::Checkpoint{
"$Default",
consumerGroup,
"event-hub-name",
"ns.servicebus.windows.net",
"partition-id",
@ -81,9 +78,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
});
checkpoints = checkpointStore.ListCheckpoints(
"ns.servicebus.windows.net", "event-hub-name", "$Default");
"ns.servicebus.windows.net", "event-hub-name", consumerGroup);
EXPECT_EQ(checkpoints.size(), 1ul);
EXPECT_EQ("$Default", checkpoints[0].ConsumerGroup);
EXPECT_EQ(consumerGroup, checkpoints[0].ConsumerGroup);
EXPECT_EQ("event-hub-name", checkpoints[0].EventHubName);
EXPECT_EQ("ns.servicebus.windows.net", checkpoints[0].FullyQualifiedNamespaceName);
EXPECT_EQ("partition-id", checkpoints[0].PartitionId);
@ -95,10 +92,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
{
std::string const testName = GetRandomName();
auto containerClient{Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(
Azure::Core::_internal::Environment::GetVariable(
"CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"),
testName,
m_blobClientOptions)};
GetEnv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING"), testName, m_blobClientOptions)};
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);

Просмотреть файл

@ -22,15 +22,33 @@ void ProcessMessageSuccess(Azure::Core::Amqp::Models::AmqpMessage const& message
} // namespace LocalTest
namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
class ConsumerClientTest : public EventHubsTestBase {
void SetUp() override
{
EventHubsTestBase::SetUp();
if (m_testContext.IsLiveMode())
{
std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string eventHubName = GetEnv("EVENTHUB_NAME");
Azure::Messaging::EventHubs::ProducerClient producer{connStringNoEntityPath, eventHubName};
EventDataBatchOptions eventBatchOptions;
eventBatchOptions.PartitionId = "1";
EventDataBatch batch{producer.CreateBatch(eventBatchOptions)};
EXPECT_TRUE(batch.TryAddMessage(Models::EventData{"Test"}));
EXPECT_NO_THROW(producer.Send(batch));
}
}
};
TEST_F(ConsumerClientTest, ConnectionStringNoEntityPath_LIVEONLY_)
{
std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
std::string eventHubName = GetEnv("EVENTHUB_NAME");
auto client = Azure::Messaging::EventHubs::ConsumerClient(
connStringNoEntityPath, "eventhub", "$Default");
EXPECT_EQ("eventhub", client.GetEventHubName());
connStringNoEntityPath, eventHubName, consumerGroup);
EXPECT_EQ(eventHubName, client.GetEventHubName());
}
TEST_F(ConsumerClientTest, ConnectionStringEntityPath_LIVEONLY_)
@ -38,8 +56,12 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
std::string const connStringNoEntityPath
= GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=hehe";
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
std::string eventHubName = GetEnv("EVENTHUB_NAME");
// The eventHubName parameter is ignored because the eventhub name is in the connection string.
auto client = Azure::Messaging::EventHubs::ConsumerClient(
connStringNoEntityPath, "eventhub", "$DefaultZ");
connStringNoEntityPath, eventHubName, "$DefaultZ");
EXPECT_EQ("hehe", client.GetEventHubName());
EXPECT_EQ("$DefaultZ", client.GetConsumerGroup());
}
@ -47,8 +69,9 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
TEST_F(ConsumerClientTest, ConnectionStringEntityPathNoConsumerGroup_LIVEONLY_)
{
std::string const connStringNoEntityPath = GetEnv("EVENTHUB_CONNECTION_STRING");
auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringNoEntityPath, "eventhub");
EXPECT_EQ("eventhub", client.GetEventHubName());
std::string eventHubName = GetEnv("EVENTHUB_NAME");
auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringNoEntityPath, eventHubName);
EXPECT_EQ(eventHubName, client.GetEventHubName());
EXPECT_EQ("$Default", client.GetConsumerGroup());
}
@ -121,7 +144,6 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
options.ApplicationID = "unit-test";
options.Name = "unit-test";
options.MaxMessageSize = std::numeric_limits<uint16_t>::max();
auto client = Azure::Messaging::EventHubs::ConsumerClient(connStringEntityPath);
Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions;

Просмотреть файл

@ -33,6 +33,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
Azure::Messaging::EventHubs::BlobCheckpointStore checkpointStore(containerClient);
std::string eventHubName{GetEnv("EVENTHUB_NAME")};
std::string consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
std::string const connStringNoEntityPath
= GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + eventHubName;
@ -42,7 +43,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
options.Name = "processor unittest";
auto client = Azure::Messaging::EventHubs::ConsumerClient(
connStringNoEntityPath, eventHubName, "$Default", options);
connStringNoEntityPath, eventHubName, consumerGroup, options);
ProcessorOptions processorOptions;
processorOptions.LoadBalancingStrategy = Models::ProcessorStrategy::ProcessorStrategyBalanced;
processorOptions.UpdateInterval = std::chrono::seconds(2);

Просмотреть файл

@ -59,6 +59,9 @@ TEST_F(ProducerClientTest, SendMessage_LIVEONLY_)
producerOptions.Name = "sender-link";
producerOptions.ApplicationID = "some";
auto client = Azure::Messaging::EventHubs::ProducerClient(
connStringEntityPath, eventHubName, producerOptions);
Azure::Core::Amqp::Models::AmqpMessage message2;
Azure::Messaging::EventHubs::Models::EventData message1;
message2.SetBody(Azure::Core::Amqp::Models::AmqpValue("Hello7"));
@ -71,27 +74,46 @@ TEST_F(ProducerClientTest, SendMessage_LIVEONLY_)
Azure::Messaging::EventHubs::EventDataBatchOptions edboptions;
edboptions.MaxBytes = std::numeric_limits<uint16_t>::max();
edboptions.PartitionId = "1";
Azure::Messaging::EventHubs::EventDataBatch eventBatch(edboptions);
Azure::Messaging::EventHubs::EventDataBatch eventBatch{client.CreateBatch(edboptions)};
Azure::Messaging::EventHubs::EventDataBatchOptions edboptions2;
edboptions2.MaxBytes = std::numeric_limits<uint16_t>::max();
;
edboptions2.PartitionId = "2";
Azure::Messaging::EventHubs::EventDataBatch eventBatch2(edboptions2);
Azure::Messaging::EventHubs::EventDataBatch eventBatch2{client.CreateBatch(edboptions2)};
eventBatch.AddMessage(message1);
eventBatch.AddMessage(message2);
eventBatch.TryAddMessage(message1);
eventBatch.TryAddMessage(message2);
eventBatch2.AddMessage(message3);
eventBatch2.AddMessage(message2);
eventBatch2.TryAddMessage(message3);
eventBatch2.TryAddMessage(message2);
for (int i = 0; i < 5; i++)
{
EXPECT_NO_THROW(client.Send(eventBatch));
}
}
TEST_F(ProducerClientTest, EventHubRawMessageSend_LIVEONLY_)
{
std::string eventHubName{GetEnv("EVENTHUB_NAME")};
std::string const connStringEntityPath
= GetEnv("EVENTHUB_CONNECTION_STRING") + ";EntityPath=" + eventHubName;
Azure::Messaging::EventHubs::ProducerClientOptions producerOptions;
producerOptions.Name = "sender-link";
producerOptions.ApplicationID = "some";
auto client = Azure::Messaging::EventHubs::ProducerClient(
connStringEntityPath, eventHubName, producerOptions);
for (int i = 0; i < 5; i++)
{
auto result = client.SendEventDataBatch(eventBatch);
EXPECT_TRUE(result);
}
client.Send(Azure::Messaging::EventHubs::Models::EventData{"This is a test message"});
// Send using the implicit EventData constructor.
client.Send(std::string{"String test message"});
// Send using a vector of implicit EventData constructor with a binary buffer.
client.Send({{12, 13, 14, 15}, {16, 17, 18, 19}});
}
TEST_F(ProducerClientTest, GetEventHubProperties_LIVEONLY_)

Просмотреть файл

@ -0,0 +1,155 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "eventhubs_test_base.hpp"
#include <azure/core/context.hpp>
#include <azure/identity.hpp>
#include <azure/messaging/eventhubs.hpp>
#include <gtest/gtest.h>
namespace Azure { namespace Messaging { namespace EventHubs { namespace Test {
class RoundTripTests : public EventHubsTestBase {
};
// Round trip a message with a string body using a sequence number filter.
TEST_F(RoundTripTests, SendAndReceiveStringSequenceNumber_LIVEONLY_)
{
std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string const eventHubName = GetEnv("EVENTHUB_NAME");
std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
int64_t startSequenceNumber = 0;
{
auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName);
auto partitionProperties = producer.GetPartitionProperties("1");
startSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber;
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = "1";
Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)};
eventBatch.TryAddMessage(Azure::Messaging::EventHubs::Models::EventData("Hello world!"));
EXPECT_NO_THROW(producer.Send(eventBatch));
}
{
Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions;
partitionOptions.StartPosition.SequenceNumber = startSequenceNumber;
auto consumer = Azure::Messaging::EventHubs::ConsumerClient(
connectionString, eventHubName, consumerGroup);
auto receiver = consumer.CreatePartitionClient("1", partitionOptions);
auto receivedEvents = receiver.ReceiveEvents(1);
ASSERT_EQ(1ul, receivedEvents.size());
std::vector<uint8_t> expected{'H', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd', '!'};
EXPECT_EQ(expected, receivedEvents[0].Body);
}
}
// Round trip a message with a binary body using an offset filter.
TEST_F(RoundTripTests, SendAndReceiveBinaryDataOffset_LIVEONLY_)
{
std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string const eventHubName = GetEnv("EVENTHUB_NAME");
std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
int64_t startOffset = 0;
{
auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName);
auto partitionProperties = producer.GetPartitionProperties("1");
startOffset = partitionProperties.LastEnqueuedOffset;
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = "1";
Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)};
eventBatch.TryAddMessage(Azure::Messaging::EventHubs::Models::EventData({1, 2, 3, 4, 5}));
EXPECT_NO_THROW(producer.Send(eventBatch));
}
{
auto consumer = Azure::Messaging::EventHubs::ConsumerClient(
connectionString, eventHubName, consumerGroup);
Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions;
partitionOptions.StartPosition.Offset = startOffset;
auto receiver = consumer.CreatePartitionClient("1", partitionOptions);
auto receivedEvents = receiver.ReceiveEvents(1);
ASSERT_EQ(1ul, receivedEvents.size());
for (auto const& event : receivedEvents)
{
GTEST_LOG_(INFO) << "Event: " << event;
EXPECT_TRUE(event.EnqueuedTime);
EXPECT_TRUE(event.Offset);
EXPECT_TRUE(event.SequenceNumber);
}
std::vector<uint8_t> expected{1, 2, 3, 4, 5};
EXPECT_EQ(expected, receivedEvents[0].Body);
}
}
// Round trip a message with a binary body using a queued time filter.
TEST_F(RoundTripTests, SendAndReceiveTimestamp_LIVEONLY_)
{
std::string const connectionString = GetEnv("EVENTHUB_CONNECTION_STRING");
std::string const eventHubName = GetEnv("EVENTHUB_NAME");
std::string const consumerGroup = GetEnv("EVENTHUB_CONSUMER_GROUP");
Azure::DateTime startTime;
{
auto producer = Azure::Messaging::EventHubs::ProducerClient(connectionString, eventHubName);
auto partitionProperties = producer.GetPartitionProperties("1");
GTEST_LOG_(INFO) << "Partition Properties: " << partitionProperties;
startTime = partitionProperties.LastEnqueuedTimeUtc + std::chrono::seconds(1);
GTEST_LOG_(INFO) << "Sleep for a second to reset enqueued time";
std::this_thread::sleep_for(std::chrono::seconds(2));
Azure::Messaging::EventHubs::EventDataBatchOptions batchOptions;
batchOptions.PartitionId = "1";
Azure::Messaging::EventHubs::EventDataBatch eventBatch{producer.CreateBatch(batchOptions)};
Azure::Messaging::EventHubs::Models::EventData eventData;
eventData.Body = {1, 2, 3, 4, 5, 6, 7};
eventData.ContentType = "application/binary";
eventData.MessageId = "Test Message Id";
EXPECT_TRUE(eventBatch.TryAddMessage(eventData));
EXPECT_NO_THROW(producer.Send(eventBatch));
}
{
auto consumer = Azure::Messaging::EventHubs::ConsumerClient(
connectionString, eventHubName, consumerGroup);
Azure::Messaging::EventHubs::PartitionClientOptions partitionOptions;
partitionOptions.StartPosition.EnqueuedTime = startTime;
partitionOptions.StartPosition.Inclusive = false;
auto receiver = consumer.CreatePartitionClient("1", partitionOptions);
auto receivedEvents = receiver.ReceiveEvents(1);
ASSERT_EQ(1ul, receivedEvents.size());
for (auto const& event : receivedEvents)
{
GTEST_LOG_(INFO) << "Event: " << event;
EXPECT_TRUE(event.EnqueuedTime);
EXPECT_TRUE(event.Offset);
EXPECT_TRUE(event.SequenceNumber);
}
std::vector<uint8_t> expected{1, 2, 3, 4, 5, 6, 7};
EXPECT_EQ(expected, receivedEvents[0].Body);
ASSERT_TRUE(receivedEvents[0].ContentType);
EXPECT_EQ("application/binary", receivedEvents[0].ContentType.Value());
ASSERT_TRUE(receivedEvents[0].MessageId);
EXPECT_EQ("Test Message Id", static_cast<std::string>(receivedEvents[0].MessageId.Value()));
}
}
}}}} // namespace Azure::Messaging::EventHubs::Test

Просмотреть файл

@ -49,6 +49,12 @@ stages:
Value: "non-real-secret"
- Name: AZURE_SUBSCRIPTION_ID
Value: "non-real-sub"
- Name: EVENTHUB_CONSUMER_GROUP
Value: "defaultgroup"
- Name: EVENTHUB_NAME
Value: "non-real-eventhub-name"
- Name: EVENTHUB_CONNECTION_STRING
Value: "Endpoint=sb://notReal.servicebus.windows.net/;SharedAccessKeyName=notReal"
- Name: CHECKPOINTSTORE_STORAGE_CONNECTION_STRING
Value: "DefaultEndpointsProtocol=https;AccountName=notReal;AccountKey=3333333333333333333333333333333333333333333333333333333333333333333333333333333333333333;EndpointSuffix=core.windows.net"