Update Python OAuth2 tutorial to align with upstream / quickstart

Fix oauth_cb access token expiration bug. Add additional context/documentation and a troubleshooting guide.

This commit is contained in:
Parent: 18b2448906
Commit: bb8a71a5c9

README.md
@@ -1,85 +1,198 @@
# Using Azure Active Directory authentication with Python and Event Hubs for Apache Kafka

This tutorial shows how to create and connect to an Event Hubs Kafka endpoint using Azure Active Directory authentication. Azure Event Hubs for Apache Kafka supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.

This sample is based on [Confluent's Apache Kafka Python client](https://github.com/confluentinc/confluent-kafka-python), modified for use with Event Hubs for Kafka.
## Overview

Event Hubs integrates with Azure Active Directory (Azure AD), which provides an OAuth 2.0 compliant authorization server. Azure role-based access control (Azure RBAC) can be used to grant permissions to Kafka client identities.

To access an Event Hubs resource, the security principal must first be authenticated, after which an OAuth 2.0 token is returned. The token is then passed as part of the request to the Event Hubs service to authorize access to the resource. This is done by setting the appropriate values in the Kafka client configuration.

For more information on using Azure AD with Event Hubs, see [Authorize access with Azure Active Directory](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory#overview).

### Retrieve Azure Active Directory (AAD) Token

The `DefaultAzureCredential` class from the [Azure Identity client library](https://docs.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python) can be used to get a credential with the scope of `https://<namespace>.servicebus.windows.net/.default` to retrieve the access token for the Event Hubs namespace.

This class is suitable for use with Azure CLI for local development, Managed Identity for Azure deployments, and with service principal client secrets/certificates. See [DefaultAzureCredential](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for more information on this class.
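
For illustration, a minimal sketch of retrieving a token this way; `mynamespace` below is a placeholder for your own namespace name:

```python
from azure.identity import DefaultAzureCredential

# Placeholder: replace with your Event Hubs namespace FQDN
namespace_fqdn = 'mynamespace.servicebus.windows.net'

credential = DefaultAzureCredential()

# get_token returns an AccessToken: `token` holds the bearer token string and
# `expires_on` is the expiry time in seconds since the epoch
access_token = credential.get_token(f'https://{namespace_fqdn}/.default')
print(access_token.expires_on)
```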
### Role Assignments

An RBAC role must be assigned to the application to gain access to Event Hubs. Azure provides built-in roles for authorizing access to Event Hubs. See [Azure built-in roles for Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory#azure-built-in-roles-for-azure-event-hubs).

To create a service principal and assign RBAC roles to the application, review [How to Create a Service Principal](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal). A role can also be assigned with the Azure CLI, as sketched below.
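
A sketch of assigning the sender role with `az role assignment create`; every identifier below is a placeholder for your own app registration, subscription, resource group, and namespace:

```shell
# Placeholder values: substitute your own identifiers before running
az role assignment create \
    --assignee "<app-client-id>" \
    --role "Azure Event Hubs Data Sender" \
    --scope "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.EventHub/namespaces/<namespace>"
```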
### Kafka Client Configuration

To connect a Kafka client to Event Hubs using OAuth 2.0, the following configuration properties are required:

```properties
bootstrap.servers=<namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
oauth_cb=<Callback for retrieving OAuth Bearer token>
```

The return value of `oauth_cb` must be a (`token_str`, `expiry_time`) tuple, where `expiry_time` is the time in seconds since the epoch as a floating-point number. See [Confluent Kafka for Python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html) for reference.

An example `oauth_cb` and Kafka client configuration is shown below:
```python
from azure.identity import DefaultAzureCredential
from confluent_kafka import Producer
from functools import partial


def oauth_cb(cred, namespace_fqdn, config):
    # note: confluent_kafka passes 'sasl.oauthbearer.config' as the config param
    access_token = cred.get_token(f'https://{namespace_fqdn}/.default')
    return access_token.token, access_token.expires_on


# A credential object retrieves access tokens
credential = DefaultAzureCredential()

# The namespace FQDN is used both for specifying the server and as the token audience
namespace_fqdn = '<namespace>.servicebus.windows.net'

# Minimum required configuration to connect to Event Hubs for Kafka with AAD
p = Producer({
    'bootstrap.servers': f'{namespace_fqdn}:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': partial(oauth_cb, credential, namespace_fqdn),
})
```
## Prerequisites

If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.

You'll also need:

* [Git](https://www.git-scm.com/downloads)
* [Python](https://www.python.org/downloads/)
* [Pip](https://pypi.org/project/pip/)
* [OpenSSL](https://www.openssl.org/) (including libssl)
* [librdkafka](https://github.com/edenhill/librdkafka)
## Create an Event Hubs namespace

An Event Hubs namespace includes a Kafka endpoint and is required to send or receive data. See [Quickstart: Create an event hub using Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create) for instructions on creating an Event Hubs namespace.
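
Alternatively, a sketch of creating a namespace with the Azure CLI; names and location are placeholders, and since the Kafka endpoint is not available on the Basic tier, Standard or higher is required:

```shell
# Placeholder names and location: substitute your own values
az group create --name <resource-group> --location eastus
az eventhubs namespace create \
    --name <namespace> \
    --resource-group <resource-group> \
    --location eastus \
    --sku Standard
```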
## Getting ready

Now that you have an Event Hubs namespace, clone the repository and navigate to the `tutorials/oauth/python` directory:

```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/oauth/python
```

Install the sample dependencies:

```shell
python -m pip install -r requirements.txt
```
## Running the samples

### Configuring credentials

In this example, a service principal is used for the AAD credential. This service principal must have the `Azure Event Hubs Data Sender` and `Azure Event Hubs Data Receiver` roles (or equivalent) assigned on the target Event Hubs namespace. We configure `DefaultAzureCredential` by setting the following environment variables:

```shell
export AZURE_TENANT_ID=<TenantID>
export AZURE_CLIENT_ID=<AppClientId>
export AZURE_CLIENT_SECRET=<AppSecret>
```
### Producing

```shell
# Usage: producer.py <eventhubs-namespace> <topic>
python producer.py mynamespace.servicebus.windows.net topic1
```

> Note: the topic must already exist, or you will see an "Unknown topic or partition" error when running with the `Azure Event Hubs Data Sender` role. With the `Azure Event Hubs Data Owner` role, the topic (Event Hub) will be created automatically. A topic can also be created ahead of time, as sketched below.
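
If your identity only has the sender role, a sketch of pre-creating the topic (Event Hub) with the Azure CLI, using placeholder names:

```shell
# Placeholder names: substitute your own namespace and resource group
az eventhubs eventhub create \
    --name topic1 \
    --namespace-name <namespace> \
    --resource-group <resource-group>
```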
### Consuming

```shell
# Usage: consumer.py [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..
python consumer.py mynamespace.servicebus.windows.net myconsumergroup topic1
```
## Troubleshooting

Listed below are some common errors and possible remediations.

### Missing credentials

```text
DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
    EnvironmentCredential: EnvironmentCredential authentication unavailable. Environment variables are not fully configured.
Visit https://aka.ms/azsdk/python/identity/environmentcredential/troubleshoot to troubleshoot this issue.
    ManagedIdentityCredential: ManagedIdentityCredential authentication unavailable, no response from the IMDS endpoint.
    SharedTokenCacheCredential: SharedTokenCacheCredential authentication unavailable. No accounts were found in the cache.
    VisualStudioCodeCredential: Failed to get Azure user details from Visual Studio Code.
    AzureCliCredential: Azure CLI not found on path
    AzurePowerShellCredential: PowerShell is not installed
To mitigate this issue, please refer to the troubleshooting guidelines here at https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot.
```
#### Cause

You are missing credentials to connect to Azure AD and retrieve an access token.

#### Remediation

* If using the [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli), run `az login`
* If using a service principal, set the following environment variables:
  * `AZURE_TENANT_ID`
  * `AZURE_CLIENT_ID`
  * `AZURE_CLIENT_SECRET`
### Incorrect credentials

```text
DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
    EnvironmentCredential: Authentication failed: AADSTS7000215: Invalid client secret provided. Ensure the secret being sent in the request is the client secret value, not the client secret ID, for a secret added to app '11dba002-88d3-45a5-b8c0-75a84ec2c1eb'.
Trace ID: ba773467-5755-4492-9c05-d35d73683600
Correlation ID: e976bf48-228a-4114-b06e-ce97aeb52af3
Timestamp: 2022-09-28 16:43:50Z
To mitigate this issue, please refer to the troubleshooting guidelines here at https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot.
```

#### Cause

Your service principal credentials are incorrect or expired.

#### Remediation

Ensure that your `AZURE_CLIENT_ID` and `AZURE_CLIENT_SECRET` values are correct.
### Missing role assignments

Producer:

```text
% Message failed delivery: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}
```

Consumer:

```text
cimpl.KafkaException: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Failed to fetch committed offset for group "<consumer group>" topic <topic name>[0]: Broker: Topic authorization failed"}
```

#### Cause

You are missing Azure RBAC role assignments for your application's identity (Azure CLI, service principal, etc.).

#### Remediation

* Ensure that your application's identity is assigned the correct roles (see the Azure CLI sketch in the Role Assignments section above):
  * Producers need `Azure Event Hubs Data Sender`
  * Consumers need `Azure Event Hubs Data Receiver`
consumer.py
@@ -7,63 +7,123 @@

#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from confluent_kafka import Consumer, KafkaException
from azure.identity import DefaultAzureCredential
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
Options:
    -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    namespace = argv[0]
    group = argv[1]
    topics = argv[2:]

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential()

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '%s:9093' % namespace,
        'group.id': group,
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, namespace),
    }

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = int(opt[1])

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()
producer.py
@@ -7,60 +7,74 @@

#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from confluent_kafka import Producer
from azure.identity import DefaultAzureCredential
import sys
from functools import partial


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


if __name__ == '__main__':
    if len(sys.argv) != 3:
        sys.stderr.write('Usage: %s <eventhubs-namespace> <topic>\n' % sys.argv[0])
        sys.exit(1)

    namespace = sys.argv[1]
    topic = sys.argv[2]

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential()

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '%s:9093' % namespace,

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, namespace),
    }

    # Create Producer instance
    p = Producer(**conf)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Write 1-100 to topic
    for i in range(0, 100):
        try:
            p.produce(topic, str(i), callback=delivery_callback)
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             len(p))

        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        # will most likely not serve the delivery callback for the
        # last produce()d message.
        p.poll(0)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
requirements.txt
@@ -1,10 +1,2 @@

azure-identity
confluent-kafka
setup.sh (deleted)
@@ -1,9 +0,0 @@

#!/bin/bash

sudo apt-get update

sudo apt-get install python3.8

echo "Setting up required dependencies"
sudo pip3 --disable-pip-version-check --no-cache-dir install -r requirements.txt
echo "Try running the samples now!"