* Made setup 2.7 compatible

* Separated async tests

* Support 2.7 types

* Bumped version

* Added non-ascii tests

* Fix CI

* Fix Py27 pylint

* Added iot sample

* Updated sender/receiver client opening

* bumped version

* Updated tests

* Fixed test name

* Fixed test env settings

* Skip eph test
annatisch 2018-11-26 08:22:42 -08:00 committed by GitHub
Parent 0871895ca9
Commit dbae1477ea
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
34 changed files: 943 additions and 411 deletions

View file

@ -4,17 +4,41 @@ dist: xenial
sudo: required
matrix:
include:
- os: linux
python: "2.7"
dist: trusty
script:
- pytest
- python ./setup.py check -r -s
- pylint --ignore=async_ops azure.eventhub
- os: linux
python: "3.4"
dist: trusty
script:
- pytest
- python ./setup.py check -r -s
- pylint --ignore=async_ops azure.eventhub
- os: linux
python: "3.5"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
- os: linux
python: "3.6"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
- os: linux
python: "3.7"
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost
install:
- pip install -r dev_requirements.txt
- pip install -e .
script:
- pytest
- python ./setup.py check -r -s
- pylint azure.eventhub
- pylint azure.eventprocessorhost

View file

@ -3,6 +3,13 @@
Release History
===============
1.2.0
+++++
- Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).
- Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)
1.1.1 (2018-10-03)
++++++++++++++++++
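
The 1.2.0 notes above mention that ``EventData.enqueued_time`` is now parsed as a UTC timestamp (issue #72). A minimal sketch of reading it from received events, following the synchronous receiver pattern used in the tests later in this change; the ``EVENT_HUB_CONNECTION_STR`` environment variable is an assumption for illustration:

.. code:: python

    import os
    from azure.eventhub import EventHubClient, Offset

    # Assumed environment variable holding an Event Hubs connection string.
    connection_str = os.environ['EVENT_HUB_CONNECTION_STR']
    client = EventHubClient.from_connection_string(connection_str, debug=False)
    receiver = client.add_receiver("$default", "0", offset=Offset('-1'))
    try:
        client.run()
        for event in receiver.receive(timeout=5):
            # enqueued_time is parsed as a UTC timestamp per the 1.2.0 note above.
            print(event.sequence_number, event.enqueued_time)
    finally:
        client.stop()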

View file

@ -31,17 +31,6 @@ Wheels are provided for all major operating systems, so you can install directly
$ pip install azure-eventhub
Python 2.7 support
++++++++++++++++++
Python 2.7 will be supported for the synchronous operations in azure.eventhub from v1.2.0.
This is available as a pre-release.
.. code:: shell
$ pip install azure-eventhub --pre
Python 2.7 support is not planned for azure.eventprocessorhost.
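
Only the synchronous ``azure.eventhub`` API runs on Python 2.7, so a 2.7-compatible workflow uses ``EventHubClient`` directly. A minimal sketch, mirroring the synchronous send tests in this change and assuming an ``EVENT_HUB_CONNECTION_STR`` environment variable:

.. code:: python

    import os
    from azure.eventhub import EventHubClient, EventData

    connection_str = os.environ['EVENT_HUB_CONNECTION_STR']
    client = EventHubClient.from_connection_string(connection_str, debug=False)
    sender = client.add_sender(partition="0")
    try:
        client.run()
        # Unicode text works on both 2.7 and 3.x; bytes payloads are accepted too.
        sender.send(EventData(u"hello from Python 2.7"))
    finally:
        client.stop()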
Documentation
+++++++++++++

View file

@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
__version__ = "1.1.1"
__version__ = "1.2.0"
from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient

View file

@ -7,10 +7,7 @@ import logging
import asyncio
import time
import datetime
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus
from uamqp import authentication, constants, types, errors
from uamqp import (

View file

@ -104,8 +104,8 @@ class AsyncReceiver(Receiver):
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
@ -132,8 +132,8 @@ class AsyncReceiver(Receiver):
loop=self.loop)
try:
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
@ -163,6 +163,7 @@ class AsyncReceiver(Receiver):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is now ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**
:rtype: bool
"""

View file

@ -97,8 +97,8 @@ class AsyncSender(Sender):
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async() # pylint: disable=protected-access
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
@ -148,6 +148,7 @@ class AsyncSender(Sender):
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is now ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**
:rtype: bool
"""

View file

@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import logging
import datetime
@ -10,7 +11,8 @@ import uuid
import time
import functools
try:
from urllib import urlparse, unquote_plus, urlencode, quote_plus
from urlparse import urlparse
from urllib import unquote_plus, urlencode, quote_plus
except ImportError:
from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus

View file

@ -2,11 +2,14 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import datetime
import time
import json
import six
from uamqp import Message, BatchMessage
from uamqp import types, constants, errors
from uamqp.message import MessageHeader, MessageProperties
@ -63,6 +66,8 @@ class EventData(object):
:type body: str, bytes or list
:param batch: A data generator to send batched messages.
:type batch: Generator
:param to_device: An IoT device to route to.
:type to_device: str
:param message: The received message.
:type message: ~uamqp.message.Message
"""
@ -94,7 +99,7 @@ class EventData(object):
"""
The sequence number of the event data object.
:rtype: int
:rtype: int or long
"""
return self._annotations.get(EventData.PROP_SEQ_NUMBER, None)
@ -103,7 +108,7 @@ class EventData(object):
"""
The offset of the event data object.
:rtype: int
:rtype: ~azure.eventhub.common.Offset
"""
try:
return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8'))
@ -200,13 +205,13 @@ class EventData(object):
:param encoding: The encoding to use for decoding message data.
Default is 'UTF-8'
:rtype: str
:rtype: str or unicode
"""
data = self.body
try:
return "".join(b.decode(encoding) for b in data)
except TypeError:
return str(data)
return six.text_type(data)
except: # pylint: disable=bare-except
pass
try:
@ -269,7 +274,7 @@ class Offset(object):
if isinstance(self.value, datetime.datetime):
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
if isinstance(self.value, int):
if isinstance(self.value, six.integer_types):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')
@ -310,7 +315,7 @@ class EventHubError(Exception):
def _parse_error(self, error_list):
details = []
self.message = error_list if isinstance(error_list, str) else error_list.decode('UTF-8')
self.message = error_list if isinstance(error_list, six.text_type) else error_list.decode('UTF-8')
details_index = self.message.find(" Reference:")
if details_index >= 0:
details_msg = self.message[details_index + 1:]

View file

@ -2,9 +2,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import uuid
import logging
import time
from uamqp import types, errors
from uamqp import ReceiveClient, Source
@ -14,7 +16,7 @@ from azure.eventhub.common import EventHubError, EventData, _error_handler
log = logging.getLogger(__name__)
class Receiver:
class Receiver(object):
"""
Implements a Receiver.
"""
@ -97,8 +99,8 @@ class Receiver:
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self.has_started():
self._handler._connection.work()
while not self._handler.client_ready():
time.sleep(0.05)
def reconnect(self):
"""If the Receiver was disconnected from the service with
@ -106,7 +108,7 @@ class Receiver:
# pylint: disable=protected-access
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
"password": self.client._auth_config.get("iot_password")}
self._handler.close()
source = Source(self.source)
if self.offset is not None:
@ -124,8 +126,8 @@ class Receiver:
properties=self.client.create_properties())
try:
self._handler.open()
while not self.has_started():
self._handler._connection.work()
while not self._handler.client_ready():
time.sleep(0.05)
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
self.reconnect()
@ -160,6 +162,7 @@ class Receiver:
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is now ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**
:rtype: bool
"""

View file

@ -2,9 +2,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import unicode_literals
import uuid
import logging
import time
from uamqp import constants, errors
from uamqp import SendClient
@ -14,7 +16,7 @@ from azure.eventhub.common import EventHubError, _error_handler
log = logging.getLogger(__name__)
class Sender:
class Sender(object):
"""
Implements a Sender.
"""
@ -88,8 +90,8 @@ class Sender:
client_name=self.name,
properties=self.client.create_properties())
self._handler.open()
while not self.has_started():
self._handler._connection.work() # pylint: disable=protected-access
while not self._handler.client_ready():
time.sleep(0.05)
def reconnect(self):
"""If the Sender was disconnected from the service with
@ -144,6 +146,7 @@ class Sender:
Whether the handler has completed all start up processes such as
establishing the connection, session, link and authentication, and
is now ready to process messages.
**This function is now deprecated and will be removed in v2.0+.**
:rtype: bool
"""

View file

@ -9,18 +9,25 @@ import pytest
import logging
import sys
# Ignore async tests for Python < 3.5
collect_ignore = []
if sys.version_info < (3, 5):
collect_ignore.append("tests/asynctests")
collect_ignore.append("features")
else:
from tests.asynctests import MockEventProcessor
from azure.eventprocessorhost import EventProcessorHost
from azure.eventprocessorhost import EventHubPartitionPump
from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager
from azure.eventprocessorhost import AzureBlobLease
from azure.eventprocessorhost import EventHubConfig
from azure.eventprocessorhost.lease import Lease
from azure.eventprocessorhost.partition_pump import PartitionPump
from azure.eventprocessorhost.partition_manager import PartitionManager
from tests import get_logger
from azure import eventhub
from azure.eventhub import EventHubClient, Receiver, Offset
from azure.eventprocessorhost import EventProcessorHost
from azure.eventprocessorhost import EventHubPartitionPump
from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager
from azure.eventprocessorhost import AzureBlobLease
from azure.eventprocessorhost import EventHubConfig
from azure.eventprocessorhost.lease import Lease
from azure.eventprocessorhost.partition_pump import PartitionPump
from azure.eventprocessorhost.partition_manager import PartitionManager
from azure.eventprocessorhost.abstract_event_processor import AbstractEventProcessor
log = get_logger(None, logging.DEBUG)
@ -181,47 +188,3 @@ def partition_pump(eph):
def partition_manager(eph):
partition_manager = PartitionManager(eph)
return partition_manager
class MockEventProcessor(AbstractEventProcessor):
"""
Mock Implementation of AbstractEventProcessor for testing
"""
def __init__(self, params=None):
"""
Init Event processor
"""
self.params = params
self._msg_counter = 0
async def open_async(self, context):
"""
Called by processor host to initialize the event processor.
"""
logging.info("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
"""
Called by processor host to indicate that the event processor is being stopped.
(Params) Context: Information about the partition
"""
logging.info("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason, context.partition_id, context.offset, context.sequence_number))
async def process_events_async(self, context, messages):
"""
Called by the processor host when a batch of events has arrived.
This is where the real work of the event processor is done.
(Params) Context: Information about the partition, Messages: The events to be processed.
"""
logging.info("Events processed {} {}".format(context.partition_id, messages))
await context.checkpoint_async()
async def process_error_async(self, context, error):
"""
Called when the underlying client experiences an error while receiving.
EventProcessorHost will take care of recovering from the error and
continuing to pump messages, so no action is required from
(Params) Context: Information about the partition, Error: The error that occurred.
"""
logging.error("Event Processor Error {!r}".format(error))

View file

@ -1,6 +1,7 @@
pytest>=3.4.1
pytest-asyncio>=0.8.0
pytest-asyncio>=0.8.0; python_version > '3.4'
docutils>=0.14
pygments>=2.2.0
pylint==2.1.1
pylint==2.1.1; python_version >= '3.4'
pylint==1.8.4; python_version < '3.4'
behave==1.2.6

27
examples/iothub_recv.py Normal file
View file

@ -0,0 +1,27 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from an IoT Hub partition.
"""
from azure import eventhub
from azure.eventhub import EventData, EventHubClient, Offset
import logging
import os
logger = logging.getLogger('azure.eventhub')
CONNSTR = os.environ['IOTHUB_CONNECTION_STR']
client = EventHubClient.from_iothub_connection_string(CONNSTR, debug=True)
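# '/messages/events' targets the IoT Hub built-in Event Hub-compatible endpoint for device-to-cloud messages.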
receiver = client.add_receiver("$default", "0", operation='/messages/events')
try:
client.run()
eh_info = client.get_eventhub_info()
print(eh_info)
received = receiver.receive(timeout=5)
print(received)
finally:
client.stop()

2
setup.cfg Normal file
View file

@ -0,0 +1,2 @@
[bdist_wheel]
universal=1

View file

@ -29,9 +29,9 @@ with open(os.path.join(package_folder_path, '__init__.py'), 'r') as fd:
if not version:
raise RuntimeError('Cannot find version information')
with open('README.rst', encoding='utf-8') as f:
with open('README.rst') as f:
readme = f.read()
with open('HISTORY.rst', encoding='utf-8') as f:
with open('HISTORY.rst') as f:
history = f.read()
setup(
@ -46,6 +46,8 @@ setup(
classifiers=[
'Development Status :: 5 - Production/Stable',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
@ -54,11 +56,15 @@ setup(
'License :: OSI Approved :: MIT License',
],
zip_safe=False,
packages=find_packages(exclude=["azure", "examples", "tests"]),
packages=find_packages(exclude=[
"azure",
"examples",
"tests",
"tests.asynctests"]),
install_requires=[
'uamqp>=1.0.0,<2.0.0',
'msrestazure~=0.4.11',
'uamqp>=1.1.0,<2.0.0',
'msrestazure>=0.4.32,<2.0.0',
'azure-common~=1.1',
'azure-storage~=0.36.0'
'azure-storage-blob~=1.3'
]
)

View file

@ -0,0 +1,54 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import asyncio
import logging
from azure.eventprocessorhost.abstract_event_processor import AbstractEventProcessor
class MockEventProcessor(AbstractEventProcessor):
"""
Mock Implementation of AbstractEventProcessor for testing
"""
def __init__(self, params=None):
"""
Init Event processor
"""
self.params = params
self._msg_counter = 0
async def open_async(self, context):
"""
Called by processor host to initialize the event processor.
"""
logging.info("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
"""
Called by processor host to indicate that the event processor is being stopped.
(Params) Context: Information about the partition
"""
logging.info("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason, context.partition_id, context.offset, context.sequence_number))
async def process_events_async(self, context, messages):
"""
Called by the processor host when a batch of events has arrived.
This is where the real work of the event processor is done.
(Params) Context: Information about the partition, Messages: The events to be processed.
"""
logging.info("Events processed {} {}".format(context.partition_id, messages))
await context.checkpoint_async()
async def process_error_async(self, context, error):
"""
Called when the underlying client experiences an error while receiving.
EventProcessorHost will take care of recovering from the error and
continuing to pump messages, so no action is required from
(Params) Context: Information about the partition, Error: The error that occurred.
"""
logging.error("Event Processor Error {!r}".format(error))

View file

@ -0,0 +1,166 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------
import logging
import asyncio
import sys
import os
import argparse
from logging.handlers import RotatingFileHandler
from azure.eventprocessorhost import (
AbstractEventProcessor,
AzureStorageCheckpointLeaseManager,
EventHubConfig,
EventProcessorHost,
EPHOptions)
def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
azure_logger.addHandler(console_handler)
uamqp_logger.addHandler(console_handler)
if filename:
file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3)
file_handler.setFormatter(formatter)
azure_logger.addHandler(file_handler)
uamqp_logger.addHandler(file_handler)
return azure_logger
logger = get_logger("eph_test_async.log", logging.INFO)
class EventProcessor(AbstractEventProcessor):
"""
Example Implementation of AbstractEventProcessor
"""
def __init__(self, params=None):
"""
Init Event processor
"""
super().__init__(params)
self._msg_counter = 0
async def open_async(self, context):
"""
Called by processor host to initialize the event processor.
"""
logger.info("Connection established {}".format(context.partition_id))
async def close_async(self, context, reason):
"""
Called by processor host to indicate that the event processor is being stopped.
:param context: Information about the partition
:type context: ~azure.eventprocessorhost.PartitionContext
"""
logger.info("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
reason,
context.partition_id,
context.offset,
context.sequence_number))
async def process_events_async(self, context, messages):
"""
Called by the processor host when a batch of events has arrived.
This is where the real work of the event processor is done.
:param context: Information about the partition
:type context: ~azure.eventprocessorhost.PartitionContext
:param messages: The events to be processed.
:type messages: list[~azure.eventhub.common.EventData]
"""
print("Processing id {}, offset {}, sq_number {})".format(
context.partition_id,
context.offset,
context.sequence_number))
await context.checkpoint_async()
async def process_error_async(self, context, error):
"""
Called when the underlying client experiences an error while receiving.
EventProcessorHost will take care of recovering from the error and
continuing to pump messages, so no action is required from
:param context: Information about the partition
:type context: ~azure.eventprocessorhost.PartitionContext
:param error: The error that occurred.
"""
logger.info("Event Processor Error for partition {}, {!r}".format(context.partition_id, error))
async def wait_and_close(host, duration):
"""
Run EventProcessorHost for 30 seconds then shutdown.
"""
await asyncio.sleep(duration)
await host.close_async()
def test_long_running_eph():
parser = argparse.ArgumentParser()
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--storage-account", help="Storage account name", default=os.environ.get('AZURE_STORAGE_ACCOUNT'))
parser.add_argument("--storage-key", help="Storage account access key", default=os.environ.get('AZURE_STORAGE_ACCESS_KEY'))
parser.add_argument("--container", help="Lease container name", default="leases")
parser.add_argument("--eventhub", help="Name of EventHub", default=os.environ.get('EVENT_HUB_NAME'))
parser.add_argument("--namespace", help="Namespace of EventHub", default=os.environ.get('EVENT_HUB_NAMESPACE'))
parser.add_argument("--suffix", help="Namespace of EventHub", default="servicebus.windows.net")
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with", default=os.environ.get('EVENT_HUB_SAS_POLICY'))
parser.add_argument("--sas-key", help="Shared access key", default=os.environ.get('EVENT_HUB_SAS_KEY'))
loop = asyncio.get_event_loop()
args, _ = parser.parse_known_args()
if not args.namespace or not args.eventhub:
try:
import pytest
pytest.skip("Must specify '--namespace' and '--eventhub'")
except ImportError:
raise ValueError("Must specify '--namespace' and '--eventhub'")
# Eventhub config and storage manager
eh_config = EventHubConfig(
args.namespace,
args.eventhub,
args.sas_policy,
args.sas_key,
consumer_group="$default",
namespace_suffix=args.suffix)
eh_options = EPHOptions()
eh_options.release_pump_on_timeout = True
eh_options.debug_trace = False
eh_options.receive_timeout = 120
storage_manager = AzureStorageCheckpointLeaseManager(
storage_account_name=args.storage_account,
storage_account_key=args.storage_key,
lease_renew_interval=30,
lease_container_name=args.container,
lease_duration=60)
# Event loop and host
host = EventProcessorHost(
EventProcessor,
eh_config,
storage_manager,
ep_params=["param1","param2"],
eph_options=eh_options,
loop=loop)
tasks = asyncio.gather(
host.open_async(),
wait_and_close(host, args.duration), return_exceptions=True)
results = loop.run_until_complete(tasks)
assert not any(results)
if __name__ == '__main__':
test_long_running_eph()

View file

@ -0,0 +1,133 @@
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
receive test.
"""
import logging
import asyncio
import argparse
import time
import os
import sys
from logging.handlers import RotatingFileHandler
from azure.eventhub import Offset
from azure.eventhub import EventHubClientAsync
def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
azure_logger.addHandler(console_handler)
uamqp_logger.addHandler(console_handler)
if filename:
file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3)
file_handler.setFormatter(formatter)
azure_logger.addHandler(file_handler)
uamqp_logger.addHandler(file_handler)
return azure_logger
logger = get_logger("recv_test_async.log", logging.INFO)
async def get_partitions(client):
eh_data = await client.get_eventhub_info_async()
return eh_data["partition_ids"]
async def pump(_pid, receiver, _args, _dl):
total = 0
iteration = 0
deadline = time.time() + _dl
try:
while time.time() < deadline:
batch = await receiver.receive(timeout=1)
size = len(batch)
total += size
iteration += 1
if size == 0:
print("{}: No events received, queue size {}, delivered {}".format(
_pid,
receiver.queue_size,
total))
elif iteration >= 5:
iteration = 0
print("{}: total received {}, last sn={}, last offset={}".format(
_pid,
total,
batch[-1].sequence_number,
batch[-1].offset.value))
print("{}: total received {}".format(
_pid,
total))
except Exception as e:
print("Partition {} receiver failed: {}".format(_pid, e))
raise
def test_long_running_receive_async():
parser = argparse.ArgumentParser()
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer", help="Consumer group name", default="$default")
parser.add_argument("--partitions", help="Comma seperated partition IDs")
parser.add_argument("--offset", help="Starting offset", default="-1")
parser.add_argument("--conn-str", help="EventHub connection string", default=os.environ.get('EVENT_HUB_CONNECTION_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas-key", help="Shared access key")
loop = asyncio.get_event_loop()
args, _ = parser.parse_known_args()
if args.conn_str:
client = EventHubClientAsync.from_connection_string(
args.conn_str,
eventhub=args.eventhub, auth_timeout=240, debug=False)
elif args.address:
client = EventHubClientAsync(
args.address,
auth_timeout=240,
username=args.sas_policy,
password=args.sas_key)
else:
try:
import pytest
pytest.skip("Must specify either '--conn-str' or '--address'")
except ImportError:
raise ValueError("Must specify either '--conn-str' or '--address'")
try:
if not args.partitions:
partitions = loop.run_until_complete(get_partitions(client))
else:
partitions = args.partitions.split(",")
pumps = []
for pid in partitions:
receiver = client.add_async_receiver(
consumer_group=args.consumer,
partition=pid,
offset=Offset(args.offset),
prefetch=50)
pumps.append(pump(pid, receiver, args, args.duration))
loop.run_until_complete(client.run_async())
loop.run_until_complete(asyncio.gather(*pumps))
finally:
loop.run_until_complete(client.stop_async())
if __name__ == '__main__':
test_long_running_receive_async()

View file

@ -7,18 +7,35 @@ send test
import logging
import argparse
import time
import threading
import os
import asyncio
import sys
from logging.handlers import RotatingFileHandler
from azure.eventhub import EventHubClientAsync, EventData
try:
import tests
logger = tests.get_logger("send_test.log", logging.INFO)
except ImportError:
logger = logging.getLogger("uamqp")
logger.setLevel(logging.INFO)
def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
azure_logger.addHandler(console_handler)
uamqp_logger.addHandler(console_handler)
if filename:
file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3)
file_handler.setFormatter(formatter)
azure_logger.addHandler(file_handler)
uamqp_logger.addHandler(file_handler)
return azure_logger
logger = get_logger("send_test_async.log", logging.INFO)
def check_send_successful(outcome, condition):
@ -27,9 +44,6 @@ def check_send_successful(outcome, condition):
async def get_partitions(args):
#client = EventHubClientAsync.from_connection_string(
# args.conn_str,
# eventhub=args.eventhub, debug=True)
eh_data = await args.get_eventhub_info_async()
return eh_data["partition_ids"]
@ -43,9 +57,9 @@ async def pump(pid, sender, args, duration):
yield b"D" * args.payload
if args.batch > 1:
logger.error("Sending batched messages")
logger.info("{}: Sending batched messages".format(pid))
else:
logger.error("Sending single messages")
logger.info("{}: Sending single messages".format(pid))
try:
while time.time() < deadline:
@ -55,37 +69,41 @@ async def pump(pid, sender, args, duration):
data = EventData(body=b"D" * args.payload)
sender.transfer(data, callback=check_send_successful)
total += args.batch
if total % 10000 == 0:
if total % 100 == 0:
await sender.wait_async()
logger.error("Send total {}".format(total))
logger.info("{}: Send total {}".format(pid, total))
except Exception as err:
logger.error("Send failed {}".format(err))
logger.error("Sent total {}".format(total))
logger.error("{}: Send failed {}".format(pid, err))
raise
print("{}: Final Sent total {}".format(pid, total))
def test_long_running_partition_send_async():
parser = argparse.ArgumentParser()
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--payload", help="payload size", type=int, default=512)
parser.add_argument("--batch", help="Number of events to send and wait", type=int, default=1)
parser.add_argument("--payload", help="payload size", type=int, default=1024)
parser.add_argument("--batch", help="Number of events to send and wait", type=int, default=200)
parser.add_argument("--partitions", help="Comma seperated partition IDs")
parser.add_argument("--conn-str", help="EventHub connection string", default=os.environ.get('EVENT_HUB_CONNECTION_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
parser.add_argument("--address", help="Address URI to the EventHub entity")
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas-key", help="Shared access key")
parser.add_argument("--logger-name", help="Unique log file ID")
loop = asyncio.get_event_loop()
args, _ = parser.parse_known_args()
if args.conn_str:
client = EventHubClientAsync.from_connection_string(
args.conn_str,
eventhub=args.eventhub, debug=True)
elif args.address:
client = EventHubClient(
client = EventHubClientAsync(
args.address,
username=args.sas_policy,
password=args.sas_key)
password=args.sas_key,
auth_timeout=500)
else:
try:
import pytest
@ -97,14 +115,22 @@ def test_long_running_partition_send_async():
if not args.partitions:
partitions = loop.run_until_complete(get_partitions(client))
else:
partitions = args.partitions.split(",")
pid_range = args.partitions.split("-")
if len(pid_range) > 1:
partitions = [str(i) for i in range(int(pid_range[0]), int(pid_range[1]) + 1)]
else:
partitions = args.partitions.split(",")
pumps = []
for pid in partitions:
sender = client.add_async_sender(partition=pid)
sender = client.add_async_sender(partition=pid, send_timeout=0, keep_alive=False)
pumps.append(pump(pid, sender, args, args.duration))
loop.run_until_complete(client.run_async())
loop.run_until_complete(asyncio.gather(*pumps))
results = loop.run_until_complete(asyncio.gather(*pumps, return_exceptions=True))
assert not results
except Exception as e:
logger.error("Sender failed: {}".format(e))
finally:
logger.info("Shutting down sender")
loop.run_until_complete(client.stop_async())
if __name__ == '__main__':

View file

@ -0,0 +1,183 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import os
import asyncio
import pytest
import time
from azure import eventhub
from azure.eventhub import (
EventHubClientAsync,
EventData,
Offset,
EventHubError)
@pytest.mark.asyncio
async def test_send_with_invalid_hostname_async(invalid_hostname, receivers):
client = EventHubClientAsync.from_connection_string(invalid_hostname, debug=True)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_receive_with_invalid_hostname_async(invalid_hostname):
client = EventHubClientAsync.from_connection_string(invalid_hostname, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_send_with_invalid_key_async(invalid_key, receivers):
client = EventHubClientAsync.from_connection_string(invalid_key, debug=False)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_receive_with_invalid_key_async(invalid_key):
client = EventHubClientAsync.from_connection_string(invalid_key, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_send_with_invalid_policy_async(invalid_policy, receivers):
client = EventHubClientAsync.from_connection_string(invalid_policy, debug=False)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_receive_with_invalid_policy_async(invalid_policy):
client = EventHubClientAsync.from_connection_string(invalid_policy, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_send_partition_key_with_partition_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender(partition="1")
try:
await client.run_async()
data = EventData(b"Data")
data.partition_key = b"PKey"
with pytest.raises(ValueError):
await sender.send(data)
finally:
await client.stop_async()
@pytest.mark.asyncio
async def test_non_existing_entity_sender_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, eventhub="nemo", debug=False)
sender = client.add_async_sender(partition="1")
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_non_existing_entity_receiver_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, eventhub="nemo", debug=False)
receiver = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
@pytest.mark.asyncio
async def test_receive_from_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
receiver = client.add_async_receiver("$default", p)
try:
with pytest.raises(EventHubError):
await client.run_async()
await receiver.receive(timeout=10)
finally:
await client.stop_async()
@pytest.mark.asyncio
async def test_send_to_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender(partition=p)
try:
with pytest.raises(EventHubError):
await client.run_async()
finally:
await client.stop_async()
@pytest.mark.asyncio
async def test_send_too_large_message_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender()
try:
await client.run_async()
data = EventData(b"A" * 300000)
with pytest.raises(EventHubError):
await sender.send(data)
finally:
await client.stop_async()
@pytest.mark.asyncio
async def test_send_null_body_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender()
try:
await client.run_async()
with pytest.raises(ValueError):
data = EventData(None)
await sender.send(data)
finally:
await client.stop_async()
async def pump(receiver):
messages = 0
count = 0
batch = await receiver.receive(timeout=10)
while batch and count <= 5:
count += 1
messages += len(batch)
batch = await receiver.receive(timeout=10)
return messages
@pytest.mark.asyncio
async def test_max_receivers_async(connection_str, senders):
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
receivers = []
for i in range(6):
receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, offset=Offset('@latest')))
try:
await client.run_async()
outputs = await asyncio.gather(
pump(receivers[0]),
pump(receivers[1]),
pump(receivers[2]),
pump(receivers[3]),
pump(receivers[4]),
pump(receivers[5]),
return_exceptions=True)
print(outputs)
failed = [o for o in outputs if isinstance(o, EventHubError)]
assert len(failed) == 1
print(failed[0].message)
finally:
await client.stop_async()

View file

View file

View file

@ -0,0 +1,90 @@
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import os
import time
import asyncio
import pytest
from azure import eventhub
from azure.eventhub import (
EventHubClientAsync,
EventData,
Offset,
EventHubError)
@pytest.mark.asyncio
async def test_send_with_long_interval_async(connection_str, receivers):
#pytest.skip("long running")
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender()
try:
await client.run_async()
await sender.send(EventData(b"A single event"))
for _ in range(2):
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
finally:
await client.stop_async()
received = []
for r in receivers:
received.extend(r.receive(timeout=1))
assert len(received) == 3
assert list(received[0].body)[0] == b"A single event"
def pump(receiver):
messages = []
batch = receiver.receive(timeout=1)
messages.extend(batch)
while batch:
batch = receiver.receive(timeout=1)
messages.extend(batch)
return messages
@pytest.mark.asyncio
async def test_send_with_forced_conn_close_async(connection_str, receivers):
#pytest.skip("long running")
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender()
try:
await client.run_async()
await sender.send(EventData(b"A single event"))
sender._handler._message_sender.destroy()
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
await sender.send(EventData(b"A single event"))
sender._handler._message_sender.destroy()
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
await sender.send(EventData(b"A single event"))
finally:
await client.stop_async()
received = []
for r in receivers:
received.extend(pump(r))
assert len(received) == 5
assert list(received[0].body)[0] == b"A single event"
# def test_send_with_forced_link_detach(connection_str, receivers):
# client = EventHubClient.from_connection_string(connection_str, debug=True)
# sender = client.add_sender()
# size = 20 * 1024
# try:
# client.run()
# for i in range(1000):
# sender.transfer(EventData([b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size]))
# sender.wait()
# finally:
# client.stop()
# received = []
# for r in receivers:
# received.extend(r.receive(timeout=10))

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
@ -8,6 +9,7 @@ import os
import asyncio
import pytest
import time
import json
from azure.eventhub import EventData, EventHubClientAsync
@ -123,6 +125,25 @@ async def test_send_partition_async(connection_str, receivers):
assert len(partition_1) == 1
@pytest.mark.asyncio
async def test_send_non_ascii_async(connection_str, receivers):
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender(partition="0")
try:
await client.run_async()
await sender.send(EventData("é,è,à,ù,â,ê,î,ô,û"))
await sender.send(EventData(json.dumps({"foo": "漢字"})))
except:
raise
finally:
await client.stop_async()
partition_0 = receivers[0].receive(timeout=2)
assert len(partition_0) == 2
assert partition_0[0].body_as_str() == "é,è,à,ù,â,ê,î,ô,û"
assert partition_0[1].body_as_json() == {"foo": "漢字"}
@pytest.mark.asyncio
async def test_send_partition_batch_async(connection_str, receivers):
def batched():

View file

@ -10,50 +10,70 @@ receive test.
"""
import logging
import asyncio
import argparse
import time
import os
from urllib.parse import quote_plus
import sys
from logging.handlers import RotatingFileHandler
from azure.eventhub import Offset
from azure.eventhub import EventHubClientAsync
from azure.eventhub import EventHubClient
try:
import tests
logger = tests.get_logger("recv_test.log", logging.INFO)
except ImportError:
logger = logging.getLogger("uamqp")
logger.setLevel(logging.INFO)
def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
azure_logger.addHandler(console_handler)
uamqp_logger.addHandler(console_handler)
if filename:
file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3)
file_handler.setFormatter(formatter)
azure_logger.addHandler(file_handler)
uamqp_logger.addHandler(file_handler)
return azure_logger
logger = get_logger("recv_test.log", logging.INFO)
async def pump(_pid, receiver, _args, _dl):
def get_partitions(args):
eh_data = args.get_eventhub_info()
return eh_data["partition_ids"]
def pump(receivers, duration):
total = 0
iteration = 0
deadline = time.time() + _dl
deadline = time.time() + duration
try:
while time.time() < deadline:
batch = await receiver.receive(timeout=5)
size = len(batch)
total += size
iteration += 1
if size == 0:
print("{}: No events received, queue size {}, delivered {}".format(
_pid,
receiver.queue_size,
total))
elif iteration >= 80:
iteration = 0
print("{}: total received {}, last sn={}, last offset={}".format(
_pid,
total,
batch[-1].sequence_number,
batch[-1].offset.value))
print("{}: total received {}".format(
_pid,
total))
for pid, receiver in receivers.items():
batch = receiver.receive(timeout=5)
size = len(batch)
total += size
iteration += 1
if size == 0:
print("{}: No events received, queue size {}, delivered {}".format(
pid,
receiver.queue_size,
total))
elif iteration >= 50:
iteration = 0
print("{}: total received {}, last sn={}, last offset={}".format(
pid,
total,
batch[-1].sequence_number,
batch[-1].offset.value))
print("Total received {}".format(total))
except Exception as e:
print("Partition {} receiver failed: {}".format(_pid, e))
print("Receiver failed: {}".format(e))
raise
@ -61,7 +81,7 @@ def test_long_running_receive():
parser = argparse.ArgumentParser()
parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30)
parser.add_argument("--consumer", help="Consumer group name", default="$default")
parser.add_argument("--partitions", help="Comma seperated partition IDs", default="0")
parser.add_argument("--partitions", help="Comma seperated partition IDs")
parser.add_argument("--offset", help="Starting offset", default="-1")
parser.add_argument("--conn-str", help="EventHub connection string", default=os.environ.get('EVENT_HUB_CONNECTION_STR'))
parser.add_argument("--eventhub", help="Name of EventHub")
@ -69,14 +89,13 @@ def test_long_running_receive():
parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with")
parser.add_argument("--sas-key", help="Shared access key")
loop = asyncio.get_event_loop()
args, _ = parser.parse_known_args()
if args.conn_str:
client = EventHubClientAsync.from_connection_string(
client = EventHubClient.from_connection_string(
args.conn_str,
eventhub=args.eventhub, debug=False)
elif args.address:
client = EventHubClientAsync(
client = EventHubClient(
args.address,
username=args.sas_policy,
password=args.sas_key)
@ -88,18 +107,21 @@ def test_long_running_receive():
raise ValueError("Must specify either '--conn-str' or '--address'")
try:
pumps = []
for pid in args.partitions.split(","):
receiver = client.add_async_receiver(
if not args.partitions:
partitions = get_partitions(client)
else:
partitions = args.partitions.split(",")
pumps = {}
for pid in partitions:
pumps[pid] = client.add_receiver(
consumer_group=args.consumer,
partition=pid,
offset=Offset(args.offset),
prefetch=5000)
pumps.append(pump(pid, receiver, args, args.duration))
loop.run_until_complete(client.run_async())
loop.run_until_complete(asyncio.gather(*pumps))
prefetch=50)
client.run()
pump(pumps, args.duration)
finally:
loop.run_until_complete(client.stop_async())
client.stop()
if __name__ == '__main__':

View file

@ -4,20 +4,37 @@
send test
"""
import logging
import argparse
import time
import threading
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
from azure.eventhub import EventHubClient, Sender, EventData
try:
import tests
logger = tests.get_logger("send_test.log", logging.INFO)
except ImportError:
logger = logging.getLogger("uamqp")
logger.setLevel(logging.INFO)
def get_logger(filename, level=logging.INFO):
azure_logger = logging.getLogger("azure")
azure_logger.setLevel(level)
uamqp_logger = logging.getLogger("uamqp")
uamqp_logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
console_handler = logging.StreamHandler(stream=sys.stdout)
console_handler.setFormatter(formatter)
azure_logger.addHandler(console_handler)
uamqp_logger.addHandler(console_handler)
if filename:
file_handler = RotatingFileHandler(filename, maxBytes=20*1024*1024, backupCount=3)
file_handler.setFormatter(formatter)
azure_logger.addHandler(file_handler)
uamqp_logger.addHandler(file_handler)
return azure_logger
logger = get_logger("send_test.log", logging.INFO)
def check_send_successful(outcome, condition):

View file

@ -5,13 +5,11 @@
#--------------------------------------------------------------------------
import os
import asyncio
import pytest
import time
from azure import eventhub
from azure.eventhub import (
EventHubClientAsync,
EventData,
Offset,
EventHubError,
@ -25,14 +23,6 @@ def test_send_with_invalid_hostname(invalid_hostname, receivers):
client.run()
@pytest.mark.asyncio
async def test_send_with_invalid_hostname_async(invalid_hostname, receivers):
client = EventHubClientAsync.from_connection_string(invalid_hostname, debug=True)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
def test_receive_with_invalid_hostname_sync(invalid_hostname):
client = EventHubClient.from_connection_string(invalid_hostname, debug=True)
receiver = client.add_receiver("$default", "0")
@ -40,14 +30,6 @@ def test_receive_with_invalid_hostname_sync(invalid_hostname):
client.run()
@pytest.mark.asyncio
async def test_receive_with_invalid_hostname_async(invalid_hostname):
client = EventHubClientAsync.from_connection_string(invalid_hostname, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
def test_send_with_invalid_key(invalid_key, receivers):
client = EventHubClient.from_connection_string(invalid_key, debug=False)
sender = client.add_sender()
@ -55,14 +37,6 @@ def test_send_with_invalid_key(invalid_key, receivers):
client.run()
@pytest.mark.asyncio
async def test_send_with_invalid_key_async(invalid_key, receivers):
client = EventHubClientAsync.from_connection_string(invalid_key, debug=False)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
def test_receive_with_invalid_key_sync(invalid_key):
client = EventHubClient.from_connection_string(invalid_key, debug=True)
receiver = client.add_receiver("$default", "0")
@ -70,14 +44,6 @@ def test_receive_with_invalid_key_sync(invalid_key):
client.run()
@pytest.mark.asyncio
async def test_receive_with_invalid_key_async(invalid_key):
client = EventHubClientAsync.from_connection_string(invalid_key, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
def test_send_with_invalid_policy(invalid_policy, receivers):
client = EventHubClient.from_connection_string(invalid_policy, debug=False)
sender = client.add_sender()
@ -85,14 +51,6 @@ def test_send_with_invalid_policy(invalid_policy, receivers):
client.run()
@pytest.mark.asyncio
async def test_send_with_invalid_policy_async(invalid_policy, receivers):
client = EventHubClientAsync.from_connection_string(invalid_policy, debug=False)
sender = client.add_async_sender()
with pytest.raises(EventHubError):
await client.run_async()
def test_receive_with_invalid_policy_sync(invalid_policy):
client = EventHubClient.from_connection_string(invalid_policy, debug=True)
receiver = client.add_receiver("$default", "0")
@ -100,14 +58,6 @@ def test_receive_with_invalid_policy_sync(invalid_policy):
client.run()
@pytest.mark.asyncio
async def test_receive_with_invalid_policy_async(invalid_policy):
client = EventHubClientAsync.from_connection_string(invalid_policy, debug=True)
sender = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
def test_send_partition_key_with_partition_sync(connection_str):
client = EventHubClient.from_connection_string(connection_str, debug=True)
sender = client.add_sender(partition="1")
@ -121,20 +71,6 @@ def test_send_partition_key_with_partition_sync(connection_str):
client.stop()
@pytest.mark.asyncio
async def test_send_partition_key_with_partition_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender(partition="1")
try:
await client.run_async()
data = EventData(b"Data")
data.partition_key = b"PKey"
with pytest.raises(ValueError):
await sender.send(data)
finally:
await client.stop_async()
def test_non_existing_entity_sender(connection_str):
client = EventHubClient.from_connection_string(connection_str, eventhub="nemo", debug=False)
sender = client.add_sender(partition="1")
@ -142,14 +78,6 @@ def test_non_existing_entity_sender(connection_str):
client.run()
@pytest.mark.asyncio
async def test_non_existing_entity_sender_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, eventhub="nemo", debug=False)
sender = client.add_async_sender(partition="1")
with pytest.raises(EventHubError):
await client.run_async()
def test_non_existing_entity_receiver(connection_str):
client = EventHubClient.from_connection_string(connection_str, eventhub="nemo", debug=False)
receiver = client.add_receiver("$default", "0")
@ -157,14 +85,6 @@ def test_non_existing_entity_receiver(connection_str):
client.run()
@pytest.mark.asyncio
async def test_non_existing_entity_receiver_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, eventhub="nemo", debug=False)
receiver = client.add_async_receiver("$default", "0")
with pytest.raises(EventHubError):
await client.run_async()
def test_receive_from_invalid_partitions_sync(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
@ -178,49 +98,18 @@ def test_receive_from_invalid_partitions_sync(connection_str):
client.stop()
@pytest.mark.asyncio
async def test_receive_from_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
receiver = client.add_async_receiver("$default", p)
try:
with pytest.raises(EventHubError):
await client.run_async()
await receiver.receive(timeout=10)
finally:
await client.stop_async()
def test_send_to_invalid_partitions(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
client = EventHubClient.from_connection_string(connection_str, debug=False)
sender = client.add_sender(partition=p)
client.run()
data = EventData(b"A" * 300000)
try:
with pytest.raises(EventHubError):
sender.send(data)
client.run()
finally:
client.stop()
@pytest.mark.asyncio
async def test_send_to_invalid_partitions_async(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
for p in partitions:
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender(partition=p)
await client.run_async()
data = EventData(b"A" * 300000)
try:
with pytest.raises(EventHubError):
await sender.send(data)
finally:
await client.stop_async()
def test_send_too_large_message(connection_str):
client = EventHubClient.from_connection_string(connection_str, debug=True)
sender = client.add_sender()
@ -233,19 +122,6 @@ def test_send_too_large_message(connection_str):
client.stop()
@pytest.mark.asyncio
async def test_send_too_large_message_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender()
try:
await client.run_async()
data = EventData(b"A" * 300000)
with pytest.raises(EventHubError):
await sender.send(data)
finally:
await client.stop_async()
def test_send_null_body(connection_str):
partitions = ["XYZ", "-1", "1000", "-" ]
client = EventHubClient.from_connection_string(connection_str, debug=False)
@ -259,54 +135,6 @@ def test_send_null_body(connection_str):
client.stop()
@pytest.mark.asyncio
async def test_send_null_body_async(connection_str):
client = EventHubClientAsync.from_connection_string(connection_str, debug=False)
sender = client.add_async_sender()
try:
await client.run_async()
with pytest.raises(ValueError):
data = EventData(None)
await sender.send(data)
finally:
await client.stop_async()
async def pump(receiver):
messages = 0
count = 0
batch = await receiver.receive(timeout=10)
while batch and count <= 5:
count += 1
messages += len(batch)
batch = await receiver.receive(timeout=10)
return messages
@pytest.mark.asyncio
async def test_max_receivers_async(connection_str, senders):
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
receivers = []
for i in range(6):
receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, offset=Offset('@latest')))
try:
await client.run_async()
outputs = await asyncio.gather(
pump(receivers[0]),
pump(receivers[1]),
pump(receivers[2]),
pump(receivers[3]),
pump(receivers[4]),
pump(receivers[5]),
return_exceptions=True)
print(outputs)
failed = [o for o in outputs if isinstance(o, EventHubError)]
assert len(failed) == 1
print(failed[0].message)
finally:
await client.stop_async()
def test_message_body_types(connection_str, senders):
client = EventHubClient.from_connection_string(connection_str, debug=False)
receiver = client.add_receiver("$default", "0", offset=Offset('@latest'))
@ -354,8 +182,7 @@ def test_message_body_types(connection_str, senders):
received = receiver.receive(timeout=5)
assert len(received) == 1
assert received[0].body_as_str() == "42"
with pytest.raises(ValueError):
received[0].body
assert received[0].body == 42
except:
raise
finally:

View file

@ -6,12 +6,10 @@
import os
import time
import asyncio
import pytest
from azure import eventhub
from azure.eventhub import (
EventHubClientAsync,
EventData,
Offset,
EventHubError,
@ -39,27 +37,6 @@ def test_send_with_long_interval_sync(connection_str, receivers):
assert list(received[0].body)[0] == b"A single event"
@pytest.mark.asyncio
async def test_send_with_long_interval_async(connection_str, receivers):
#pytest.skip("long running")
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender()
try:
await client.run_async()
await sender.send(EventData(b"A single event"))
for _ in range(2):
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
finally:
await client.stop_async()
received = []
for r in receivers:
received.extend(r.receive(timeout=1))
assert len(received) == 3
assert list(received[0].body)[0] == b"A single event"
def test_send_with_forced_conn_close_sync(connection_str, receivers):
#pytest.skip("long running")
client = EventHubClient.from_connection_string(connection_str, debug=True)
@ -85,41 +62,6 @@ def test_send_with_forced_conn_close_sync(connection_str, receivers):
assert list(received[0].body)[0] == b"A single event"
def pump(receiver):
messages = []
batch = receiver.receive(timeout=1)
messages.extend(batch)
while batch:
batch = receiver.receive(timeout=1)
messages.extend(batch)
return messages
@pytest.mark.asyncio
async def test_send_with_forced_conn_close_async(connection_str, receivers):
#pytest.skip("long running")
client = EventHubClientAsync.from_connection_string(connection_str, debug=True)
sender = client.add_async_sender()
try:
await client.run_async()
await sender.send(EventData(b"A single event"))
sender._handler._message_sender.destroy()
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
await sender.send(EventData(b"A single event"))
sender._handler._message_sender.destroy()
await asyncio.sleep(300)
await sender.send(EventData(b"A single event"))
await sender.send(EventData(b"A single event"))
finally:
await client.stop_async()
received = []
for r in receivers:
received.extend(pump(r))
assert len(received) == 5
assert list(received[0].body)[0] == b"A single event"
# def test_send_with_forced_link_detach(connection_str, receivers):
# client = EventHubClient.from_connection_string(connection_str, debug=True)
# sender = client.add_sender()

View file

@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
@ -7,6 +8,7 @@
import os
import pytest
import time
import json
from azure import eventhub
from azure.eventhub import EventData, EventHubClient
@ -142,6 +144,24 @@ def test_send_partition(connection_str, receivers):
assert len(partition_1) == 1
def test_send_non_ascii(connection_str, receivers):
client = EventHubClient.from_connection_string(connection_str, debug=False)
sender = client.add_sender(partition="0")
try:
client.run()
sender.send(EventData(u"é,è,à,ù,â,ê,î,ô,û"))
sender.send(EventData(json.dumps({"foo": u"漢字"})))
except:
raise
finally:
client.stop()
partition_0 = receivers[0].receive(timeout=2)
assert len(partition_0) == 2
assert partition_0[0].body_as_str() == u"é,è,à,ù,â,ê,î,ô,û"
assert partition_0[1].body_as_json() == {"foo": u"漢字"}
def test_send_partition_batch(connection_str, receivers):
def batched():
for i in range(10):