# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from typing import Dict, List
import json
import unittest
from datetime import datetime, timedelta, date

import azure.functions as func
import azure.functions.servicebus as azf_sb
from azure.functions import meta

from tests.utils.testutils import (CollectionBytes, CollectionString,
                                   CollectionSint64)


class TestServiceBus(unittest.TestCase):
    """Tests for ``ServiceBusMessage`` and the Service Bus in/out converters.

    The ``MOCKED_*`` constants are the values placed into the generated
    trigger metadata (see ``_generate_single_trigger_metadata``) and are the
    values the decoded messages are compared against.
    """

    MOCKED_CONTENT_TYPE = 'application/json'
    MOCKED_CORROLATION_ID = '87c66eaf88e84119b66a26278a7b4149'
    MOCKED_DEADLETTER_ERROR_DESCRIPTION = (
        'mocked_dead_letter_error_description'
    )
    MOCKED_DEADLETTER_REASON = 'mocked_dead_letter_reason'
    MOCKED_DEADLETTER_SOURCE = 'mocked_dead_letter_source'
    MOCKED_DELIVERY_COUNT = 571
    MOCKED_ENQUEUED_SEQUENCE_NUMBER = 15
    # NOTE(review): these are naive timestamps. datetime.utcnow() is
    # deprecated since Python 3.12 - confirm how the converter parses
    # ISO strings before switching to timezone-aware values.
    MOCKED_ENQUEUE_TIME_UTC = datetime.utcnow()
    MOCKED_EXPIRY_AT_UTC = datetime.utcnow()
    MOCKED_LABEL = 'mocked_label'
    MOCKED_LOCKED_UNTIL = datetime.utcnow()
    MOCKED_LOCK_TOKEN = '87931fd2-39f4-415a-9fdc-adfdcbed3148'
    MOCKED_MESSAGE_ID = 'abcee18397398d93891830a0aac89eed'
    MOCKED_MESSAGE_ID_A = 'aaaaa18397398d93891830a0aac89eed'
    MOCKED_MESSAGE_ID_B = 'bbbbb18397398d93891830a0aac89eed'
    MOCKED_MESSAGE_ID_C = 'ccccc18397398d93891830a0aac89eed'
    MOCKED_PARTITION_KEY = 'mocked_partition_key'
    MOCKED_REPLY_TO = 'mocked_reply_to'
    MOCKED_REPLY_TO_SESSION_ID = 'mocked_reply_to_session_id'
    MOCKED_SCHEDULED_ENQUEUE_TIME_UTC = datetime.utcnow()
    MOCKED_SEQUENCE_NUMBER = 38291
    MOCKED_SESSION_ID = 'mocked_session_id'
    MOCKED_STATE = 1
    MOCKED_SUBJECT = 'mocked_subject'
    # String form placed in the metadata, and the timedelta it is expected
    # to be exposed as on the decoded message.
    MOCKED_TIME_TO_LIVE = '11:22:33'
    MOCKED_TIME_TO_LIVE_TIMEDELTA = timedelta(hours=11, minutes=22,
                                              seconds=33)
    MOCKED_TO = 'mocked_to'
    MOCKED_TRANSACTION_PARTITION_KEY = 'mocked_transaction_partition_key'
    MOCKED_AZURE_PARTNER_ID = '6ceef68b-0794-45dd-bb2e-630748515552'

    def test_servicebusmessage_initialize_without_args(self):
        # given
        expected_body = b""
        expected_content_type = None
        expected_correlation_id = None

        # when
        test_sb_message = func.ServiceBusMessage()

        # then
        self.assertEqual(test_sb_message.get_body(), expected_body)
        self.assertEqual(test_sb_message.content_type,
                         expected_content_type)
        self.assertEqual(test_sb_message.correlation_id,
                         expected_correlation_id)

    def test_servicebusmessage_initialize_all_arguments(self):
        # given
        expected_body: bytes = b"Body"
        expected_content_type: str = "Content Type"
        expected_correlation_id: str = "Correlation ID"

        # when
        test_sb_message = func.ServiceBusMessage(
            body=expected_body,
            content_type=expected_content_type,
            correlation_id=expected_correlation_id
        )

        # then
        self.assertEqual(test_sb_message.get_body(), expected_body)
        self.assertEqual(test_sb_message.content_type,
                         expected_content_type)
        self.assertEqual(test_sb_message.correlation_id,
                         expected_correlation_id)
        # Everything that was not passed in keeps its empty default
        self.assertDictEqual(test_sb_message.application_properties, {})
        self.assertIsNone(test_sb_message.dead_letter_error_description)
        self.assertIsNone(test_sb_message.dead_letter_reason)
        self.assertIsNone(test_sb_message.dead_letter_source)
        self.assertIsNone(test_sb_message.delivery_count)
        self.assertIsNone(test_sb_message.enqueued_sequence_number)
        self.assertIsNone(test_sb_message.enqueued_time_utc)
        self.assertIsNone(test_sb_message.expires_at_utc)
        self.assertIsNone(test_sb_message.expiration_time)
        self.assertIsNone(test_sb_message.label)
        self.assertIsNone(test_sb_message.locked_until)
        self.assertIsNone(test_sb_message.lock_token)
        self.assertEqual(test_sb_message.message_id, "")
        self.assertIsNone(test_sb_message.partition_key)
        self.assertIsNone(test_sb_message.reply_to)
        self.assertIsNone(test_sb_message.reply_to_session_id)
        self.assertIsNone(test_sb_message.scheduled_enqueue_time)
        self.assertIsNone(test_sb_message.scheduled_enqueue_time_utc)
        self.assertIsNone(test_sb_message.sequence_number)
        self.assertIsNone(test_sb_message.session_id)
        self.assertIsNone(test_sb_message.state)
        self.assertIsNone(test_sb_message.subject)
        self.assertIsNone(test_sb_message.time_to_live)
        self.assertIsNone(test_sb_message.to)
        self.assertIsNone(test_sb_message.transaction_partition_key)
        self.assertDictEqual(test_sb_message.user_properties, {})
        self.assertIsNone(test_sb_message.metadata)

    def test_servicebus_message_initialize_all_args(self):
        # given
        body = "body"
        trigger_metadata = "trigger metadata"
        application_properties = {"application": "properties"}
        content_type = "content type"
        correlation_id = "correlation id"
        dead_letter_error_description = "dead letter error description"
        dead_letter_reason = "dead letter reason"
        dead_letter_source = "dead letter source"
        delivery_count = 1
        enqueued_sequence_number = 1
        enqueued_time_utc = date(2022, 5, 1)
        expires_at_utc = date(2022, 5, 1)
        label = "label"
        locked_until = date(2022, 5, 1)
        lock_token = "lock token"
        message_id = "message id"
        partition_key = "partition key"
        reply_to = "reply to"
        reply_to_session_id = "reply to session id"
        scheduled_enqueue_time_utc = date(2022, 5, 1)
        sequence_number = 1
        session_id = "session id"
        state = 1
        subject = "subject"
        time_to_live = timedelta(hours=1)
        to = "to"
        transaction_partition_key = "transaction partition key"
        user_properties = {"user": "properties"}

        # when
        sb_message = azf_sb.ServiceBusMessage(
            body=body,
            trigger_metadata=trigger_metadata,
            application_properties=application_properties,
            content_type=content_type,
            correlation_id=correlation_id,
            dead_letter_error_description=dead_letter_error_description,
            dead_letter_reason=dead_letter_reason,
            dead_letter_source=dead_letter_source,
            delivery_count=delivery_count,
            enqueued_sequence_number=enqueued_sequence_number,
            enqueued_time_utc=enqueued_time_utc,
            expires_at_utc=expires_at_utc,
            label=label,
            locked_until=locked_until,
            lock_token=lock_token,
            message_id=message_id,
            partition_key=partition_key,
            reply_to=reply_to,
            reply_to_session_id=reply_to_session_id,
            scheduled_enqueue_time_utc=scheduled_enqueue_time_utc,
            sequence_number=sequence_number,
            session_id=session_id,
            state=state,
            subject=subject,
            time_to_live=time_to_live,
            to=to,
            transaction_partition_key=transaction_partition_key,
            user_properties=user_properties)

        # then
        self.assertEqual(sb_message.get_body(), body)
        self.assertEqual(sb_message.application_properties,
                         application_properties)
        self.assertEqual(sb_message.content_type, content_type)
        self.assertEqual(sb_message.correlation_id, correlation_id)
        self.assertEqual(sb_message.dead_letter_error_description,
                         dead_letter_error_description)
        self.assertEqual(sb_message.dead_letter_reason, dead_letter_reason)
        self.assertEqual(sb_message.dead_letter_source, dead_letter_source)
        self.assertEqual(sb_message.delivery_count, delivery_count)
        self.assertEqual(sb_message.enqueued_sequence_number,
                         enqueued_sequence_number)
        self.assertEqual(sb_message.enqueued_time_utc, enqueued_time_utc)
        self.assertEqual(sb_message.expires_at_utc, expires_at_utc)
        self.assertEqual(sb_message.label, label)
        self.assertEqual(sb_message.locked_until, locked_until)
        self.assertEqual(sb_message.lock_token, lock_token)
        self.assertEqual(sb_message.message_id, message_id)
        self.assertEqual(sb_message.partition_key, partition_key)
        self.assertEqual(sb_message.reply_to, reply_to)
        self.assertEqual(sb_message.reply_to_session_id,
                         reply_to_session_id)
        self.assertEqual(sb_message.scheduled_enqueue_time_utc,
                         scheduled_enqueue_time_utc)
        self.assertEqual(sb_message.sequence_number, sequence_number)
        self.assertEqual(sb_message.session_id, session_id)
        self.assertEqual(sb_message.state, state)
        self.assertEqual(sb_message.subject, subject)
        self.assertEqual(sb_message.time_to_live, time_to_live)
        self.assertEqual(sb_message.to, to)
        self.assertEqual(sb_message.transaction_partition_key,
                         transaction_partition_key)
        self.assertEqual(sb_message.user_properties, user_properties)

    def test_abstract_servicebus_message(self):
        test_sb_message = func.ServiceBusMessage()
        abstract_sb_message = func._abc.ServiceBusMessage
        self.assertIsInstance(test_sb_message, abstract_sb_message)

        # The abstract base itself must not be instantiable
        with self.assertRaises(TypeError):
            func._abc.ServiceBusMessage()

    def test_servicebus_input_type(self):
        check_input_type = (
            azf_sb.ServiceBusMessageInConverter.check_input_type_annotation
        )

        # Should accept a single service bus message as trigger input type
        self.assertTrue(check_input_type(func.ServiceBusMessage))

        # Should accept multiple service bus messages as trigger input type
        self.assertTrue(check_input_type(List[func.ServiceBusMessage]))

        # Should accept a message class derived from service bus
        class ServiceBusMessageChild(func.ServiceBusMessage):
            FOO = 'BAR'

        self.assertTrue(check_input_type(ServiceBusMessageChild))

        # Should be false if a message type does not match expectation
        self.assertFalse(check_input_type(func.EventHubEvent))
        self.assertFalse(check_input_type(str))
        self.assertFalse(check_input_type(type(None)))

    def test_servicebus_output_type(self):
        check_output_type = (
            azf_sb.ServiceBusMessageOutConverter.check_output_type_annotation
        )

        # Should accept bytes and string as trigger output type
        self.assertTrue(check_output_type(bytes))
        self.assertTrue(check_output_type(str))

        # Should reject if attempt to send a service bus message out
        self.assertFalse(check_output_type(func.ServiceBusMessage))

        # Should be false if a message type does not match expectation
        self.assertFalse(check_output_type(func.EventGridEvent))
        self.assertFalse(check_output_type(type(None)))

    def test_servicebus_data(self):
        servicebus_msg = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_single_servicebus_data(),
            trigger_metadata=self._generate_single_trigger_metadata())

        servicebus_data = servicebus_msg.get_body().decode('utf-8')
        self.assertEqual(servicebus_data, json.dumps({"lucky_number": 23}))

    def test_servicebus_non_existing_property(self):
        # The function should not fail even when a property is missing
        msg = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_single_servicebus_data(),
            trigger_metadata={
                'MessageId': meta.Datum(self.MOCKED_MESSAGE_ID, 'string'),
                'UserProperties': meta.Datum('{ "UserId": 1 }', 'json')
            }
        )

        # Properties that are not passed from the extension should be None
        self.assertIsNone(msg.content_type)

        # Message id should always be available
        self.assertEqual(msg.message_id, self.MOCKED_MESSAGE_ID)

        # User property should always be available
        self.assertEqual(msg.user_properties['UserId'], 1)

    def test_servicebus_properties(self):
        # SystemProperties in metadata should propagate to class properties
        msg = azf_sb.ServiceBusMessageInConverter.decode(
            data=meta.Datum(b'body_bytes', 'bytes'),
            trigger_metadata=self._generate_single_trigger_metadata())

        self.assertEqual(msg.get_body(), b'body_bytes')

        # Test individual ServiceBus properties respectively
        self.assertEqual(msg.application_properties,
                         {'application': 'value'})
        self._assert_mocked_properties(msg, self.MOCKED_MESSAGE_ID)

    def test_servicebus_metadata(self):
        # Trigger metadata should contain all the essential information
        # about this service bus message
        servicebus_msg = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_single_servicebus_data(),
            trigger_metadata=self._generate_single_trigger_metadata())

        # Datetime should be in iso8601 string instead of datetime object
        metadata_dict = servicebus_msg.metadata
        # Superset comparison: the metadata may hold more entries than
        # the ones listed here.
        self.assertGreaterEqual(metadata_dict.items(), {
            'DeliveryCount': self.MOCKED_DELIVERY_COUNT,
            'LockToken': self.MOCKED_LOCK_TOKEN,
            'ExpiresAtUtc': self.MOCKED_EXPIRY_AT_UTC.isoformat(),
            'EnqueuedTimeUtc': self.MOCKED_ENQUEUE_TIME_UTC.isoformat(),
            'MessageId': self.MOCKED_MESSAGE_ID,
            'ContentType': self.MOCKED_CONTENT_TYPE,
            'SequenceNumber': self.MOCKED_SEQUENCE_NUMBER,
            'Label': self.MOCKED_LABEL,
            'sys': {
                'MethodName': 'ServiceBusSMany',
                'UtcNow': '2020-06-18T05:39:12.2860411Z',
                'RandGuid': 'bb38deae-cc75-49f2-89f5-96ec6eb857db'
            }
        }.items())

    def test_servicebus_should_not_override_metadata(self):
        # Mutating the metadata dict must not leak into the properties
        servicebus_msg = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_single_servicebus_data(),
            trigger_metadata=self._generate_single_trigger_metadata())

        # The content_type trigger field should be set
        self.assertEqual(servicebus_msg.content_type, 'application/json')

        # The metadata field should also be set
        self.assertEqual(servicebus_msg.metadata['ContentType'],
                         'application/json')

        # Now we change the metadata field
        # The trigger property should still remain the same
        servicebus_msg.metadata['ContentType'] = 'text/plain'
        self.assertEqual(servicebus_msg.content_type, 'application/json')

    def test_multiple_servicebus_trigger(self):
        # When cardinality is turned on to 'many', metadata should contain
        # information for all messages
        servicebus_msgs = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_multiple_service_bus_data(),
            trigger_metadata=self._generate_multiple_trigger_metadata()
        )

        # The decoding result should contain a list of messages
        self.assertEqual(len(servicebus_msgs), 3)

    def test_multiple_servicebus_trigger_non_existing_properties(self):
        servicebus_msgs = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_multiple_service_bus_data(),
            trigger_metadata={
                'MessageIdArray': meta.Datum(
                    type='collection_string',
                    value=CollectionString([
                        self.MOCKED_MESSAGE_ID,
                        self.MOCKED_MESSAGE_ID,
                        self.MOCKED_MESSAGE_ID
                    ])
                ),
                'UserPropertiesArray': meta.Datum(type='json', value='''[
                    { "UserId": 1 },
                    { "UserId": 2 },
                    { "UserId": 3 }
                ]''')
            }
        )

        # Non existing properties should return None
        self.assertIsNone(servicebus_msgs[0].content_type)
        self.assertIsNone(servicebus_msgs[1].content_type)
        self.assertIsNone(servicebus_msgs[2].content_type)

        # Message Id should always be available
        self.assertEqual(servicebus_msgs[0].message_id,
                         self.MOCKED_MESSAGE_ID)
        self.assertEqual(servicebus_msgs[1].message_id,
                         self.MOCKED_MESSAGE_ID)
        self.assertEqual(servicebus_msgs[2].message_id,
                         self.MOCKED_MESSAGE_ID)

        # User properties should always be available
        self.assertEqual(servicebus_msgs[0].user_properties['UserId'], 1)
        self.assertEqual(servicebus_msgs[1].user_properties['UserId'], 2)
        self.assertEqual(servicebus_msgs[2].user_properties['UserId'], 3)

    def test_multiple_servicebus_trigger_properties(self):
        # When cardinality is turned on to 'many', metadata should contain
        # information for all messages
        servicebus_msgs = azf_sb.ServiceBusMessageInConverter.decode(
            data=self._generate_multiple_service_bus_data(),
            trigger_metadata=self._generate_multiple_trigger_metadata()
        )

        expected_bodies: List[str] = [
            json.dumps({"lucky_number": 23}),
            json.dumps({"lucky_number": 34}),
            json.dumps({"lucky_number": 45}),
        ]

        expected_message_ids: List[str] = [
            self.MOCKED_MESSAGE_ID_A,
            self.MOCKED_MESSAGE_ID_B,
            self.MOCKED_MESSAGE_ID_C
        ]

        # Guard against a short (or empty) result making the loop below
        # pass without checking anything.
        self.assertEqual(len(servicebus_msgs), len(expected_bodies))

        for msg, expected_body, expected_message_id in zip(
                servicebus_msgs, expected_bodies, expected_message_ids):
            body_data = msg.get_body().decode('utf-8')
            self.assertEqual(body_data, expected_body)
            self.assertDictEqual(msg.application_properties,
                                 {"application": "value"})
            self._assert_mocked_properties(msg, expected_message_id)

    def test_servicebus_message_out_converter_encode_str(self):
        data = "dummy_string"

        result = azf_sb.ServiceBusMessageOutConverter.encode(
            obj=data, expected_type=None)

        self.assertEqual(result.type, "string")
        self.assertEqual(result.python_value, data)

    def test_servicebus_message_out_converter_encode_bytes(self):
        data = b"dummy_bytes"

        result = azf_sb.ServiceBusMessageOutConverter.encode(
            obj=data, expected_type=None)

        self.assertEqual(result.type, "bytes")
        self.assertEqual(result.python_value, data)

    def _assert_mocked_properties(self, msg, expected_message_id: str):
        """Assert that ``msg`` exposes the ``MOCKED_*`` property values.

        Shared by the single- and multi-message property tests.
        ``application_properties`` and the body are checked by the callers.

        :param msg: a decoded service bus message.
        :param expected_message_id: the ``message_id`` this message
            should carry.
        """
        self.assertEqual(msg.content_type, self.MOCKED_CONTENT_TYPE)
        self.assertEqual(msg.correlation_id, self.MOCKED_CORROLATION_ID)
        self.assertEqual(msg.dead_letter_error_description,
                         self.MOCKED_DEADLETTER_ERROR_DESCRIPTION)
        self.assertEqual(msg.dead_letter_reason,
                         self.MOCKED_DEADLETTER_REASON)
        self.assertEqual(msg.dead_letter_source,
                         self.MOCKED_DEADLETTER_SOURCE)
        self.assertEqual(msg.enqueued_sequence_number,
                         self.MOCKED_ENQUEUED_SEQUENCE_NUMBER)
        self.assertEqual(msg.enqueued_time_utc,
                         self.MOCKED_ENQUEUE_TIME_UTC)
        self.assertEqual(msg.expires_at_utc, self.MOCKED_EXPIRY_AT_UTC)
        self.assertEqual(msg.expiration_time, self.MOCKED_EXPIRY_AT_UTC)
        self.assertEqual(msg.label, self.MOCKED_LABEL)
        self.assertEqual(msg.locked_until, self.MOCKED_LOCKED_UNTIL)
        self.assertEqual(msg.lock_token, self.MOCKED_LOCK_TOKEN)
        self.assertEqual(msg.message_id, expected_message_id)
        self.assertEqual(msg.partition_key, self.MOCKED_PARTITION_KEY)
        self.assertEqual(msg.reply_to, self.MOCKED_REPLY_TO)
        self.assertEqual(msg.reply_to_session_id,
                         self.MOCKED_REPLY_TO_SESSION_ID)
        self.assertEqual(msg.scheduled_enqueue_time,
                         self.MOCKED_SCHEDULED_ENQUEUE_TIME_UTC)
        self.assertEqual(msg.scheduled_enqueue_time_utc,
                         self.MOCKED_SCHEDULED_ENQUEUE_TIME_UTC)
        self.assertEqual(msg.session_id, self.MOCKED_SESSION_ID)
        self.assertEqual(msg.state, self.MOCKED_STATE)
        self.assertEqual(msg.subject, self.MOCKED_SUBJECT)
        self.assertEqual(msg.time_to_live,
                         self.MOCKED_TIME_TO_LIVE_TIMEDELTA)
        self.assertEqual(msg.to, self.MOCKED_TO)
        self.assertEqual(msg.transaction_partition_key,
                         self.MOCKED_TRANSACTION_PARTITION_KEY)
        self.assertDictEqual(msg.user_properties, {
            '$AzureWebJobsParentId': self.MOCKED_AZURE_PARTNER_ID,
            'x-opt-enqueue-sequence-number': 0
        })

    def _generate_single_servicebus_data(self) -> meta.Datum:
        """Return a JSON datum holding one message body."""
        return meta.Datum(value=json.dumps({
            'lucky_number': 23
        }), type='json')

    def _generate_multiple_service_bus_data(self) -> meta.Datum:
        """Return a JSON datum holding an array of three message bodies."""
        return meta.Datum(value=json.dumps([
            {'lucky_number': 23},
            {'lucky_number': 34},
            {'lucky_number': 45}
        ]), type='json')

    def _generate_single_trigger_metadata(self) -> Dict[str, meta.Datum]:
        """Generate a single ServiceBus message following
        https://docs.microsoft.com/en-us/azure/service-bus-messaging/
        service-bus-messages-payloads
        """
        mocked_metadata: Dict[str, meta.Datum] = {
            'ContentType': meta.Datum(
                self.MOCKED_CONTENT_TYPE, 'string'
            ),
            'CorrelationId': meta.Datum(
                self.MOCKED_CORROLATION_ID, 'string'
            ),
            'DeadLetterErrorDescription': meta.Datum(
                self.MOCKED_DEADLETTER_ERROR_DESCRIPTION, 'string'
            ),
            'DeadLetterReason': meta.Datum(
                self.MOCKED_DEADLETTER_REASON, 'string'
            ),
            'DeadLetterSource': meta.Datum(
                self.MOCKED_DEADLETTER_SOURCE, 'string'
            ),
            'DeliveryCount': meta.Datum(
                self.MOCKED_DELIVERY_COUNT, 'int'
            ),
            'EnqueuedSequenceNumber': meta.Datum(
                self.MOCKED_ENQUEUED_SEQUENCE_NUMBER, 'int'
            ),
            'EnqueuedTimeUtc': meta.Datum(
                self.MOCKED_ENQUEUE_TIME_UTC.isoformat(), 'string'
            ),
            'ExpiresAtUtc': meta.Datum(
                self.MOCKED_EXPIRY_AT_UTC.isoformat(), 'string'
            ),
            # 'ForcePersistence' not exposed yet, requires gRPC boolean passing
            'Label': meta.Datum(
                self.MOCKED_LABEL, 'string'
            ),
            'LockedUntil': meta.Datum(
                self.MOCKED_LOCKED_UNTIL.isoformat(), 'string'
            ),
            'LockToken': meta.Datum(
                self.MOCKED_LOCK_TOKEN, 'string'
            ),
            'MessageId': meta.Datum(
                self.MOCKED_MESSAGE_ID, 'string'
            ),
            'PartitionKey': meta.Datum(
                self.MOCKED_PARTITION_KEY, 'string'
            ),
            'ReplyTo': meta.Datum(
                self.MOCKED_REPLY_TO, 'string'
            ),
            'ReplyToSessionId': meta.Datum(
                self.MOCKED_REPLY_TO_SESSION_ID, 'string'
            ),
            'ScheduledEnqueueTimeUtc': meta.Datum(
                self.MOCKED_SCHEDULED_ENQUEUE_TIME_UTC.isoformat(), 'string'
            ),
            'SequenceNumber': meta.Datum(
                self.MOCKED_SEQUENCE_NUMBER, 'int'
            ),
            'SessionId': meta.Datum(
                self.MOCKED_SESSION_ID, 'string'
            ),
            'State': meta.Datum(
                self.MOCKED_STATE, 'int'
            ),
            'Subject': meta.Datum(
                self.MOCKED_SUBJECT, 'string'
            ),
            'TimeToLive': meta.Datum(
                self.MOCKED_TIME_TO_LIVE, 'string'
            ),
            'To': meta.Datum(
                self.MOCKED_TO, 'string'
            ),
            'TransactionPartitionKey': meta.Datum(
                self.MOCKED_TRANSACTION_PARTITION_KEY, 'string'
            )
        }
        mocked_metadata['MessageReceiver'] = meta.Datum(type='json', value='''
        {
            "RegisteredPlugins": [],
            "ReceiveMode": 0,
            "PrefetchCount": 0,
            "LastPeekedSequenceNumber": 0,
            "Path": "testqueue",
            "OperationTimeout": "00:01:00",
            "ServiceBusConnection": {
                "Endpoint": "sb://python-worker-36-sbns.servicebus.win.net",
                "OperationTimeout": "00:01:00",
                "RetryPolicy": {
                    "MinimalBackoff": "00:00:00",
                    "MaximumBackoff": "00:00:30",
                    "DeltaBackoff": "00:00:03",
                    "MaxRetryCount": 5,
                    "IsServerBusy": false,
                    "ServerBusyExceptionMessage": null
                },
                "TransportType": 0,
                "TokenProvider": {}
            },
            "IsClosedOrClosing": false,
            "ClientId": "MessageReceiver1testqueue",
            "RetryPolicy": {
                "MinimalBackoff": "00:00:00",
                "MaximumBackoff": "00:00:30",
                "DeltaBackoff": "00:00:03",
                "MaxRetryCount": 5,
                "IsServerBusy": false,
                "ServerBusyExceptionMessage": null
            }
        }''')
        mocked_metadata['ApplicationProperties'] = (
            meta.Datum(type='json', value='''
        {
            "application": "value"
        }
        '''))
        mocked_metadata['UserProperties'] = meta.Datum(type='json', value='''
        {
            "$AzureWebJobsParentId": "6ceef68b-0794-45dd-bb2e-630748515552",
            "x-opt-enqueue-sequence-number": 0
        }''')
        mocked_metadata['sys'] = meta.Datum(type='json', value='''
        {
            "MethodName": "ServiceBusSMany",
            "UtcNow": "2020-06-18T05:39:12.2860411Z",
            "RandGuid": "bb38deae-cc75-49f2-89f5-96ec6eb857db"
        }
        ''')

        return mocked_metadata

    def _generate_multiple_trigger_metadata(self) -> Dict[str, meta.Datum]:
        """Generate a metadatum containing 3 service bus messages which can
        be distinguished by their MessageId.

        Each single-message field ``<Key>`` is combined across the three
        messages into one ``<Key>Array`` datum of the listed type.
        """
        sb_a = self._generate_single_trigger_metadata()
        sb_b = self._generate_single_trigger_metadata()
        sb_c = self._generate_single_trigger_metadata()

        sb_a['MessageId'] = meta.Datum(self.MOCKED_MESSAGE_ID_A, 'string')
        sb_b['MessageId'] = meta.Datum(self.MOCKED_MESSAGE_ID_B, 'string')
        sb_c['MessageId'] = meta.Datum(self.MOCKED_MESSAGE_ID_C, 'string')

        # (single-message key, type of the combined "<key>Array" datum)
        combined_fields = (
            ('ApplicationProperties', 'json'),
            ('ContentType', 'collection_string'),
            ('CorrelationId', 'collection_string'),
            ('DeadLetterErrorDescription', 'collection_string'),
            ('DeadLetterReason', 'collection_string'),
            ('DeadLetterSource', 'collection_string'),
            ('EnqueuedSequenceNumber', 'collection_sint64'),
            ('EnqueuedTimeUtc', 'json'),
            ('ExpiresAtUtc', 'json'),
            ('Label', 'collection_string'),
            ('LockedUntil', 'json'),
            ('LockToken', 'collection_string'),
            ('MessageId', 'collection_string'),
            ('PartitionKey', 'collection_string'),
            ('ReplyTo', 'collection_string'),
            ('ReplyToSessionId', 'collection_string'),
            ('ScheduledEnqueueTimeUtc', 'collection_string'),
            ('SessionId', 'collection_string'),
            ('SequenceNumber', 'collection_sint64'),
            ('State', 'collection_sint64'),
            ('Subject', 'collection_string'),
            ('TimeToLive', 'collection_string'),
            ('To', 'collection_string'),
            ('TransactionPartitionKey', 'collection_string'),
            ('UserProperties', 'json'),
        )

        mocked_metadata = {
            f'{key}Array': self._zip(key, expected_type, sb_a, sb_b, sb_c)
            for key, expected_type in combined_fields
        }

        return mocked_metadata

    def _zip(self, key: str, expected_type: str,
             *args: Dict[str, meta.Datum]) -> meta.Datum:
        """Combining multiple metadata into one:
        string -> collection_string
        bytes -> collection_bytes
        int -> collection_sint64
        sint64 -> collection_sint64
        json -> json (with array in it)

        :param key: the metadata key to read from every dict in ``args``.
        :param expected_type: the datum type of the combined result.
        :param args: one single-message metadata dict per message.
        :raises NotImplementedError: if ``expected_type`` is neither a
            supported collection type nor ``'json'``.
        """
        convertible = {
            'collection_string': CollectionString,
            'collection_bytes': CollectionBytes,
            'collection_sint64': CollectionSint64
        }

        datum_type = args[0][key].type

        if expected_type in convertible:
            collection_cls = convertible[expected_type]
            return meta.Datum(
                value=collection_cls([d[key].value for d in args]),
                type=expected_type
            )

        if expected_type == 'json':
            if datum_type == 'json':
                # Source values are JSON text: parse each one so the
                # result is an array of objects, not of strings.
                value = json.dumps([json.loads(d[key].value) for d in args])
            else:
                value = json.dumps([d[key].value for d in args])
            return meta.Datum(value=value, type='json')

        raise NotImplementedError(f'Unknown convertion {key}: '
                                  f'{datum_type} -> {expected_type}')