unit tests for wait or create check response
includes bug fixes found from tests
This commit is contained in:
Родитель
c04154c294
Коммит
03ba1605c2
|
@ -380,13 +380,12 @@ class DurableOrchestrationClient:
|
|||
The timeout between checks for output from the durable function.
|
||||
The default value is 1 second.
|
||||
"""
|
||||
|
||||
if retry_interval_in_milliseconds > timeout_in_milliseconds:
|
||||
raise Exception(f'Total timeout {timeout_in_milliseconds} (ms) should be bigger than '
|
||||
f'retry timeout {retry_interval_in_milliseconds} (ms)')
|
||||
|
||||
checking = True
|
||||
start_time = time.time_ns()
|
||||
start_time = time.time()
|
||||
|
||||
while checking:
|
||||
status = await self.get_status(instance_id)
|
||||
|
@ -394,36 +393,37 @@ class DurableOrchestrationClient:
|
|||
if status:
|
||||
switch_statement = {
|
||||
OrchestrationRuntimeStatus.Completed:
|
||||
self._create_http_response(200, status.output),
|
||||
lambda: self._create_http_response(200, status.output),
|
||||
OrchestrationRuntimeStatus.Canceled:
|
||||
self._create_http_response(200, status.to_json()),
|
||||
lambda: self._create_http_response(200, status.to_json()),
|
||||
OrchestrationRuntimeStatus.Terminated:
|
||||
self._create_http_response(200, status.to_json()),
|
||||
lambda: self._create_http_response(200, status.to_json()),
|
||||
OrchestrationRuntimeStatus.Failed:
|
||||
self._create_http_response(500, status.to_json()),
|
||||
lambda: self._create_http_response(500, status.to_json()),
|
||||
}
|
||||
|
||||
result = switch_statement.get(status.runtime_status)
|
||||
result = switch_statement.get(OrchestrationRuntimeStatus(status.runtime_status))
|
||||
if result:
|
||||
return result
|
||||
return result()
|
||||
|
||||
elapsed = time.time_ns() - start_time
|
||||
elapsed = time.time() - start_time
|
||||
elapsed_in_milliseconds = elapsed * 1000
|
||||
if elapsed_in_milliseconds < timeout_in_milliseconds:
|
||||
remaining_time = timeout_in_milliseconds - elapsed_in_milliseconds
|
||||
sleep_time = retry_interval_in_milliseconds \
|
||||
if remaining_time > retry_interval_in_milliseconds else remaining_time
|
||||
sleep_time /= 1000
|
||||
await time.sleep(sleep_time)
|
||||
time.sleep(sleep_time)
|
||||
else:
|
||||
return self.create_check_status_response(request, instance_id)
|
||||
|
||||
@staticmethod
|
||||
def _create_http_response(status_code: int, body: Any) -> func.HttpResponse:
|
||||
body_as_json = json.dumps(body)
|
||||
body_as_json = body if isinstance(body, str) else json.dumps(body)
|
||||
response_args = {
|
||||
"status": status_code,
|
||||
"status_code": status_code,
|
||||
"body": body_as_json,
|
||||
"mimetype": "application/json",
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ from typing import Any
|
|||
|
||||
class PurgeHistoryResult:
|
||||
"""Information provided when the request to purge history has been made."""
|
||||
|
||||
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
|
||||
def __init__(self, instancesDeleted: int, **kwargs):
|
||||
self._instances_deleted: int = instancesDeleted
|
||||
|
|
|
@ -6,6 +6,7 @@ import pytest
|
|||
from azure.durable_functions.models.OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
|
||||
from azure.durable_functions.models.DurableOrchestrationClient \
|
||||
import DurableOrchestrationClient
|
||||
from azure.durable_functions.models.DurableOrchestrationStatus import DurableOrchestrationStatus
|
||||
from tests.conftest import replace_stand_in_bits
|
||||
from tests.test_utils.constants import RPC_BASE_URL
|
||||
from unittest.mock import Mock
|
||||
|
@ -23,8 +24,14 @@ class MockRequest:
|
|||
def __init__(self, expected_url: str, response: [int, any]):
|
||||
self._expected_url = expected_url
|
||||
self._response = response
|
||||
self._get_count = 0
|
||||
|
||||
@property
|
||||
def get_count(self):
|
||||
return self._get_count
|
||||
|
||||
async def get(self, url: str):
|
||||
self._get_count += 1
|
||||
assert url == self._expected_url
|
||||
return self._response
|
||||
|
||||
|
@ -382,3 +389,111 @@ async def test_post_500_terminate(binding_string):
|
|||
|
||||
with pytest.raises(Exception):
|
||||
await client.terminate(TEST_INSTANCE_ID, raw_reason)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_200_completed(binding_string):
|
||||
output = 'Some output'
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Completed",
|
||||
output=output)])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
result = await client.wait_for_completion_or_create_check_status_response(
|
||||
None, TEST_INSTANCE_ID)
|
||||
assert result is not None
|
||||
assert result.status_code == 200
|
||||
assert result.mimetype == 'application/json'
|
||||
assert result.get_body().decode() == output
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_200_canceled(binding_string):
|
||||
status = dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Canceled")
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, status])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
result = await client.wait_for_completion_or_create_check_status_response(
|
||||
None, TEST_INSTANCE_ID)
|
||||
assert result is not None
|
||||
assert result.status_code == 200
|
||||
assert result.mimetype == 'application/json'
|
||||
assert json.loads(result.get_body().decode()) == DurableOrchestrationStatus.from_json(
|
||||
status).to_json()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_200_terminated(binding_string):
|
||||
status = dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Terminated")
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, status])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
result = await client.wait_for_completion_or_create_check_status_response(
|
||||
None, TEST_INSTANCE_ID)
|
||||
assert result is not None
|
||||
assert result.status_code == 200
|
||||
assert result.mimetype == 'application/json'
|
||||
assert json.loads(result.get_body().decode()) == DurableOrchestrationStatus.from_json(
|
||||
status).to_json()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_200_failed(binding_string):
|
||||
status = dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Failed")
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, status])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
result = await client.wait_for_completion_or_create_check_status_response(
|
||||
None, TEST_INSTANCE_ID)
|
||||
assert result is not None
|
||||
assert result.status_code == 500
|
||||
assert result.mimetype == 'application/json'
|
||||
assert json.loads(result.get_body().decode()) == DurableOrchestrationStatus.from_json(
|
||||
status).to_json()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_check_status_response(binding_string):
|
||||
status = dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Running")
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, status])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
request = Mock(url="http://test_azure.net/api/orchestrators/DurableOrchestrationTrigger")
|
||||
result = await client.wait_for_completion_or_create_check_status_response(
|
||||
request, TEST_INSTANCE_ID, timeout_in_milliseconds=2000)
|
||||
assert result is not None
|
||||
assert mock_request.get_count == 3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_or_response_check_status_response(binding_string):
|
||||
status = dict(createdTime=TEST_CREATED_TIME,
|
||||
lastUpdatedTime=TEST_LAST_UPDATED_TIME,
|
||||
runtimeStatus="Running")
|
||||
mock_request = MockRequest(expected_url=f"{RPC_BASE_URL}instances/{TEST_INSTANCE_ID}",
|
||||
response=[200, status])
|
||||
client = DurableOrchestrationClient(binding_string)
|
||||
client._get_async_request = mock_request.get
|
||||
|
||||
with pytest.raises(Exception):
|
||||
await client.wait_for_completion_or_create_check_status_response(
|
||||
None, TEST_INSTANCE_ID, timeout_in_milliseconds=500)
|
||||
|
|
Загрузка…
Ссылка в новой задаче