173 строки
6.6 KiB
Python
173 строки
6.6 KiB
Python
# Copyright (c) Microsoft Corporation. All rights reserved.
|
|
# Licensed under the MIT License.
|
|
import time
|
|
import unittest
|
|
import uuid
|
|
|
|
import requests
|
|
|
|
from tests.utils import testutils
|
|
|
|
|
|
class TestEventGridFunctions(testutils.WebHostTestCase):
|
|
|
|
@classmethod
|
|
def get_script_dir(cls):
|
|
return testutils.E2E_TESTS_FOLDER / 'eventgrid_functions'
|
|
|
|
def eventgrid_webhook_request(self, meth, funcname, *args, **kwargs):
|
|
request_method = getattr(requests, meth.lower())
|
|
url = f'{self.webhost._addr}/runtime/webhooks/eventgrid'
|
|
params = dict(kwargs.pop('params', {}))
|
|
params['functionName'] = funcname
|
|
if 'code' not in params:
|
|
params['code'] = 'testSystemKey'
|
|
headers = dict(kwargs.pop('headers', {}))
|
|
headers['aeg-event-type'] = 'Notification'
|
|
return request_method(url, *args, params=params, headers=headers,
|
|
**kwargs)
|
|
|
|
@unittest.skip("Run locally. Running on Azure fails with 401/403 as the"
|
|
"host does not pick up the SecretKey from the"
|
|
"azure_functions_worker.testutils.py.SECRETS_TEMPLATE and"
|
|
"because of which we cannot test eventGrid webhook"
|
|
"invocation correctly.")
|
|
def test_eventgrid_trigger(self):
|
|
"""test event_grid trigger
|
|
|
|
This test calls the eventgrid_trigger function, sends in `data` as body
|
|
to the webhook for eventgrid. Once the event is received, the function
|
|
writes the data to the blob store.
|
|
|
|
Then get_eventgrid_triggered gets called (httpTrigger) and takes blob
|
|
input binding, reading the previously written text in blob store
|
|
`python-worker-tests/test-eventgrid-triggered.txt`, and then we validate
|
|
that the written text matches the one passed to the eventgrid trigger.
|
|
"""
|
|
data = [{
|
|
"topic": "test-topic",
|
|
"subject": "test-subject",
|
|
"eventType": "Microsoft.Storage.BlobCreated",
|
|
"eventTime": "2018-01-01T00:00:00.000000123Z",
|
|
"id": str(uuid.uuid4()),
|
|
"data": {
|
|
"api": "PutBlockList",
|
|
"clientRequestId": "2c169f2f-7b3b-4d99-839b-c92a2d25801b",
|
|
"requestId": "44d4f022-001e-003c-466b-940cba000000",
|
|
"eTag": "0x8D562831044DDD0",
|
|
"contentType": "application/octet-stream",
|
|
"contentLength": 2248,
|
|
"blobType": "BlockBlob",
|
|
"ur1": "foo",
|
|
"sequencer": "000000000000272D000000000003D60F",
|
|
"storageDiagnostics": {
|
|
"batchId": "b4229b3a-4d50-4ff4-a9f2-039ccf26efe9"
|
|
}
|
|
},
|
|
"dataVersion": "",
|
|
"metadataVersion": "1"
|
|
}]
|
|
|
|
r = self.eventgrid_webhook_request('POST', 'eventgrid_trigger',
|
|
json=data)
|
|
self.assertEqual(r.status_code, 202)
|
|
|
|
max_retries = 10
|
|
|
|
for try_no in range(max_retries):
|
|
# Allow trigger to fire.
|
|
time.sleep(2)
|
|
|
|
try:
|
|
# Check that the trigger has fired.
|
|
r = self.webhost.request('GET', 'get_eventgrid_triggered')
|
|
self.assertEqual(r.status_code, 200)
|
|
|
|
response = r.json()
|
|
self.assertLessEqual(response.items(), data[0].items())
|
|
except AssertionError:
|
|
if try_no == max_retries - 1:
|
|
raise
|
|
else:
|
|
break
|
|
|
|
def test_eventgrid_output_binding(self):
|
|
"""test event_grid output binding
|
|
|
|
This test needs three functions to work.
|
|
1. `eventgrid_output_binding`
|
|
2. `eventgrid_output_binding_message_to_blobstore`
|
|
3. `eventgrid_output_binding_success`
|
|
|
|
This test calls the eventgrid_output_binding function, sends in a unique
|
|
uuid as `data` in the body to the httpTrigger which sends in that value
|
|
in the eventGrid output data. The eventGrid topic is configured to
|
|
send the event to a storage queue.
|
|
|
|
The second function (`eventgrid_output_binding_message_to_blobstore`)
|
|
reads from that storage queue and puts into a blob store.
|
|
|
|
The third function (`eventgrid_output_binding_success`) reads the
|
|
text from the blob store and compares with the expected result. The
|
|
unique uuid should confirm if the message went through correctly to
|
|
EventGrid and came back as a blob.
|
|
"""
|
|
|
|
test_uuid = uuid.uuid4().__str__()
|
|
expected_response = "Sent event with subject: {}, id: {}, data: {}, " \
|
|
"event_type: {} to EventGrid!".format(
|
|
"test-subject", "test-id",
|
|
f"{{'test_uuid': '{test_uuid}'}}",
|
|
"test-event-1")
|
|
expected_final_data = {
|
|
'id': 'test-id', 'subject': 'test-subject', 'dataVersion': '1.0',
|
|
'eventType': 'test-event-1',
|
|
'data': {'test_uuid': test_uuid}
|
|
}
|
|
|
|
r = self.webhost.request('GET', 'eventgrid_output_binding',
|
|
params={'test_uuid': test_uuid})
|
|
self.assertEqual(r.status_code, 200)
|
|
response = r.text
|
|
|
|
self.assertEqual(expected_response, response)
|
|
|
|
max_retries = 10
|
|
for try_no in range(max_retries):
|
|
# Allow trigger to fire.
|
|
time.sleep(2)
|
|
|
|
try:
|
|
# Check that the trigger has fired.
|
|
r = self.webhost.request('GET',
|
|
'eventgrid_output_binding_success')
|
|
self.assertEqual(r.status_code, 200)
|
|
response = r.json()
|
|
|
|
# list of fields to check are limited as other fields contain
|
|
# datetime or other uncertain values
|
|
for f in ['data', 'id', 'eventType', 'subject', 'dataVersion']:
|
|
self.assertEqual(response[f], expected_final_data[f])
|
|
|
|
except AssertionError:
|
|
if try_no == max_retries - 1:
|
|
raise
|
|
else:
|
|
break
|
|
|
|
|
|
class TestEventGridFunctionsStein(TestEventGridFunctions):
|
|
|
|
@classmethod
|
|
def get_script_dir(cls):
|
|
return testutils.E2E_TESTS_FOLDER / 'eventgrid_functions' / \
|
|
'eventgrid_functions_stein'
|
|
|
|
|
|
class TestEventGridFunctionsGeneric(TestEventGridFunctions):
|
|
|
|
@classmethod
|
|
def get_script_dir(cls):
|
|
return testutils.E2E_TESTS_FOLDER / 'eventgrid_functions' / \
|
|
'eventgrid_functions_stein' / 'generic'
|