Merge pull request #210 from noelbundick-msft/python-oauth-pr

OAuth2 + Python fixes and updates
This commit is contained in:
Serkant Karaca 2022-10-10 13:39:50 -07:00 коммит произвёл GitHub
Родитель 18b2448906 bb8a71a5c9
Коммит 60c7fc8d06
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 328 добавлений и 158 удалений

Просмотреть файл

@ -1,85 +1,198 @@
# Using Confluent's Python Kafka client and librdkafka with Event Hubs for Apache Kafka Ecosystems
# Using Azure Active Directory authentication with Python and Event Hubs for Apache Kafka
This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer and consumer written in python. Azure Event Hubs for Apache Kafka Ecosystems supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This tutorial will show how to create and connect to an Event Hubs Kafka endpoint using Azure Active Directory authentication. Azure Event Hubs for Apache Kafka supports [Apache Kafka version 1.0](https://kafka.apache.org/10/documentation.html) and later.
This sample is based on [Confluent's Apache Kafka Python client](https://github.com/confluentinc/confluent-kafka-python), modified for use with Event Hubs for Kafka. While the tutorial is aimed at Linux users, MacOS users can follow along with Homebrew.
This sample is based on [Confluent's Apache Kafka Python client](https://github.com/confluentinc/confluent-kafka-python), modified for use with Event Hubs for Kafka.
## Overview
Event Hubs integrates with Azure Active Directory (Azure AD), which provides an OAuth 2.0 compliant authorization server. Azure role-based access control (Azure RBAC) can be used to grant permissions to Kafka client identities.
To grant access to an Event Hubs resource, the security principal must be authenticated and an OAuth 2.0 token is returned. The token is then passed as part of the request to the Event Hubs service to authorize access to the resource. This is done by setting the appropriate values in the Kafka client configuration.
For more information on using Azure AD with Event Hubs, see [Authorize access with Azure Active Directory](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory#overview)
### Retrieve Azure Active Directory (AAD) Token
The `DefaultAzureCredential` class from the [Azure Identity client library](https://docs.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python) can be used to get a credential with the scope of `https://<namespace>.servicebus.windows.net/.default` to retrieve the access token for the Event Hubs namespace.
This class is suitable for use with Azure CLI for local development, Managed Identity for Azure deployments, and with service principal client secrets/certificates. See [DefaultAzureCredential](https://docs.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) for more information on this class.
### Role Assignments
An RBAC role must be assigned to the application to gain access to Event Hubs. Azure provides built-in roles for authorizing access to Event Hubs. See [Azure Built in Roles for Azure Event Hubs](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory#azure-built-in-roles-for-azure-event-hubs).
To create a service principal and assign RBAC roles to the application, review [How to Create a Service Principal](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal).
### Kafka Client Configuration
To connect a Kafka client to Event Hubs using OAuth 2.0, the following configuration properties are required:
```properties
bootstrap.servers=<namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
oauth_cb=<Callback for retrieving OAuth Bearer token>
```
The return value of `oauth_cb` must be a (`token_str`, `expiry_time`) tuple where `expiry_time` is the time in seconds since the epoch as a floating point number.
See [Confluent Kafka for Python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html) for reference.
An example `oauth_cb` and kafka client configuration is shown below:
```python
from azure.identity import DefaultAzureCredential
from confluent_kafka import Producer
from functools import partial
def oauth_cb(cred, namespace_fqdn, config):
# note: confluent_kafka passes 'sasl.oauthbearer.config' as the config param
access_token = cred.get_token(f'https://{namespace_fqdn}/.default')
return access_token.token, access_token.expires_on
# A credential object retrieves access tokens
credential = DefaultAzureCredential()
# The namespace FQDN is used both for specifying the server and as the token audience
namespace_fqdn = '<namespace>.servicebus.windows.net'
# Minimum required configuration to connect to Event Hubs for Kafka with AAD
p = Producer({
'bootstrap.servers': f'{namespace_fqdn}:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'oauth_cb': partial(oauth_cb, credential, namespace_fqdn),
})
```
## Prerequisites
If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.
In addition:
You'll also need:
* [Git](https://www.git-scm.com/downloads)
* [Python](https://www.python.org/downloads/) (versions 2.7.x and 3.6.x are fine)
* [Python](https://www.python.org/downloads/)
* [Pip](https://pypi.org/project/pip/)
* [OpenSSL](https://www.openssl.org/) (including libssl)
* [librdkafka](https://github.com/edenhill/librdkafka)
Running the setup script provided in this repo will install and configure all of the required dependencies.
## Create an Event Hubs namespace
An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. Make sure to copy the Event Hubs connection string for later use.
An Event Hubs namespace includes a Kafka endpoint and is required to send or receive data. See [Quickstart: Create an event hub using Azure portal](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create) for instructions on creating an Event Hubs namespace.
## Getting ready
Now that you have a Kafka-enabled Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the `quickstart/python` subfolder:
Now that you have an Event Hubs namespace, clone the repository and navigate to the `tutorials/oauth/python` directory:
```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/quickstart/python
cd azure-event-hubs-for-kafka/tutorials/oauth/python
```
Now run the set up script:
Install the sample dependencies:
```shell
source setup.sh
python -m pip install -r requirements.txt
```
(If using MacOS, you can use Homebrew to set up your machine by running `brew install openssl python librdkafka` - `pip` can then be used to install the Python SDK with `pip install confluent-kafka`.)
## Running the samples
### Update your configurations
### Configuring credentials
Both the producer and consumer samples require extra configuration to authenticate with your Event Hubs namespace. Add required configurations to environment (.env) file at the root folder.
In this example, a service principal is used for the AAD credential. This service principal must have the `Azure Event Hubs Data Sender` and `Azure Event Hubs Data Receiver` roles (or equivalent) assigned on the target Event Hubs namespace. We configure `DefaultAzureCredential` by setting the following environment variables:
AZURE_AUTHORITY_HOST=login.microsoftonline.com
AZURE_CLIENT_ID=<<AppClientId>>
AZURE_CLIENT_SECRET=<<AppSecret>>
AZURE_TENANT_ID=<<TenantID>>
AZURE_CLIENT_CERTIFICATE_PATH=<<AzureAdClientSecretCertPath>> ## For Certs remove the AZURE_CLIENT_SECRET entry from .env file
AZURE_CLIENT_SEND_CERTIFICATE_CHAIN=<<TrueToValidateISsuesAndSubjectName>>
EVENT_HUB_HOSTNAME=<<EventHubNameSpace>>
EVENT_HUB_NAME=<<EvemtHubName>>
CONSUMER_GROUP=<<ConsumerGroupName>>
### Producing
```shell
python producer.py <topic>
```shell
export AZURE_TENANT_ID=<TenantID>
export AZURE_CLIENT_ID=<AppClientId>
export AZURE_CLIENT_SECRET=<AppSecret>
```
Note that the topic must already exist or else you will see an "Unknown topic or partition" error.
### Producing
```shell
# Usage: producer.py <eventhubs-namespace> <topic>.
python producer.py mynamespace.servicebus.windows.net topic1
```
> Note: the topic must already exist, or will see an "Unknown topic or partition" error when running with the `Azure Event Hubs Data Sender` role. With the `Azure Event Hubs Data Owner` role, the topic (Event Hub) will be automatically created.
### Consuming
```shell
python consumer.py
# Usage: consumer.py [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..
python consumer.py mynamespace.servicebus.windows.net myconsumergroup topic1
```
### Dependencies
## Troubleshooting
**Azure.Identity** -> For Azure AD AUTH. Please refer defaultazurecredential
Listed below are some common errors and possible remediations
**Confluent-Kafka** -> To connect to EventHub using Kafka protocol
### Missing credentials
```text
DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
EnvironmentCredential: EnvironmentCredential authentication unavailable. Environment variables are not fully configured.
Visit https://aka.ms/azsdk/python/identity/environmentcredential/troubleshoot to troubleshoot.this issue.
ManagedIdentityCredential: ManagedIdentityCredential authentication unavailable, no response from the IMDS endpoint.
SharedTokenCacheCredential: SharedTokenCacheCredential authentication unavailable. No accounts were found in the cache.
VisualStudioCodeCredential: Failed to get Azure user details from Visual Studio Code.
AzureCliCredential: Azure CLI not found on path
AzurePowerShellCredential: PowerShell is not installed
To mitigate this issue, please refer to the troubleshooting guidelines here at https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot.
```
#### Cause
You are missing credentials to connect to Azure AD and retrieve an access token.
#### Remediation
* If using the [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli), run `az login`
* If using a service principal, set the following environment variables:
* `AZURE_TENANT_ID`
* `AZURE_CLIENT_ID`
* `AZURE_CLIENT_SECRET`
### Incorrect credentials
```text
DefaultAzureCredential failed to retrieve a token from the included credentials.
Attempted credentials:
EnvironmentCredential: Authentication failed: AADSTS7000215: Invalid client secret provided. Ensure the secret being sent in the request is the client secret value, not the client secret ID, for a secret added to app '11dba002-88d3-45a5-b8c0-75a84ec2c1eb'.
Trace ID: ba773467-5755-4492-9c05-d35d73683600
Correlation ID: e976bf48-228a-4114-b06e-ce97aeb52af3
Timestamp: 2022-09-28 16:43:50Z
To mitigate this issue, please refer to the troubleshooting guidelines here at https://aka.ms/azsdk/python/identity/defaultazurecredential/troubleshoot.
```
#### Cause
Your service principal credentials are incorrect or expired
#### Remediation
Ensure that your `AZURE_CLIENT_ID` and `AZURE_CLIENT_SECRET` values are correct
### Missing role assignments
Producer:
```text
% Message failed delivery: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Broker: Topic authorization failed"}
```
Consumer:
```text
cimpl.KafkaException: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Failed to fetch committed offset for group "<consumer group>" topic <topic name>[0]: Broker: Topic authorization failed"}
```
#### Cause
You are missing Azure RBAC role assignments for your application's identity (Azure CLI, Service Principal, etc)
#### Remediation
* Ensure that your application's identity is assigned the correct roles
* Producers need `Azure Event Hubs Data Sender`
* Consumers need `Azure Event Hubs Data Receiver`

Просмотреть файл

@ -7,63 +7,123 @@
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
import signal
import sys
import time
from confluent_kafka import Consumer
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
import os
load_dotenv()
FULLY_QUALIFIED_NAMESPACE= os.environ['EVENT_HUB_HOSTNAME']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
CONSUMER_GROUP='$Default'
AUTH_SCOPE= "https://" + FULLY_QUALIFIED_NAMESPACE +"/.default"
# AAD
cred = DefaultAzureCredential()
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
def _get_token(config):
"""Note here value of config comes from sasl.oauthbearer.config below.
It is not used in this example but you can put arbitrary values to
configure how you can get the token (e.g. which token URL to use)
"""
access_token = cred.get_token(AUTH_SCOPE)
return access_token.token, time.time() + access_token.expires_on
def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
consumer = Consumer({
"bootstrap.servers": FULLY_QUALIFIED_NAMESPACE + ":9093",
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_SSL",
"oauth_cb": _get_token,
"group.id": CONSUMER_GROUP,
# "debug": "broker,topic,msg"
})
def oauth_cb(cred, namespace_fqdn, config):
# confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)
# cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
# namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
# config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
return access_token.token, access_token.expires_on
def signal_handler(sig, frame):
print("exiting")
consumer.close()
sys.exit(0)
def print_usage_and_exit(program_name):
sys.stderr.write(
'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
options = '''
Options:
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) < 3:
print_usage_and_exit(sys.argv[0])
print("consuming " + EVENTHUB_NAME)
consumer.subscribe([EVENTHUB_NAME])
namespace = argv[0]
group = argv[1]
topics = argv[2:]
while True:
msg = consumer.poll(1.0)
# Azure credential
# See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
cred = DefaultAzureCredential()
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '%s:9093' % namespace,
'group.id': group,
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',
print(
f"Received message [{msg.partition()}]: {msg.value().decode('utf-8')}")
# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, namespace),
}
# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except ValueError:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)
if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)
conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])
# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)
# Create Consumer instance
# Hint: try debug='fetch' to generate some log messages
c = Consumer(conf, logger=logger)
def print_assignment(consumer, partitions):
print('Assignment:', partitions)
# Subscribe to topics
c.subscribe(topics, on_assign=print_assignment)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
# Close down consumer to commit final offsets.
c.close()

Просмотреть файл

@ -7,60 +7,74 @@
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems
import time
from confluent_kafka import Producer
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
import os
load_dotenv()
FULLY_QUALIFIED_NAMESPACE= os.environ['EVENT_HUB_HOSTNAME']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
AUTH_SCOPE= "https://" + FULLY_QUALIFIED_NAMESPACE +"/.default"
# AAD
cred = DefaultAzureCredential()
def _get_token(config):
"""Note here value of config comes from sasl.oauthbearer.config below.
It is not used in this example but you can put arbitrary values to
configure how you can get the token (e.g. which token URL to use)
"""
access_token = cred.get_token(AUTH_SCOPE)
return access_token.token, time.time() + access_token.expires_on
from confluent_kafka import Producer
import sys
from functools import partial
producer = Producer({
"bootstrap.servers": FULLY_QUALIFIED_NAMESPACE + ":9093",
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_SSL",
"oauth_cb": _get_token,
"enable.idempotence": True,
"acks": "all",
# "debug": "broker,topic,msg"
})
def oauth_cb(cred, namespace_fqdn, config):
# confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)
# cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
# namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
# config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
return access_token.token, access_token.expires_on
def delivery_report(err, msg):
"""Called once for each message produced to indicate delivery result.
Triggered by poll() or flush()."""
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
if __name__ == '__main__':
if len(sys.argv) != 3:
sys.stderr.write('Usage: %s <eventhubs-namespace> <topic>\n' % sys.argv[0])
sys.exit(1)
namespace = sys.argv[1]
topic = sys.argv[2]
some_data_source = [str(i) for i in range(1000)]
for data in some_data_source:
# Trigger any available delivery report callbacks from previous produce() calls
producer.poll(0)
# Azure credential
# See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
cred = DefaultAzureCredential()
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
producer.produce(EVENTHUB_NAME, f"Hello {data}".encode("utf-8"), callback=delivery_report)
# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '%s:9093' % namespace,
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
producer.flush()
# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, namespace),
}
# Create Producer instance
p = Producer(**conf)
# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
# Write 1-100 to topic
for i in range(0, 100):
try:
p.produce(topic, str(i), callback=delivery_callback)
except BufferError:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
len(p))
# Serve delivery callback queue.
# NOTE: Since produce() is an asynchronous API this poll() call
# will most likely not serve the delivery callback for the
# last produce()d message.
p.poll(0)
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

Просмотреть файл

@ -1,10 +1,2 @@
#python -m pip install -r requirements.txt
aiohttp
azure.eventhub
azure.identity
python-dotenv
confluent-kafka
azure-identity
azure.keyvault.certificates
azure.keyvault.secrets
az.cli
confluent-kafka

Просмотреть файл

@ -1,9 +0,0 @@
#!/bin/bash
sudo apt-get update
sudo apt-get install python3.8
echo "Setting up required dependencies"
sudo pip3 --disable-pip-version-check --no-cache-dir install -r
echo "Try running the samples now!"