added E2E pystein and generic binding tests. (#1045)

This commit is contained in:
pdthummar 2022-07-05 10:34:18 -05:00 коммит произвёл GitHub
Родитель c7a52362c0
Коммит ef37809029
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 101 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,33 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import uuid
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="table_in_binding")
@app.route(route="table_in_binding/{id}")
@app.read_table(arg_name="testEntity",
connection="AzureWebJobsStorage",
table_name="BindingTestTable",
row_key='{id}',
partition_key="test")
def table_in_binding(req: func.HttpRequest, testEntity):
headers_dict = json.loads(testEntity)
return func.HttpResponse(status_code=200, headers=headers_dict)
@app.function_name(name="table_out_binding")
@app.route(route="table_out_binding", binding_arg_name="resp")
@app.write_table(arg_name="$return",
connection="AzureWebJobsStorage",
table_name="BindingTestTable")
def table_out_binding(req: func.HttpRequest, resp: func.Out[func.HttpResponse]):
row_key_uuid = str(uuid.uuid4())
table_dict = {'PartitionKey': 'test', 'RowKey': row_key_uuid}
table_json = json.dumps(table_dict)
http_resp = func.HttpResponse(status_code=200, headers=table_dict)
resp.set(http_resp)
return table_json

Просмотреть файл

@ -0,0 +1,41 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import uuid
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="table_in_binding")
@app.generic_trigger(arg_name="req", type="httpTrigger",
route="table_in_binding/{id}")
@app.generic_output_binding(arg_name="$return", type="http")
@app.generic_input_binding(
arg_name="testEntity",
type="table",
connection="AzureWebJobsStorage",
table_name="BindingTestTable",
row_key="{id}",
partition_key="test")
def table_in_binding(req: func.HttpRequest, testEntity):
headers_dict = json.loads(testEntity)
return func.HttpResponse(status_code=200, headers=headers_dict)
@app.function_name(name="table_out_binding")
@app.generic_trigger(arg_name="req", type="httpTrigger",
route="table_out_binding")
@app.generic_output_binding(arg_name="resp", type="http")
@app.generic_output_binding(
arg_name="$return",
type="table",
connection="AzureWebJobsStorage",
table_name="BindingTestTable")
def table_out_binding(req: func.HttpRequest, resp: func.Out[func.HttpResponse]):
row_key_uuid = str(uuid.uuid4())
table_dict = {'PartitionKey': 'test', 'RowKey': row_key_uuid}
table_json = json.dumps(table_dict)
http_resp = func.HttpResponse(status_code=200, headers=table_dict)
resp.set(http_resp)
return table_json

Просмотреть файл

@ -37,3 +37,30 @@ class TestTableFunctions(testutils.WebHostTestCase):
self.assertEqual(in_resp.status_code, 200)
in_row_key = in_resp.headers['rowKey']
self.assertEqual(in_row_key, row_key)
class TestTableFunctionsStein(testutils.WebHostTestCase):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'table_functions' / \
'table_functions_stein'
@testutils.retryable_test(3, 5)
def test_table_bindings(self):
out_resp = self.webhost.request('POST', 'table_out_binding')
self.assertEqual(out_resp.status_code, 200)
row_key = out_resp.headers['rowKey']
in_resp = self.webhost.request('GET', f'table_in_binding/{row_key}')
self.assertEqual(in_resp.status_code, 200)
in_row_key = in_resp.headers['rowKey']
self.assertEqual(in_row_key, row_key)
class TestTableFunctionsGeneric(TestTableFunctionsStein):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'table_functions' / \
'table_functions_stein' / 'generic'