Ensure the new eventhub parser backward compatible with the old eventhub extensions
This commit is contained in:
Родитель
10689c0d02
Коммит
c30b8beaa8
|
@ -105,12 +105,12 @@ class EventHubTriggerConverter(EventHubConverter,
|
|||
typing.List[_eventhub.EventHubEvent]]:
|
||||
data_type = data.type
|
||||
|
||||
if cls._is_cardinality_one(trigger_metadata):
|
||||
return cls.decode_single_event(data, trigger_metadata)
|
||||
|
||||
elif cls._is_cardinality_many(trigger_metadata):
|
||||
if cls._is_cardinality_many(trigger_metadata, data_type):
|
||||
return cls.decode_multiple_events(data, trigger_metadata)
|
||||
|
||||
elif cls._is_cardinality_one(trigger_metadata, data_type):
|
||||
return cls.decode_single_event(data, trigger_metadata)
|
||||
elif data_type == 'json': # decode json as single event by default
|
||||
return cls.decode_single_event(data, trigger_metadata)
|
||||
else:
|
||||
raise NotImplementedError(
|
||||
f'unsupported event data payload type: {data_type}')
|
||||
|
@ -154,9 +154,11 @@ class EventHubTriggerConverter(EventHubConverter,
|
|||
elif data.type == 'json':
|
||||
parsed_data = json.loads(data.value)
|
||||
|
||||
sys_props = trigger_metadata.get('SystemPropertiesArray')
|
||||
|
||||
parsed_sys_props = json.loads(sys_props.value)
|
||||
parsed_sys_props = {}
|
||||
if trigger_metadata.get('SystemPropertiesArray'):
|
||||
parsed_sys_props = (
|
||||
json.loads(trigger_metadata['SystemPropertiesArray'].value)
|
||||
)
|
||||
|
||||
if len(parsed_data) != len(parsed_sys_props):
|
||||
raise AssertionError('Number of bodies and metadata mismatched')
|
||||
|
@ -245,9 +247,17 @@ class EventHubTriggerConverter(EventHubConverter,
|
|||
return iothub_metadata
|
||||
|
||||
@classmethod
|
||||
def _is_cardinality_many(cls, trigger_metadata) -> bool:
|
||||
return 'SystemPropertiesArray' in trigger_metadata
|
||||
def _is_cardinality_many(cls, trigger_metadata, data_type) -> bool:
|
||||
# If the data_type is json and the json contains multiple objects
|
||||
# The SystemPropertiesArray should be set and it gets here
|
||||
return ('SystemPropertiesArray' in trigger_metadata
|
||||
or data_type == 'collection_string'
|
||||
or data_type == 'collection_bytes')
|
||||
|
||||
@classmethod
|
||||
def _is_cardinality_one(cls, trigger_metadata) -> bool:
|
||||
return 'SystemProperties' in trigger_metadata
|
||||
def _is_cardinality_one(cls, trigger_metadata, data_type) -> bool:
|
||||
# If the data_type is json and the json contains a single object
|
||||
# The SystemProperties should be set and it gets here
|
||||
return ('SystemProperties' in trigger_metadata
|
||||
or data_type == 'string'
|
||||
or data_type == 'bytes')
|
||||
|
|
|
@ -177,6 +177,41 @@ class TestEventHub(unittest.TestCase):
|
|||
result[1].iothub_metadata['connection-device-id'], 'MyTestDevice2'
|
||||
)
|
||||
|
||||
def test_bytes_without_system_properties(self):
|
||||
result = azf_eh.EventHubTriggerConverter.decode(
|
||||
data=self._generate_single_iothub_datum('bytes'),
|
||||
trigger_metadata={}
|
||||
)
|
||||
self.assertEqual(
|
||||
result.get_body().decode('utf-8'), '{"device-status": "good"}'
|
||||
)
|
||||
|
||||
def test_string_without_system_properties(self):
|
||||
result = azf_eh.EventHubTriggerConverter.decode(
|
||||
data=self._generate_single_iothub_datum('string'),
|
||||
trigger_metadata={}
|
||||
)
|
||||
self.assertEqual(
|
||||
result.get_body().decode('utf-8'), '{"device-status": "good"}'
|
||||
)
|
||||
|
||||
def test_json_without_system_properties(self):
|
||||
result = azf_eh.EventHubTriggerConverter.decode(
|
||||
data=self._generate_single_iothub_datum('json'),
|
||||
trigger_metadata={}
|
||||
)
|
||||
self.assertEqual(
|
||||
result.get_body().decode('utf-8'), '{"device-status": "good"}'
|
||||
)
|
||||
|
||||
def test_mismatch_number_of_bodies_in_metadata(self):
|
||||
# 2 events with no metadata
|
||||
with self.assertRaises(AssertionError):
|
||||
azf_eh.EventHubTriggerConverter.decode(
|
||||
data=self._generate_multiple_iothub_data('collection_string'),
|
||||
trigger_metadata={}
|
||||
)
|
||||
|
||||
def _generate_single_iothub_datum(self, datum_type='json'):
|
||||
datum = '{"device-status": "good"}'
|
||||
if datum_type == 'bytes':
|
||||
|
|
Загрузка…
Ссылка в новой задаче