зеркало из
1
0
Форкнуть 0
* servicebusmessage for unit tests

* correct return types

* constructor tests

* full constructor test

* added test for abstract methods

* fix lint errors

* fix assert error

* edited tests

* edited codecov tool

---------

Co-authored-by: Varad Meru <vrdmr@users.noreply.github.com>
This commit is contained in:
hallvictoria 2023-11-14 11:30:02 -06:00 коммит произвёл GitHub
Родитель 717bf06843
Коммит b42035a884
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 309 добавлений и 58 удалений

Просмотреть файл

@ -38,4 +38,5 @@ omit =
*/venv/*
*/.venv/*
*/.env*/*
*/.vscode/*
*/.vscode/*
azure/functions/_abc.py

Просмотреть файл

@ -439,3 +439,117 @@ class OrchestrationContext(abc.ABC):
@abc.abstractmethod
def body(self) -> str:
pass
class ServiceBusMessage(abc.ABC):
@abc.abstractmethod
def get_body(self) -> typing.Union[str, bytes]:
pass
@property
@abc.abstractmethod
def content_type(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def correlation_id(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def dead_letter_source(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def delivery_count(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def expires_at_utc(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def expiration_time(self) -> typing.Optional[datetime.datetime]:
"""(Deprecated, use expires_at_utc instead)"""
pass
@property
@abc.abstractmethod
def label(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def lock_token(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def message_id(self) -> str:
pass
@property
@abc.abstractmethod
def partition_key(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def reply_to(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def reply_to_session_id(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def scheduled_enqueue_time(self) -> typing.Optional[datetime.datetime]:
"""(Deprecated, use scheduled_enqueue_time_utc instead)"""
pass
@property
@abc.abstractmethod
def scheduled_enqueue_time_utc(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def sequence_number(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def session_id(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def time_to_live(self) -> typing.Optional[datetime.timedelta]:
pass
@property
@abc.abstractmethod
def to(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def user_properties(self) -> typing.Dict[str, typing.Any]:
pass
@property
@abc.abstractmethod
def metadata(self) -> typing.Optional[typing.Dict[str, typing.Any]]:
pass

Просмотреть файл

@ -1,26 +1,39 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import datetime
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, Union
from . import _abc
class ServiceBusMessage(abc.ABC):
class ServiceBusMessage(_abc.ServiceBusMessage):
@abc.abstractmethod
def get_body(self) -> bytes:
"""Get the message body from ServiceBus
"""A ServiceBuss message object.
Returns:
--------
bytes
The ServiceBus message body in bytes form
"""
pass
:param body:
A string or bytes instance specifying the message body.
:param content_type:
An optional string specifying the content type.
:param correlation_id:
An optional string specifying the correlation id.
"""
def __init__(self, *,
body: Optional[Union[str, bytes]] = None,
content_type: Optional[str] = None,
correlation_id: Optional[str] = None) -> None:
self.__body = b''
self.__content_type = content_type
self.__correlation_id = correlation_id
if body is not None:
self.__set_body(body)
@property
@abc.abstractmethod
def content_type(self) -> Optional[str]:
"""Optionally describes the payload of the message,
with a descriptor following the format of RFC2045
@ -31,10 +44,9 @@ class ServiceBusMessage(abc.ABC):
If content type is set, returns a string.
Otherwise, returns None.
"""
pass
return self.__content_type
@property
@abc.abstractmethod
def correlation_id(self) -> Optional[str]:
"""Enables an application to specify a context for the message for the
purposes of correlation
@ -45,10 +57,9 @@ class ServiceBusMessage(abc.ABC):
If correlation id set, returns a string.
Otherwise, returns None.
"""
pass
return self.__correlation_id
@property
@abc.abstractmethod
def dead_letter_source(self) -> Optional[str]:
"""Only set in messages that have been dead-lettered and subsequently
auto-forwarded from the dead-letter queue to another entity.
@ -61,10 +72,9 @@ class ServiceBusMessage(abc.ABC):
If dead letter source is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def delivery_count(self) -> Optional[int]:
"""Number of deliveries that have been attempted for this message.
The count is incremented when a message lock expires,
@ -77,10 +87,9 @@ class ServiceBusMessage(abc.ABC):
If delivery count is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def enqueued_time_utc(self) -> Optional[datetime.datetime]:
"""The UTC instant at which the message has been accepted and stored
in the entity. This value can be used as an authoritative and neutral
@ -93,10 +102,9 @@ class ServiceBusMessage(abc.ABC):
If enqueued time utc is set, returns a datetime.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def expires_at_utc(self) -> Optional[datetime.datetime]:
"""The UTC instant at which the message is marked for removal and no
longer available for retrieval from the entity due to its expiration.
@ -109,16 +117,14 @@ class ServiceBusMessage(abc.ABC):
If expires at utc is set, returns a datetime.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def expiration_time(self) -> Optional[datetime.datetime]:
"""(Deprecated, use expires_at_utc instead)"""
pass
return None
@property
@abc.abstractmethod
def label(self) -> Optional[str]:
"""This property enables the application to indicate the purpose of
the message to the receiver in a standardized fashion, similar to an
@ -130,10 +136,9 @@ class ServiceBusMessage(abc.ABC):
If label is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def lock_token(self) -> Optional[str]:
""" The lock token is a reference to the lock that is being held by
the broker in peek-lock receive mode. The token can be used to pin the
@ -147,10 +152,9 @@ class ServiceBusMessage(abc.ABC):
If local token is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def message_id(self) -> str:
"""The message identifier is an application-defined value that
uniquely identifies the message and its payload.
@ -164,10 +168,9 @@ class ServiceBusMessage(abc.ABC):
str
The message identifier
"""
pass
return ""
@property
@abc.abstractmethod
def partition_key(self) -> Optional[str]:
""" For partitioned entities, setting this value enables assigning
related messages to the same internal partition, so that submission
@ -181,10 +184,9 @@ class ServiceBusMessage(abc.ABC):
If partition key is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def reply_to(self) -> Optional[str]:
"""This optional and application-defined value is a standard way to
express a reply path to the receiver of the message. When a sender
@ -197,10 +199,9 @@ class ServiceBusMessage(abc.ABC):
If reply to is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def reply_to_session_id(self) -> Optional[str]:
"""This value augments the ReplyTo information and specifies which
SessionId should be set for the reply when sent to the reply entity.
@ -211,16 +212,14 @@ class ServiceBusMessage(abc.ABC):
If reply to session id is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def scheduled_enqueue_time(self) -> Optional[datetime.datetime]:
"""(Deprecated, use scheduled_enqueue_time_utc instead)"""
pass
return None
@property
@abc.abstractmethod
def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
"""For messages that are only made available for retrieval after a
delay, this property defines the UTC instant at which the message
@ -233,10 +232,9 @@ class ServiceBusMessage(abc.ABC):
If scheduled enqueue time utc is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def sequence_number(self) -> Optional[int]:
"""The sequence number is a unique 64-bit integer assigned to a message
as it is accepted and stored by the broker and functions as its true
@ -251,10 +249,9 @@ class ServiceBusMessage(abc.ABC):
If sequence number is set, returns an integer.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def session_id(self) -> Optional[str]:
"""For session-aware entities, this application-defined value
specifies the session affiliation of the message. Messages with the
@ -268,10 +265,9 @@ class ServiceBusMessage(abc.ABC):
If session id is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def time_to_live(self) -> Optional[datetime.timedelta]:
""" This value is the relative duration after which the message
expires, starting from the instant the message has been accepted and
@ -287,10 +283,9 @@ class ServiceBusMessage(abc.ABC):
If time to live is set, returns a timedelta.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def to(self) -> Optional[str]:
""" This property is reserved for future use in routing scenarios and
currently ignored by the broker itself. Applications can use this
@ -303,10 +298,9 @@ class ServiceBusMessage(abc.ABC):
If the recipient is set, returns a string.
Otherwise, returns None.
"""
pass
return None
@property
@abc.abstractmethod
def user_properties(self) -> Dict[str, Any]:
"""Contains user defined message properties.
@ -316,10 +310,9 @@ class ServiceBusMessage(abc.ABC):
If user has set properties for the message, returns a dictionary.
If nothing is set, returns an empty dictionary.
"""
pass
return {}
@property
@abc.abstractmethod
def metadata(self) -> Optional[Dict[str, Any]]:
"""Getting read-only trigger metadata in a Python dictionary.
@ -334,4 +327,19 @@ class ServiceBusMessage(abc.ABC):
Dict[str, object]
Return the Python dictionary of trigger metadata
"""
pass
return None
def __set_body(self, body):
if isinstance(body, str):
body = body.encode('utf-8')
if not isinstance(body, (bytes, bytearray)):
raise TypeError(
f'response is expected to be either of '
f'str, bytes, or bytearray, got {type(body).__name__}')
self.__body = bytes(body)
def get_body(self) -> bytes:
"""Return message content as bytes."""
return self.__body

Просмотреть файл

@ -34,8 +34,9 @@ class ServiceBusMessage(azf_sbus.ServiceBusMessage):
session_id: Optional[str] = None,
time_to_live: Optional[datetime.timedelta] = None,
to: Optional[str] = None,
user_properties: Dict[str, object]) -> None:
user_properties: Dict[str, object]):
super().__init__(body=body, content_type=content_type,
correlation_id=correlation_id)
self.__body = body
self.__trigger_metadata = trigger_metadata
self.__content_type = content_type

Просмотреть файл

@ -4,7 +4,7 @@
from typing import Dict, List
import json
import unittest
from datetime import datetime, timedelta
from datetime import datetime, timedelta, date
import azure.functions as func
import azure.functions.servicebus as azf_sb
@ -38,6 +38,133 @@ class TestServiceBus(unittest.TestCase):
MOCKED_AZURE_PARTNER_ID = '6ceef68b-0794-45dd-bb2e-630748515552'
def test_servicebusmessage_initialize_without_args(self):
# given
expected_body = b""
expexceted_content_type = None
expected_correlation_id = None
# when
test_sb_message = func.ServiceBusMessage()
# then
assert expected_body == test_sb_message.get_body()
assert expexceted_content_type == test_sb_message.content_type
assert expected_correlation_id == test_sb_message.correlation_id
def test_servicebusmessage_initialize_all_arguments(self):
# given
expected_body: bytes = b"Body"
expected_content_type: str = "Content Type"
expected_correlation_id: str = "Correlation ID"
# when
test_sb_message = func.ServiceBusMessage(
body=expected_body,
content_type=expected_content_type,
correlation_id=expected_correlation_id
)
# then
assert expected_body == test_sb_message.get_body()
assert expected_content_type == test_sb_message.content_type
assert expected_correlation_id == test_sb_message.correlation_id
self.assertIsNone(test_sb_message.dead_letter_source)
self.assertIsNone(test_sb_message.delivery_count)
self.assertIsNone(test_sb_message.enqueued_time_utc)
self.assertIsNone(test_sb_message.expires_at_utc)
self.assertIsNone(test_sb_message.expiration_time)
self.assertIsNone(test_sb_message.label)
self.assertIsNone(test_sb_message.lock_token)
assert "" == test_sb_message.message_id
self.assertIsNone(test_sb_message.partition_key)
self.assertIsNone(test_sb_message.reply_to)
self.assertIsNone(test_sb_message.reply_to_session_id)
self.assertIsNone(test_sb_message.scheduled_enqueue_time)
self.assertIsNone(test_sb_message.scheduled_enqueue_time_utc)
self.assertIsNone(test_sb_message.sequence_number)
self.assertIsNone(test_sb_message.session_id)
self.assertIsNone(test_sb_message.time_to_live)
self.assertIsNone(test_sb_message.to)
self.assertDictEqual(test_sb_message.user_properties, {})
self.assertIsNone(test_sb_message.metadata)
def test_servicebus_message_initialize_all_args(self):
# given
body = "body"
trigger_metadata = "trigger metadata"
content_type = "content type"
correlation_id = "correlation id"
dead_letter_source = "dead letter source"
delivery_count = 1
enqueued_time_utc = date(2022, 5, 1)
expires_at_utc = date(2022, 5, 1)
label = "label"
lock_token = "lock token"
message_id = "message id"
partition_key = "partition key"
reply_to = "reply to"
reply_to_session_id = "reply to session id"
scheduled_enqueue_time_utc = date(2022, 5, 1)
sequence_number = 1
session_id = "session id"
time_to_live = timedelta(hours=1)
to = "to"
user_properties = {"user": "properties"}
# when
sb_message = azf_sb.ServiceBusMessage(
body=body,
trigger_metadata=trigger_metadata,
content_type=content_type,
correlation_id=correlation_id,
dead_letter_source=dead_letter_source,
delivery_count=delivery_count,
enqueued_time_utc=enqueued_time_utc,
expires_at_utc=expires_at_utc,
label=label,
lock_token=lock_token,
message_id=message_id,
partition_key=partition_key,
reply_to=reply_to,
reply_to_session_id=reply_to_session_id,
scheduled_enqueue_time_utc=scheduled_enqueue_time_utc,
sequence_number=sequence_number,
session_id=session_id,
time_to_live=time_to_live,
to=to,
user_properties=user_properties)
# then
self.assertEqual(sb_message.get_body(), body)
self.assertEqual(sb_message.content_type, content_type)
self.assertEqual(sb_message.correlation_id, correlation_id)
self.assertEqual(sb_message.dead_letter_source, dead_letter_source)
self.assertEqual(sb_message.delivery_count, delivery_count)
self.assertEqual(sb_message.enqueued_time_utc, enqueued_time_utc)
self.assertEqual(sb_message.expires_at_utc, expires_at_utc)
self.assertEqual(sb_message.label, label)
self.assertEqual(sb_message.lock_token, lock_token)
self.assertEqual(sb_message.message_id, message_id)
self.assertEqual(sb_message.partition_key, partition_key)
self.assertEqual(sb_message.reply_to, reply_to)
self.assertEqual(sb_message.reply_to_session_id, reply_to_session_id)
self.assertEqual(sb_message.scheduled_enqueue_time_utc,
scheduled_enqueue_time_utc)
self.assertEqual(sb_message.sequence_number, sequence_number)
self.assertEqual(sb_message.session_id, session_id)
self.assertEqual(sb_message.time_to_live, time_to_live)
self.assertEqual(sb_message.to, to)
self.assertEqual(sb_message.user_properties, user_properties)
def test_abstract_servicebus_message(self):
test_sb_message = func.ServiceBusMessage()
abstract_sb_message = func._abc.ServiceBusMessage
self.assertIsInstance(test_sb_message, abstract_sb_message)
with self.assertRaises(TypeError):
func._abc.ServiceBusMessage()
def test_servicebus_input_type(self):
check_input_type = (
azf_sb.ServiceBusMessageInConverter.check_input_type_annotation