[Webpubsub] async API (#34764)
* init * basic function and perf * update * async test * black * optimize retry logic * update for sample * set policy for async test in windows * remove async lock * fix cspell * fix mypy * fix pylint * update dependency * aiohttp>=3.8.0 in setup.py * fix for Linux * optimize for sync api * fix sample * optiize for async api * review * update test * update dependency * update * fix recovery and reconnect logic * update test case * update * optimize * async test for open client error * typing fix * typing fix * typing fix * review for on_error * stress test * fix cspell * fix cspell * update logging for stress * rename stress readme * update * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_util.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * Update sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/aio/_client.py Co-authored-by: Anna Tisch <antisch@microsoft.com> * review * review * review * update changelog * update test case name * fix test * optimize close logic * mypy fix * optimize client.close logic for async api * udpate changelog * expose is_connected * review * remove nested function * set all created thread as daemon * Update CHANGELOG.md --------- Co-authored-by: Anna Tisch <antisch@microsoft.com>
This commit is contained in:
Родитель
f892ccf857
Коммит
a4a0ad14ac
|
@ -1712,11 +1712,17 @@
|
|||
]
|
||||
},
|
||||
{
|
||||
"filename": "sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/_client.py",
|
||||
"filename": "sdk/webpubsub/azure-messaging-webpubsubclient/azure/messaging/webpubsubclient/**/_client.py",
|
||||
"words": [
|
||||
"awps"
|
||||
]
|
||||
},
|
||||
{
|
||||
"filename": "sdk/webpubsub/azure-messaging-webpubsubclient/stress/*.py",
|
||||
"words": [
|
||||
"psutil"
|
||||
]
|
||||
},
|
||||
{
|
||||
"filename": "sdk/contentsafety/azure-ai-contentsafety/azure/ai/contentsafety/*.py",
|
||||
"words": [
|
||||
|
|
|
@ -1,15 +1,20 @@
|
|||
# Release History
|
||||
|
||||
## 1.0.1 (Unreleased)
|
||||
## 1.1.0 (2024-04-24)
|
||||
|
||||
### Features Added
|
||||
|
||||
### Breaking Changes
|
||||
|
||||
### Bugs Fixed
|
||||
- Add Async API with same name of Sync API
|
||||
- Add api `is_connected`
|
||||
|
||||
### Other Changes
|
||||
|
||||
- Change default reconnect times to unlimited
|
||||
- Optimize reconnect/recover logic
|
||||
- Optimize error message hint when failed to open client
|
||||
- Optimize typing annotations
|
||||
- Optimize close logic for Sync API.
|
||||
|
||||
## 1.0.0 (2024-01-31)
|
||||
|
||||
### Features Added
|
||||
|
|
|
@ -60,7 +60,7 @@ Note that a client can only receive messages from groups that it has joined and
|
|||
|
||||
# Registers a listener for the event 'group-message' early before joining a group to not miss messages
|
||||
group_name = "group1";
|
||||
client.subscribe("group-message", lambda e: print(f"Received message: {e.data}"));
|
||||
client.subscribe(CallbackType.GROUP_MESSAGE, lambda e: print(f"Received message: {e.data}"));
|
||||
|
||||
# A client needs to join the group it wishes to receive messages from
|
||||
client.join_group(groupName);
|
||||
|
@ -72,7 +72,7 @@ client.join_group(groupName);
|
|||
# ...continues the code snippet from above
|
||||
|
||||
# Send a message to a joined group
|
||||
client.send_to_group(group_name, "hello world", "text");
|
||||
client.send_to_group(group_name, "hello world", WebPubSubDataType.TEXT);
|
||||
|
||||
# In the Console tab of your developer tools found in your browser, you should see the message printed there.
|
||||
```
|
||||
|
@ -84,19 +84,19 @@ client.send_to_group(group_name, "hello world", "text");
|
|||
1. When a client is successfully connected to your Web PubSub resource, the `connected` event is triggered.
|
||||
|
||||
```python
|
||||
client.subscribe("connected", lambda e: print(f"Connection {e.connection_id} is connected"))
|
||||
client.subscribe(CallbackType.CONNECTED, lambda e: print(f"Connection {e.connection_id} is connected"))
|
||||
```
|
||||
|
||||
2. When a client is disconnected and fails to recover the connection, the `disconnected` event is triggered.
|
||||
|
||||
```python
|
||||
client.subscribe("disconnected", lambda e: print(f"Connection disconnected: {e.message}"))
|
||||
client.subscribe(CallbackType.DISCONNECTED, lambda e: print(f"Connection disconnected: {e.message}"))
|
||||
```
|
||||
|
||||
3. The `stopped` event will be triggered when the client is disconnected *and* the client stops trying to reconnect. This usually happens after the `client.close()` is called, or `auto_reconnect` is disabled or a specified limit to trying to reconnect has reached. If you want to restart the client, you can call `client.open()` in the stopped event.
|
||||
|
||||
```python
|
||||
client.subscribe("stopped", lambda : print("Client has stopped"))
|
||||
client.subscribe(CallbackType.STOPPED, lambda : print("Client has stopped"))
|
||||
```
|
||||
|
||||
---
|
||||
|
@ -106,10 +106,10 @@ A client can add callbacks to consume messages from your application server or g
|
|||
|
||||
```python
|
||||
# Registers a listener for the "server-message". The callback will be invoked when your application server sends message to the connectionID, to or broadcast to all connections.
|
||||
client.subscribe("server-message", lambda e: print(f"Received message {e.data}"))
|
||||
client.subscribe(CallbackType.SERVER_MESSAGE, lambda e: print(f"Received message {e.data}"))
|
||||
|
||||
# Registers a listener for the "group-message". The callback will be invoked when the client receives a message from the groups it has joined.
|
||||
client.subscribe("group-message", lambda e: print(f"Received message from {e.group}: {e.data}"))
|
||||
client.subscribe(CallbackType.GROUP_MESSAGE, lambda e: print(f"Received message from {e.group}: {e.data}"))
|
||||
```
|
||||
|
||||
---
|
||||
|
@ -125,7 +125,7 @@ However, you should be aware of `auto_rejoin_groups`'s limitations.
|
|||
client = WebPubSubClient("<client-access-url>", auto_rejoin_groups=True);
|
||||
|
||||
# Registers a listener to handle "rejoin-group-failed" event
|
||||
client.subscribe("rejoin-group-failed", lambda e: print(f"Rejoin group {e.group} failed: {e.error}"))
|
||||
client.subscribe(CallbackType.REJOIN_GROUP_FAILED, lambda e: print(f"Rejoin group {e.group} failed: {e.error}"))
|
||||
```
|
||||
|
||||
---
|
||||
|
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -320,8 +320,7 @@ class _MyMutableMapping(MutableMapping[str, typing.Any]): # pylint: disable=uns
|
|||
...
|
||||
|
||||
@typing.overload
|
||||
def pop(self, key: str, default: typing.Any) -> typing.Any:
|
||||
...
|
||||
def pop(self, key: str, default: typing.Any) -> typing.Any: ...
|
||||
|
||||
def pop(self, key: str, default: typing.Any = _UNSET) -> typing.Any:
|
||||
if default is _UNSET:
|
||||
|
@ -338,12 +337,10 @@ class _MyMutableMapping(MutableMapping[str, typing.Any]): # pylint: disable=uns
|
|||
self._data.update(*args, **kwargs)
|
||||
|
||||
@typing.overload # type: ignore
|
||||
def setdefault(self, key: str) -> typing.Any:
|
||||
...
|
||||
def setdefault(self, key: str) -> typing.Any: ...
|
||||
|
||||
@typing.overload
|
||||
def setdefault(self, key: str, default: typing.Any) -> typing.Any:
|
||||
...
|
||||
def setdefault(self, key: str, default: typing.Any) -> typing.Any: ...
|
||||
|
||||
def setdefault(self, key: str, default: typing.Any = _UNSET) -> typing.Any:
|
||||
if default is _UNSET:
|
||||
|
|
|
@ -3,10 +3,14 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
from typing import Optional
|
||||
from typing import Optional, Any
|
||||
from azure.core.pipeline.policies import UserAgentPolicy
|
||||
|
||||
from .models._models import SendMessageError, AckMessageError
|
||||
from ._version import VERSION
|
||||
|
||||
NO_ACK_MESSAGE_ERROR = "NoAckMessageReceivedFromServer"
|
||||
|
||||
|
||||
def format_user_agent(user_agent: Optional[str] = None) -> str:
|
||||
"""Format user agent string.
|
||||
|
@ -16,3 +20,15 @@ def format_user_agent(user_agent: Optional[str] = None) -> str:
|
|||
:rtype: str
|
||||
"""
|
||||
return UserAgentPolicy(user_agent=user_agent, sdk_moniker=f"webpubsub-client/{VERSION}").user_agent
|
||||
|
||||
|
||||
def raise_for_empty_message_ack(message_ack: Optional[Any], ack_id: Optional[int] = None):
|
||||
if message_ack is None:
|
||||
raise SendMessageError(
|
||||
message="Failed to send message.",
|
||||
ack_id=ack_id,
|
||||
error_detail=AckMessageError(
|
||||
name=NO_ACK_MESSAGE_ERROR,
|
||||
message="The connection may have been lost during message sending, or the service didn't send ack message.", # pylint: disable=line-too-long
|
||||
),
|
||||
)
|
||||
|
|
|
@ -4,4 +4,4 @@
|
|||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
VERSION = "1.0.1"
|
||||
VERSION = "1.1.0"
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
# coding=utf-8
|
||||
# --------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
try:
|
||||
from ._client import WebPubSubClient, WebPubSubClientCredential
|
||||
except ImportError as e:
|
||||
raise ImportError("package aiohttp is not installed.") from e
|
||||
|
||||
__all__ = [
|
||||
"WebPubSubClient",
|
||||
"WebPubSubClientCredential",
|
||||
]
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -3,9 +3,11 @@
|
|||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
# pylint: disable=too-many-lines
|
||||
import sys
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any, Mapping, overload, Union, Optional, TypeVar, Tuple, Dict
|
||||
from typing import Any, Mapping, overload, Union, Optional, TypeVar, Tuple, Dict, Literal
|
||||
import json
|
||||
import math
|
||||
import threading
|
||||
|
@ -20,10 +22,6 @@ if sys.version_info >= (3, 9):
|
|||
from collections.abc import MutableMapping
|
||||
else:
|
||||
from typing import MutableMapping # type: ignore # pylint: disable=ungrouped-imports
|
||||
if sys.version_info >= (3, 8):
|
||||
from typing import Literal # pylint: disable=no-name-in-module, ungrouped-imports
|
||||
else:
|
||||
from typing_extensions import Literal # type: ignore # pylint: disable=ungrouped-imports
|
||||
JSON = MutableMapping[str, Any] # pylint: disable=unsubscriptable-object
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
@ -110,7 +108,7 @@ class SendEventMessage:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
event: str,
|
||||
ack_id: Optional[int] = None,
|
||||
|
@ -144,12 +142,10 @@ class JoinGroupData(_model_base.Model):
|
|||
type: Literal["joinGroup"] = "joinGroup", # pylint: disable=redefined-builtin
|
||||
group: str,
|
||||
ack_id: Optional[int] = None,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(self, mapping: Mapping[str, Any]):
|
||||
...
|
||||
def __init__(self, mapping: Mapping[str, Any]): ...
|
||||
|
||||
def __init__(self, *args, **kwargs): # pylint: disable=useless-super-delegation
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -177,12 +173,10 @@ class LeaveGroupData(_model_base.Model):
|
|||
type: Literal["leaveGroup"] = "leaveGroup", # pylint: disable=redefined-builtin
|
||||
group: str,
|
||||
ack_id: Optional[int] = None,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(self, mapping: Mapping[str, Any]):
|
||||
...
|
||||
def __init__(self, mapping: Mapping[str, Any]): ...
|
||||
|
||||
def __init__(self, *args, **kwargs): # pylint: disable=useless-super-delegation
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -214,16 +208,14 @@ class SendEventData(_model_base.Model):
|
|||
self,
|
||||
*,
|
||||
type: Literal["event"] = "event", # pylint: disable=redefined-builtin
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
event: str,
|
||||
ack_id: Optional[int] = None,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(self, mapping: Mapping[str, Any]):
|
||||
...
|
||||
def __init__(self, mapping: Mapping[str, Any]): ...
|
||||
|
||||
def __init__(self, *args, **kwargs): # pylint: disable=useless-super-delegation
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -259,16 +251,14 @@ class SendToGroupData(_model_base.Model):
|
|||
*,
|
||||
type: Literal["sendToGroup"] = "sendToGroup", # pylint: disable=redefined-builtin
|
||||
group: str,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
no_echo: bool,
|
||||
ack_id: Optional[int] = None,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(self, mapping: Mapping[str, Any]):
|
||||
...
|
||||
def __init__(self, mapping: Mapping[str, Any]): ...
|
||||
|
||||
def __init__(self, *args, **kwargs): # pylint: disable=useless-super-delegation
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -292,12 +282,10 @@ class SequenceAckData(_model_base.Model):
|
|||
*,
|
||||
type: Literal["sequenceAck"] = "sequenceAck", # pylint: disable=redefined-builtin
|
||||
sequence_id: int,
|
||||
) -> None:
|
||||
...
|
||||
) -> None: ...
|
||||
|
||||
@overload
|
||||
def __init__(self, mapping: Mapping[str, Any]):
|
||||
...
|
||||
def __init__(self, mapping: Mapping[str, Any]): ...
|
||||
|
||||
def __init__(self, *args, **kwargs): # pylint: disable=useless-super-delegation
|
||||
super().__init__(*args, **kwargs)
|
||||
|
@ -371,7 +359,7 @@ class GroupDataMessage:
|
|||
def __init__(
|
||||
self,
|
||||
*,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
group: str,
|
||||
from_user_id: Optional[str] = None,
|
||||
|
@ -398,7 +386,7 @@ class ServerDataMessage:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
sequence_id: Optional[int] = None,
|
||||
) -> None:
|
||||
|
@ -425,7 +413,7 @@ class SendToGroupMessage:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
group: str,
|
||||
no_echo: bool,
|
||||
|
@ -453,6 +441,14 @@ WebPubSubMessage = TypeVar(
|
|||
AckMessage,
|
||||
)
|
||||
|
||||
SendMessageType = TypeVar(
|
||||
"SendMessageType",
|
||||
SendToGroupMessage,
|
||||
SendEventMessage,
|
||||
JoinGroupMessage,
|
||||
LeaveGroupMessage,
|
||||
)
|
||||
|
||||
|
||||
def get_pay_load(data: Any, data_type: Union[WebPubSubDataType, str]) -> Any:
|
||||
"""Get payload from data with data_type
|
||||
|
@ -516,7 +512,14 @@ class WebPubSubClientProtocol:
|
|||
@staticmethod
|
||||
def parse_messages(
|
||||
raw_message: str,
|
||||
) -> Union[ConnectedMessage, DisconnectedMessage, GroupDataMessage, ServerDataMessage, AckMessage, None,]:
|
||||
) -> Union[
|
||||
ConnectedMessage,
|
||||
DisconnectedMessage,
|
||||
GroupDataMessage,
|
||||
ServerDataMessage,
|
||||
AckMessage,
|
||||
None,
|
||||
]:
|
||||
"""Parse messages from raw message
|
||||
|
||||
:param raw_message: The raw message. Required.
|
||||
|
@ -566,9 +569,9 @@ class WebPubSubClientProtocol:
|
|||
return AckMessage(
|
||||
ack_id=message["ackId"],
|
||||
success=message["success"],
|
||||
error=AckMessageError(name=error["name"], message=error["message"])
|
||||
if isinstance(error, dict)
|
||||
else None,
|
||||
error=(
|
||||
AckMessageError(name=error["name"], message=error["message"]) if isinstance(error, dict) else None
|
||||
),
|
||||
)
|
||||
_LOGGER.error("wrong message type: %s", message["type"])
|
||||
return None
|
||||
|
@ -632,7 +635,7 @@ class WebPubSubJsonReliableProtocol(WebPubSubClientProtocol):
|
|||
self.name = "json.reliable.webpubsub.azure.v1"
|
||||
|
||||
|
||||
class SendMessageErrorOptions:
|
||||
class _SendMessageErrorOptions:
|
||||
"""Options for send message error
|
||||
:ivar ack_id: The ack id of the message.
|
||||
:vartype ack_id: int
|
||||
|
@ -647,9 +650,42 @@ class SendMessageErrorOptions:
|
|||
) -> None:
|
||||
self.ack_id = ack_id
|
||||
self.error_detail = error_detail
|
||||
|
||||
|
||||
class SendMessageErrorOptions(_SendMessageErrorOptions):
|
||||
"""Options for send message error
|
||||
:ivar ack_id: The ack id of the message.
|
||||
:vartype ack_id: int
|
||||
:ivar error_detail: The error details of the message.
|
||||
:vartype error_detail: ~azure.messaging.webpubsubclient.models.AckMessageError
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ack_id: Optional[int] = None,
|
||||
error_detail: Optional[AckMessageError] = None,
|
||||
) -> None:
|
||||
super().__init__(ack_id, error_detail)
|
||||
self.cv = threading.Condition()
|
||||
|
||||
|
||||
class SendMessageErrorOptionsAsync(_SendMessageErrorOptions):
|
||||
"""Async Options for send message error
|
||||
:ivar ack_id: The ack id of the message.
|
||||
:vartype ack_id: int
|
||||
:ivar error_detail: The error details of the message.
|
||||
:vartype error_detail: ~azure.messaging.webpubsubclient.models.AckMessageError
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ack_id: Optional[int] = None,
|
||||
error_detail: Optional[AckMessageError] = None,
|
||||
) -> None:
|
||||
super().__init__(ack_id, error_detail)
|
||||
self.event = asyncio.Event()
|
||||
|
||||
|
||||
class SendMessageError(AzureError):
|
||||
"""Exception raised when fail to send message
|
||||
|
||||
|
@ -691,7 +727,7 @@ class OnGroupDataMessageArgs:
|
|||
def __init__(
|
||||
self,
|
||||
*,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
group: str,
|
||||
from_user_id: Optional[str] = None,
|
||||
|
@ -717,7 +753,7 @@ class OnServerDataMessageArgs:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
data_type: Union[WebPubSubDataType, str],
|
||||
data_type: WebPubSubDataType,
|
||||
data: Any,
|
||||
sequence_id: Optional[int] = None,
|
||||
) -> None:
|
||||
|
@ -936,5 +972,57 @@ class AckMap:
|
|||
self.ack_map.clear()
|
||||
|
||||
|
||||
class AckMapAsync:
|
||||
"""Async Ack map"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.ack_map: Dict[int, SendMessageErrorOptionsAsync] = {}
|
||||
|
||||
def add(self, ack_id: int, options: SendMessageErrorOptionsAsync) -> None:
|
||||
"""Add ack id to ack map
|
||||
|
||||
:param ack_id: The ack id. Required.
|
||||
:type ack_id: int
|
||||
:param options: The options. Required.
|
||||
:type options: SendMessageErrorOptions
|
||||
"""
|
||||
self.ack_map[ack_id] = options
|
||||
|
||||
def pop(self, ack_id: int) -> Optional[SendMessageErrorOptionsAsync]:
|
||||
"""Pop ack id from ack map
|
||||
|
||||
:param ack_id: The ack id. Required.
|
||||
:type ack_id: int
|
||||
:return: The options.
|
||||
:rtype: SendMessageErrorOptions or None
|
||||
"""
|
||||
return self.ack_map.pop(ack_id, None)
|
||||
|
||||
def get(self, ack_id: int) -> Optional[SendMessageErrorOptionsAsync]:
|
||||
"""Get ack id from ack map
|
||||
|
||||
:param ack_id: The ack id. Required.
|
||||
:type ack_id: int
|
||||
:return: The options.
|
||||
:rtype: SendMessageErrorOptions or None
|
||||
"""
|
||||
return self.ack_map.get(ack_id)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear ack map"""
|
||||
for key, value in self.ack_map.items():
|
||||
_LOGGER.debug("clear ack map with ack id: %s", key)
|
||||
value.event.set()
|
||||
self.ack_map.clear()
|
||||
|
||||
|
||||
class OpenClientError(AzureError):
|
||||
"""Exception raised when fail to start the client"""
|
||||
|
||||
|
||||
class ReconnectError(AzureError):
|
||||
"""Exception raised when fail to reconnect"""
|
||||
|
||||
|
||||
class RecoverError(AzureError):
|
||||
"""Exception raised when fail to reconnect or recover the client"""
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
-e ../../../tools/azure-sdk-tools
|
||||
../../core/azure-core
|
||||
../../identity/azure-identity
|
||||
aiohttp
|
||||
azure-messaging-webpubsubservice==1.1.0b1
|
||||
psutil
|
||||
aiohttp>=3.9.3
|
||||
|
|
|
@ -10,6 +10,8 @@ from azure.messaging.webpubsubclient.models import (
|
|||
OnConnectedArgs,
|
||||
OnGroupDataMessageArgs,
|
||||
OnDisconnectedArgs,
|
||||
CallbackType,
|
||||
WebPubSubDataType,
|
||||
)
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
@ -35,7 +37,7 @@ def on_group_message(msg: OnGroupDataMessageArgs):
|
|||
|
||||
|
||||
def main():
|
||||
service_client = WebPubSubServiceClient.from_connection_string(
|
||||
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
client = WebPubSubClient(
|
||||
|
@ -47,16 +49,16 @@ def main():
|
|||
)
|
||||
|
||||
with client:
|
||||
client.subscribe("connected", on_connected)
|
||||
client.subscribe("disconnected", on_disconnected)
|
||||
client.subscribe("group-message", on_group_message)
|
||||
group_name = "test"
|
||||
client.subscribe(CallbackType.CONNECTED, on_connected)
|
||||
client.subscribe(CallbackType.DISCONNECTED, on_disconnected)
|
||||
client.subscribe(CallbackType.GROUP_MESSAGE, on_group_message)
|
||||
group_name = "hello_world"
|
||||
client.join_group(group_name)
|
||||
client.send_to_group(group_name, "hello text", "text", no_echo=False, ack=False)
|
||||
client.send_to_group(group_name, {"hello": "json"}, "json")
|
||||
client.send_to_group(group_name, "hello json", "json")
|
||||
client.send_to_group(group_name, "hello text", WebPubSubDataType.TEXT, no_echo=False, ack=False)
|
||||
client.send_to_group(group_name, {"hello": "json"}, WebPubSubDataType.JSON)
|
||||
client.send_to_group(group_name, "hello text", WebPubSubDataType.TEXT)
|
||||
content = memoryview("hello binary".encode())
|
||||
client.send_to_group(group_name, content, "binary")
|
||||
client.send_to_group(group_name, content, WebPubSubDataType.BINARY)
|
||||
|
||||
# If you can't run client in context, please open/close client manually like:
|
||||
# client.open()
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
# coding=utf-8
|
||||
# --------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import os
|
||||
import asyncio
|
||||
from azure.messaging.webpubsubclient.aio import WebPubSubClient, WebPubSubClientCredential
|
||||
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
|
||||
from azure.messaging.webpubsubclient.models import (
|
||||
OnConnectedArgs,
|
||||
OnGroupDataMessageArgs,
|
||||
OnDisconnectedArgs,
|
||||
CallbackType,
|
||||
WebPubSubDataType,
|
||||
)
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
async def on_connected(msg: OnConnectedArgs):
|
||||
print("======== connected ===========")
|
||||
print(f"Connection {msg.connection_id} is connected")
|
||||
|
||||
|
||||
async def on_disconnected(msg: OnDisconnectedArgs):
|
||||
print("========== disconnected =========")
|
||||
print(f"connection is disconnected: {msg.message}")
|
||||
|
||||
|
||||
async def on_group_message(msg: OnGroupDataMessageArgs):
|
||||
print("========== group message =========")
|
||||
if isinstance(msg.data, memoryview):
|
||||
print(f"Received message from {msg.group}: {bytes(msg.data).decode()}")
|
||||
else:
|
||||
print(f"Received message from {msg.group}: {msg.data}")
|
||||
|
||||
|
||||
async def main():
|
||||
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
async def client_access_url_provider():
|
||||
return (await service_client.get_client_access_token(
|
||||
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
|
||||
))["url"]
|
||||
client = WebPubSubClient(
|
||||
credential=WebPubSubClientCredential(client_access_url_provider=client_access_url_provider),
|
||||
)
|
||||
|
||||
async with client:
|
||||
await client.subscribe(CallbackType.CONNECTED, on_connected)
|
||||
await client.subscribe(CallbackType.DISCONNECTED, on_disconnected)
|
||||
await client.subscribe(CallbackType.GROUP_MESSAGE, on_group_message)
|
||||
group_name = "hello_world_async"
|
||||
await client.join_group(group_name)
|
||||
await client.send_to_group(
|
||||
group_name, "hello text", WebPubSubDataType.TEXT, no_echo=False, ack=False
|
||||
)
|
||||
await client.send_to_group(group_name, {"hello": "json"}, WebPubSubDataType.JSON)
|
||||
await client.send_to_group(group_name, "hello text", WebPubSubDataType.TEXT)
|
||||
content = memoryview("hello binary".encode())
|
||||
await client.send_to_group(group_name, content, WebPubSubDataType.BINARY)
|
||||
|
||||
# If you can't run client in context, please open/close client manually like:
|
||||
# await client.open()
|
||||
# ...
|
||||
# await client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
|
@ -7,14 +7,17 @@ import os
|
|||
import logging
|
||||
from azure.messaging.webpubsubclient import WebPubSubClient, WebPubSubClientCredential
|
||||
from azure.messaging.webpubsubservice import WebPubSubServiceClient
|
||||
from azure.messaging.webpubsubservice.models import OpenClientError, SendMessageError
|
||||
from azure.messaging.webpubsubclient.models import OpenClientError, SendMessageError
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# The following code is to show how to handle exceptions in WebPubSubClient, and it
|
||||
# may not run directly
|
||||
def main():
|
||||
service_client = WebPubSubServiceClient.from_connection_string(
|
||||
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
client = WebPubSubClient(
|
||||
|
@ -36,7 +39,7 @@ def main():
|
|||
# catch SendMessageError
|
||||
while True:
|
||||
try:
|
||||
client.join_group(group_name="test")
|
||||
client.join_group(group_name="hello_world_exceptions")
|
||||
break
|
||||
except SendMessageError as err:
|
||||
if err.error_detail is None:
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
# coding=utf-8
|
||||
# --------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import os
|
||||
import time
|
||||
import asyncio
|
||||
from azure.messaging.webpubsubservice import WebPubSubServiceClient
|
||||
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient as WebPubSubServiceClientAsync
|
||||
from azure.messaging.webpubsubclient.aio import WebPubSubClient as AsyncClient
|
||||
from azure.messaging.webpubsubclient.aio import WebPubSubClientCredential as WebPubSubClientCredentialAsync
|
||||
from azure.messaging.webpubsubclient import WebPubSubClient as Client
|
||||
from azure.messaging.webpubsubclient import WebPubSubClientCredential
|
||||
from azure.messaging.webpubsubclient.models import WebPubSubDataType
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
MESSAGE_COUNT = 100
|
||||
TIME_COST = 0.0
|
||||
TIME_COST_ASYNC = 0.0
|
||||
|
||||
|
||||
def client_access_url_provider():
|
||||
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
return service_client.get_client_access_token(
|
||||
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
|
||||
)["url"]
|
||||
|
||||
async def client_access_url_provider_async():
|
||||
service_client_async = WebPubSubServiceClientAsync.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
return (await service_client_async.get_client_access_token(
|
||||
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
|
||||
))["url"]
|
||||
|
||||
|
||||
def send() -> None:
|
||||
global TIME_COST
|
||||
client = Client(
|
||||
credential=WebPubSubClientCredential(
|
||||
client_access_url_provider=client_access_url_provider
|
||||
),
|
||||
)
|
||||
|
||||
with client:
|
||||
group_name = "test"
|
||||
client.join_group(group_name)
|
||||
start = time.time()
|
||||
for i in range(MESSAGE_COUNT):
|
||||
client.send_to_group(group_name, {"hello": "json"}, WebPubSubDataType.JSON)
|
||||
print(f"send message {i} with Sync API ")
|
||||
TIME_COST = time.time() - start
|
||||
|
||||
|
||||
async def send_item_async(client_async, idx):
|
||||
print(f"send message {idx} with Async API")
|
||||
await client_async.send_to_group("test", {"hello": "json"}, WebPubSubDataType.JSON)
|
||||
|
||||
|
||||
async def send_async() -> None:
|
||||
global TIME_COST_ASYNC
|
||||
client_async = AsyncClient(
|
||||
credential=WebPubSubClientCredentialAsync(
|
||||
client_access_url_provider=client_access_url_provider_async
|
||||
),
|
||||
)
|
||||
async with client_async:
|
||||
group_name = "send_compare_between_async_sync"
|
||||
await client_async.join_group(group_name)
|
||||
start = time.time()
|
||||
await asyncio.gather(
|
||||
*[send_item_async(client_async, i) for i in range(MESSAGE_COUNT)]
|
||||
)
|
||||
TIME_COST_ASYNC = time.time() - start
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
send()
|
||||
asyncio.get_event_loop().run_until_complete(send_async())
|
||||
print(
|
||||
f"it takes {TIME_COST} seconds to send {MESSAGE_COUNT} messages with Sync API"
|
||||
)
|
||||
print(
|
||||
f"it takes {TIME_COST_ASYNC} seconds to send {MESSAGE_COUNT} messages with Async API"
|
||||
)
|
|
@ -42,11 +42,11 @@ setup(
|
|||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 3 :: Only",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
],
|
||||
zip_safe=False,
|
||||
|
@ -63,10 +63,14 @@ setup(
|
|||
"pytyped": ["py.typed"],
|
||||
},
|
||||
install_requires=[
|
||||
"isodate<1.0.0,>=0.6.1",
|
||||
"azure-core<2.0.0,>=1.24.0",
|
||||
"typing-extensions>=4.3.0; python_version<'3.8.0'",
|
||||
"websocket-client<2.0.0,>=1.4.2"
|
||||
"isodate>=0.6.1",
|
||||
"azure-core>=1.26.3",
|
||||
"websocket-client>=1.6.0",
|
||||
],
|
||||
python_requires=">=3.7",
|
||||
extras_require={
|
||||
"aio": [
|
||||
"aiohttp>=3.9.3",
|
||||
]
|
||||
},
|
||||
python_requires=">=3.8",
|
||||
)
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
# WebPubSub Client Stress Tests
|
||||
|
||||
This doc shows how to run stress test.
|
||||
|
||||
## Setup for local stress test runs
|
||||
|
||||
```cmd
|
||||
(env) ~/azure-messaging-webpubsubclient> pip install .
|
||||
(env) ~/azure-messaging-webpubsubclient> pip install -r dev_requirements.txt
|
||||
```
|
||||
|
||||
## Test commands for local testing
|
||||
|
||||
Run the chosen test file with the following command:
|
||||
|
||||
```cmd
|
||||
(env) ~/azure-messaging-webpubsubclient/stress> python stress_base_async.py
|
||||
```
|
||||
|
||||
You can also add `--help` in command to see the arguments for stress test:
|
||||
|
||||
```cmd
|
||||
(env) ~/azure-messaging-webpubsubclient/stress> python stress_base_async.py --help
|
||||
```
|
|
@ -0,0 +1,85 @@
|
|||
# --------------------------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
import os
|
||||
import sys
|
||||
import logging
|
||||
import psutil
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
def get_base_logger(logger_name: str, log_file_name: str = ""):
|
||||
logger = logging.getLogger(logger_name)
|
||||
logger.setLevel(logging.INFO)
|
||||
formatter = logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
|
||||
handers = [logging.StreamHandler(sys.stdout)]
|
||||
if log_file_name:
|
||||
handers.append(logging.FileHandler(log_file_name, mode="w"))
|
||||
for handler in handers:
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
return logger
|
||||
|
||||
|
||||
class ProcessMonitor:
|
||||
def __init__(self, logger_name, log_file_name: str, log_interval: int):
|
||||
"""
|
||||
Process Monitor monitors the CPU usage, memory usage of a specific process.
|
||||
:param logger_name: The name for the logger.
|
||||
:param log_interval: The interval of logging.
|
||||
"""
|
||||
self._monitor_thread = None
|
||||
self._logger = get_base_logger(
|
||||
logger_name=logger_name,
|
||||
log_file_name=log_file_name,
|
||||
)
|
||||
self._pid = os.getpid()
|
||||
self._process_instance = psutil.Process(self._pid)
|
||||
self._log_interval = log_interval
|
||||
self.running = False
|
||||
|
||||
def __enter__(self):
|
||||
print("Process monitor start working.")
|
||||
self.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.stop()
|
||||
print("Process monitor stop working.")
|
||||
|
||||
def _monitor_work(self):
|
||||
while self.running:
|
||||
log_content = (
|
||||
"process status: {},"
|
||||
"process cpu usage percent: {},"
|
||||
"process memory usage percent: {:.3f}".format(
|
||||
self._process_instance.status(),
|
||||
self._process_instance.cpu_percent(),
|
||||
self._process_instance.memory_percent(),
|
||||
)
|
||||
)
|
||||
self._logger.info(log_content)
|
||||
time.sleep(self._log_interval)
|
||||
|
||||
@property
|
||||
def memory_usage_percent(self):
|
||||
return self._process_instance.memory_percent() * 100
|
||||
|
||||
@property
|
||||
def cpu_usage_percent(self):
|
||||
return self._process_instance.cpu_percent()
|
||||
|
||||
def start(self):
|
||||
self.running = True
|
||||
self._monitor_thread = threading.Thread(target=self._monitor_work, daemon=True)
|
||||
self._monitor_thread.start()
|
||||
self._logger.info("Start monitoring process id:{}".format(self._pid))
|
||||
|
||||
def stop(self):
|
||||
self.running = False
|
||||
self._monitor_thread.join()
|
||||
self._logger.info("Stop monitoring process id:{}".format(self._pid))
|
||||
self._monitor_thread = None
|
|
@ -0,0 +1,71 @@
|
|||
# --------------------------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for license information.
|
||||
# --------------------------------------------------------------------------------------------
|
||||
import os
|
||||
import time
|
||||
import asyncio
|
||||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
from azure.messaging.webpubsubclient.aio import WebPubSubClient, WebPubSubClientCredential
|
||||
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
|
||||
from azure.messaging.webpubsubclient.models import (
|
||||
WebPubSubDataType,
|
||||
)
|
||||
from process_monitor import ProcessMonitor, get_base_logger
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
async def main(log_file_name: str = "", log_interval: int = 5, duration: int = 24 * 3600, messages_num: int = 1000):
|
||||
logger = get_base_logger(__name__, log_file_name)
|
||||
load_dotenv()
|
||||
service_client = WebPubSubServiceClient.from_connection_string( # type: ignore
|
||||
connection_string=os.getenv("WEBPUBSUB_CONNECTION_STRING", ""), hub="hub"
|
||||
)
|
||||
async def client_access_url_provider():
|
||||
return (await service_client.get_client_access_token(
|
||||
roles=["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"]
|
||||
))["url"]
|
||||
client = WebPubSubClient(
|
||||
credential=WebPubSubClientCredential(
|
||||
client_access_url_provider=client_access_url_provider
|
||||
),
|
||||
)
|
||||
message = "0" * 1024
|
||||
start_time = time.time()
|
||||
with ProcessMonitor(logger_name=Path(__file__).name, log_file_name=log_file_name, log_interval=log_interval):
|
||||
async with client:
|
||||
while time.time() - start_time < duration:
|
||||
group_name = "test"
|
||||
# await client.join_group(group_name)
|
||||
await asyncio.gather(
|
||||
*[client.send_to_group(group_name, message, WebPubSubDataType.TEXT) for _ in range(messages_num)]
|
||||
)
|
||||
logger.info(f"Succeed to send {messages_num} messages")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--log-file-name",
|
||||
help="output log file name. Default value is '' which means doesn't output",
|
||||
type=str,
|
||||
default="",
|
||||
)
|
||||
parser.add_argument("--log-interval", help="interval to log. Default value is 5s", type=int, default=5)
|
||||
parser.add_argument(
|
||||
"--duration", help="how long the test continue. Default value is 24 hours", type=int, default=24 * 3600
|
||||
)
|
||||
parser.add_argument(
|
||||
"--messages-num", help="Messages number to send every time. Default value is 1000", type=int, default=1000
|
||||
)
|
||||
|
||||
args, _ = parser.parse_known_args()
|
||||
asyncio.run(
|
||||
main(
|
||||
log_file_name=args.log_file_name,
|
||||
log_interval=args.log_interval,
|
||||
duration=args.duration,
|
||||
messages_num=args.messages_num,
|
||||
)
|
||||
)
|
|
@ -25,14 +25,11 @@
|
|||
# --------------------------------------------------------------------------
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from devtools_testutils import test_proxy, add_general_regex_sanitizer
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def add_sanitizers(test_proxy):
|
||||
connection_string = os.environ.get("WEBPUBSUBCLIENT_CONNECTION_STRING", "WEBPUBSUBCLIENT_CONNECTION_STRING")
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# -------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import pytest
|
||||
from devtools_testutils.aio import recorded_by_proxy_async
|
||||
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
|
||||
from testcase import WebpubsubClientPowerShellPreparer
|
||||
from azure.messaging.webpubsubclient.models import WebPubSubProtocolType
|
||||
|
||||
@pytest.mark.live_test_only
|
||||
class TestWebpubsubClientAutoConnectAsync(WebpubsubClientTestAsync):
|
||||
# auto_connect will be triggered if connection is dropped by accident and we disable recovery
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_auto_connect_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(
|
||||
connection_string=webpubsubclient_connection_string,
|
||||
protocol_type=WebPubSubProtocolType.JSON,
|
||||
message_retry_total=10,
|
||||
reconnect_retry_total=10,
|
||||
reconnect_retry_mode="fixed",
|
||||
reconnect_retry_backoff_factor=0.1,
|
||||
)
|
||||
name = "test_auto_connect_async"
|
||||
async with client:
|
||||
await asyncio.sleep(0.001) # wait for connection_id to be updated
|
||||
conn_id0 = client._connection_id
|
||||
group_name = name
|
||||
await client.subscribe("group-message", on_group_message)
|
||||
await client.join_group(group_name)
|
||||
await client._ws.sock.close(
|
||||
code=1001
|
||||
) # close the connection to trigger auto connect
|
||||
await asyncio.sleep(3) # wait for reconnect
|
||||
await client.send_to_group(group_name, name, "text")
|
||||
await asyncio.sleep(1) # wait for on_group_message to be called
|
||||
conn_id1 = client._connection_id
|
||||
assert conn_id0 is not None
|
||||
assert conn_id1 is not None
|
||||
assert conn_id0 != conn_id1
|
||||
assert name in TEST_RESULT_ASYNC
|
|
@ -57,7 +57,7 @@ class TestWebpubsubClientNoRecoveryNoReconnect(WebpubsubClientTest):
|
|||
)
|
||||
|
||||
with client:
|
||||
group_name = "test"
|
||||
group_name = "test_disable_recovery_and_autoconnect_send_concurrently"
|
||||
client.join_group(group_name)
|
||||
|
||||
def send(idx):
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# -------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import pytest
|
||||
from devtools_testutils.aio import recorded_by_proxy_async
|
||||
from testcase import WebpubsubClientPowerShellPreparer
|
||||
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
|
||||
from azure.messaging.webpubsubclient.models import (
|
||||
WebPubSubProtocolType,
|
||||
SendMessageError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.live_test_only
|
||||
class TestWebpubsubClientNoRecoveryNoReconnectAsync(WebpubsubClientTestAsync):
|
||||
# disable recovery and auto reconnect
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_disable_recovery_and_autoconnect_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(
|
||||
connection_string=webpubsubclient_connection_string,
|
||||
reconnect_retry_total=0,
|
||||
protocol_type=WebPubSubProtocolType.JSON,
|
||||
)
|
||||
name = "test_disable_recovery_and_autoconnect_async"
|
||||
async with client:
|
||||
group_name = name
|
||||
await client.subscribe("group-message", on_group_message)
|
||||
await client.join_group(group_name)
|
||||
await client._ws.session.close() # close connection
|
||||
with pytest.raises(SendMessageError):
|
||||
await client.send_to_group(group_name, name, "text")
|
||||
await asyncio.sleep(1) # wait for on_group_message to be called
|
||||
|
||||
assert name not in TEST_RESULT_ASYNC
|
||||
|
||||
# disable recovery and auto reconnect, then send message concurrently
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_disable_recovery_and_autoconnect_send_concurrently_async(
|
||||
self, webpubsubclient_connection_string
|
||||
):
|
||||
client = await self.create_client(
|
||||
connection_string=webpubsubclient_connection_string,
|
||||
reconnect_retry_total=0,
|
||||
message_retry_total=3,
|
||||
protocol_type=WebPubSubProtocolType.JSON,
|
||||
)
|
||||
|
||||
async with client:
|
||||
group_name = "test_disable_recovery_and_autoconnect_send_concurrently_async"
|
||||
await client.join_group(group_name)
|
||||
count = 10
|
||||
tasks = [client.send_to_group(group_name, "hello", "text") for _ in range(10)]
|
||||
await client._ws.session.close() # close connection
|
||||
for task in asyncio.as_completed(tasks):
|
||||
with pytest.raises(SendMessageError):
|
||||
await task
|
|
@ -0,0 +1,36 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# -------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import pytest
|
||||
from devtools_testutils.aio import recorded_by_proxy_async
|
||||
from testcase import WebpubsubClientPowerShellPreparer
|
||||
from testcase_async import WebpubsubClientTestAsync, TEST_RESULT_ASYNC, on_group_message
|
||||
|
||||
|
||||
@pytest.mark.live_test_only
|
||||
class TestWebpubsubClientRecoveryAsync(WebpubsubClientTestAsync):
|
||||
# recovery will be triggered if connection is dropped by accident
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_recovery_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string, message_retry_total=10)
|
||||
name = "test_recovery_async"
|
||||
async with client:
|
||||
await asyncio.sleep(0.001) # wait for connection_id to be updated
|
||||
conn_id0 = client._connection_id
|
||||
group_name = name
|
||||
await client.subscribe("group-message", on_group_message)
|
||||
await client.join_group(group_name)
|
||||
await client._ws.session.close() # close connection to trigger recovery
|
||||
await client.send_to_group(group_name, name, "text")
|
||||
conn_id1 = client._connection_id
|
||||
await asyncio.sleep(1) # wait for on_group_message to be called
|
||||
|
||||
assert name in TEST_RESULT_ASYNC
|
||||
assert conn_id0 is not None
|
||||
assert conn_id1 is not None
|
||||
assert conn_id0 == conn_id1
|
|
@ -16,7 +16,7 @@ class TestWebpubsubClientSendConcurrently(WebpubsubClientTest):
|
|||
def test_send_concurrently(self, webpubsubclient_connection_string):
|
||||
client = self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
with client:
|
||||
group_name = "test"
|
||||
group_name = "test_send_concurrently"
|
||||
client.join_group(group_name)
|
||||
|
||||
def send(idx):
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# -------------------------------------------------------------------------
|
||||
import pytest
|
||||
import asyncio
|
||||
from devtools_testutils.aio import recorded_by_proxy_async
|
||||
from testcase import WebpubsubClientPowerShellPreparer
|
||||
from testcase_async import WebpubsubClientTestAsync
|
||||
|
||||
|
||||
@pytest.mark.live_test_only
|
||||
class TestWebpubsubClientSendConcurrentlyAsync(WebpubsubClientTestAsync):
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_send_concurrently_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
async with client:
|
||||
group_name = "test_send_concurrently_async"
|
||||
await client.join_group(group_name)
|
||||
await asyncio.gather(*[client.send_to_group(group_name, f"hello_{idx}", "text") for idx in range(100)])
|
|
@ -17,6 +17,7 @@ from testcase import (
|
|||
from azure.messaging.webpubsubclient.models import (
|
||||
OnGroupDataMessageArgs,
|
||||
OpenClientError,
|
||||
SendMessageError,
|
||||
)
|
||||
|
||||
|
||||
|
@ -26,7 +27,7 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
@recorded_by_proxy
|
||||
def test_call_back_deadlock(self, webpubsubclient_connection_string):
|
||||
client = self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
group_name = "test"
|
||||
group_name = "test_call_back_deadlock"
|
||||
|
||||
def on_group_message(msg: OnGroupDataMessageArgs):
|
||||
client.send_to_group(group_name, msg.data, "text", no_echo=True)
|
||||
|
@ -38,14 +39,14 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
client.send_to_group(group_name, "hello test_call_back_deadlock2", "text")
|
||||
client.send_to_group(group_name, "hello test_call_back_deadlock3", "text")
|
||||
# sleep to make sure the callback has enough time to execute before close
|
||||
time.sleep(0.001)
|
||||
time.sleep(1)
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
def test_context_manager(self, webpubsubclient_connection_string):
|
||||
client = self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
with client:
|
||||
group_name = "test"
|
||||
group_name = "test_context_manager"
|
||||
client.join_group(group_name)
|
||||
client.send_to_group(group_name, "test_context_manager", "text")
|
||||
time.sleep(2.0)
|
||||
|
@ -63,16 +64,17 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
with client:
|
||||
# open client again after close
|
||||
client.subscribe("stopped", on_stop)
|
||||
assert client._is_connected()
|
||||
time.sleep(0.1)
|
||||
assert client.is_connected()
|
||||
client.close()
|
||||
time.sleep(1.0)
|
||||
assert client._is_connected()
|
||||
assert client.is_connected()
|
||||
|
||||
# remove stopped event and close again
|
||||
client.unsubscribe("stopped", on_stop)
|
||||
client.close()
|
||||
time.sleep(1.0)
|
||||
assert not client._is_connected()
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
|
@ -81,7 +83,7 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
with pytest.raises(OpenClientError):
|
||||
with client:
|
||||
client.open()
|
||||
assert not client._is_connected()
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
|
@ -89,17 +91,18 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
client = self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
with client:
|
||||
client.close()
|
||||
assert not client._is_connected()
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
def test_send_event(self, webpubsubclient_connection_string):
|
||||
client = self.create_client(
|
||||
connection_string=webpubsubclient_connection_string, message_retry_total=0
|
||||
)
|
||||
client = self.create_client(connection_string=webpubsubclient_connection_string, message_retry_total=0)
|
||||
with client:
|
||||
# please register event handler in azure portal before run this test
|
||||
client.send_event("event", "test_send_event", "text")
|
||||
try:
|
||||
client.send_event("event", "test_send_event", "text")
|
||||
except SendMessageError as err:
|
||||
assert err.error_detail.name == "InternalServerError"
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
|
@ -116,7 +119,7 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
|
||||
with client:
|
||||
time.sleep(1) # make sure rejoin group is called
|
||||
client.send_to_group(group_name, "test_rejoin_group", "text")
|
||||
client.send_to_group(group_name, group_name, "text")
|
||||
time.sleep(1) # wait for on_group_message to be called
|
||||
assert assert_func(test_group_name)
|
||||
|
||||
|
@ -130,3 +133,16 @@ class TestWebpubsubClientSmoke(WebpubsubClientTest):
|
|||
test_group_name="test_disable_rejoin_group",
|
||||
assert_func=lambda x: x not in TEST_RESULT,
|
||||
)
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy
|
||||
def test_open_client_error(self):
|
||||
client = self.create_client(
|
||||
connection_string="Endpoint=https://myservice.webpubsub.azure.com;AccessKey=aaaaaaaaaaaaa;Version=1.0;",
|
||||
)
|
||||
start_time = time.time()
|
||||
with pytest.raises(OpenClientError) as err:
|
||||
with client:
|
||||
pass
|
||||
assert time.time() - start_time < client._start_timeout
|
||||
assert "During the process, an error occurred" in str(err)
|
||||
|
|
|
@ -0,0 +1,148 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# -------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import time
|
||||
import pytest
|
||||
from devtools_testutils.aio import recorded_by_proxy_async
|
||||
from testcase import WebpubsubClientPowerShellPreparer
|
||||
from testcase_async import (
|
||||
WebpubsubClientTestAsync,
|
||||
on_group_message,
|
||||
TEST_RESULT_ASYNC,
|
||||
)
|
||||
from azure.messaging.webpubsubclient.models import (
|
||||
OnGroupDataMessageArgs,
|
||||
OpenClientError,
|
||||
SendMessageError,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.live_test_only
|
||||
class TestWebpubsubClientSmokeAsync(WebpubsubClientTestAsync):
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_call_back_deadlock_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
group_name = "test_call_back_deadlock_async"
|
||||
|
||||
async def on_group_message(msg: OnGroupDataMessageArgs):
|
||||
await client.send_to_group(group_name, msg.data, "text", no_echo=True)
|
||||
|
||||
async with client:
|
||||
await client.join_group(group_name)
|
||||
await client.subscribe("group-message", on_group_message)
|
||||
await client.send_to_group(group_name, "hello test_call_back_deadlock1", "text")
|
||||
await client.send_to_group(group_name, "hello test_call_back_deadlock2", "text")
|
||||
await client.send_to_group(group_name, "hello test_call_back_deadlock3", "text")
|
||||
# sleep to make sure the callback has enough time to execute before close
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_context_manager_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
async with client:
|
||||
group_name = "test_context_manager_async"
|
||||
await client.join_group(group_name)
|
||||
await client.send_to_group(group_name, "test_context_manager", "text")
|
||||
await asyncio.sleep(2.0)
|
||||
assert client._sequence_id.sequence_id > 0
|
||||
|
||||
# test on_stop
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_on_stop_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
|
||||
async def on_stop():
|
||||
await client.open()
|
||||
|
||||
async with client:
|
||||
# open client again after close
|
||||
await client.subscribe("stopped", on_stop)
|
||||
await asyncio.sleep(0.1)
|
||||
assert client.is_connected()
|
||||
await client.close()
|
||||
await asyncio.sleep(1.0)
|
||||
assert client.is_connected()
|
||||
|
||||
# remove stopped event and close again
|
||||
await client.unsubscribe("stopped", on_stop)
|
||||
await client.close()
|
||||
await asyncio.sleep(1.0)
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_duplicated_start_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
with pytest.raises(OpenClientError):
|
||||
async with client:
|
||||
await client.open()
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_duplicated_stop_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string)
|
||||
async with client:
|
||||
await client.close()
|
||||
assert not client.is_connected()
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_send_event_async(self, webpubsubclient_connection_string):
|
||||
client = await self.create_client(connection_string=webpubsubclient_connection_string, message_retry_total=0)
|
||||
async with client:
|
||||
# please register event handler in azure portal before run this test
|
||||
try:
|
||||
await client.send_event("event", "test_send_event", "text")
|
||||
except SendMessageError as err:
|
||||
assert err.error_detail.name == "InternalServerError"
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_rejoin_group_async(self, webpubsubclient_connection_string):
|
||||
async def _test(enable_auto_rejoin, test_group_name, assert_func):
|
||||
client = await self.create_client(
|
||||
connection_string=webpubsubclient_connection_string,
|
||||
auto_rejoin_groups=enable_auto_rejoin,
|
||||
)
|
||||
group_name = test_group_name
|
||||
await client.subscribe("group-message", on_group_message)
|
||||
async with client:
|
||||
await client.join_group(group_name)
|
||||
|
||||
async with client:
|
||||
await asyncio.sleep(1) # make sure rejoin group is called
|
||||
await client.send_to_group(group_name, group_name, "text")
|
||||
await asyncio.sleep(1) # wait for on_group_message to be called
|
||||
assert assert_func(test_group_name)
|
||||
|
||||
await _test(
|
||||
enable_auto_rejoin=True,
|
||||
test_group_name="test_rejoin_group_async",
|
||||
assert_func=lambda x: x in TEST_RESULT_ASYNC,
|
||||
)
|
||||
await _test(
|
||||
enable_auto_rejoin=False,
|
||||
test_group_name="test_disable_rejoin_group_async",
|
||||
assert_func=lambda x: x not in TEST_RESULT_ASYNC,
|
||||
)
|
||||
|
||||
@WebpubsubClientPowerShellPreparer()
|
||||
@recorded_by_proxy_async
|
||||
async def test_open_client_error_async(self):
|
||||
client = await self.create_client(
|
||||
connection_string="Endpoint=https://myservice.webpubsub.azure.com;AccessKey=aaaaaaaaaaaaa;Version=1.0;",
|
||||
)
|
||||
start_time = time.time()
|
||||
with pytest.raises(OpenClientError) as err:
|
||||
async with client:
|
||||
await asyncio.sleep(0)
|
||||
assert time.time() - start_time < client._start_timeout
|
||||
assert "During the process, an error occurred" in str(err)
|
|
@ -0,0 +1,33 @@
|
|||
# coding: utf-8
|
||||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
from typing import List
|
||||
from devtools_testutils import AzureRecordedTestCase, PowerShellPreparer
|
||||
from azure.messaging.webpubsubclient.aio import WebPubSubClient, WebPubSubClientCredential
|
||||
from azure.messaging.webpubsubclient.models import OnGroupDataMessageArgs
|
||||
from azure.messaging.webpubsubservice.aio import WebPubSubServiceClient
|
||||
|
||||
|
||||
class WebpubsubClientTestAsync(AzureRecordedTestCase):
|
||||
async def create_client(
|
||||
self,
|
||||
connection_string,
|
||||
hub: str = "Hub",
|
||||
roles: List[str] = ["webpubsub.joinLeaveGroup", "webpubsub.sendToGroup"],
|
||||
**kwargs,
|
||||
):
|
||||
service_client = WebPubSubServiceClient.from_connection_string(connection_string, hub)
|
||||
async def client_access_url_provider():
|
||||
return (await service_client.get_client_access_token(roles=roles))["url"]
|
||||
return WebPubSubClient(
|
||||
credential=WebPubSubClientCredential(client_access_url_provider),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
TEST_RESULT_ASYNC = set()
|
||||
|
||||
async def on_group_message(msg: OnGroupDataMessageArgs):
|
||||
TEST_RESULT_ASYNC.add(msg.data)
|
Загрузка…
Ссылка в новой задаче