Serialize input for CallActivityWithRetryAction (#225)
Co-authored-by: cvitzthum-nasuni <cvitzthum@nasuni.com>
This commit is contained in:
Родитель
59e77089d4
Коммит
af482d2fc1
|
@ -1,9 +1,11 @@
|
|||
from json import dumps
|
||||
from typing import Dict, Union
|
||||
|
||||
from .Action import Action
|
||||
from .ActionType import ActionType
|
||||
from ..RetryOptions import RetryOptions
|
||||
from ..utils.json_utils import add_attrib, add_json_attrib
|
||||
from azure.functions._durable_functions import _serialize_custom_object
|
||||
|
||||
|
||||
class CallActivityWithRetryAction(Action):
|
||||
|
@ -16,7 +18,7 @@ class CallActivityWithRetryAction(Action):
|
|||
retry_options: RetryOptions, input_=None):
|
||||
self.function_name: str = function_name
|
||||
self.retry_options: RetryOptions = retry_options
|
||||
self.input_ = input_
|
||||
self.input_ = dumps(input_, default=_serialize_custom_object)
|
||||
|
||||
if not self.function_name:
|
||||
raise ValueError("function_name cannot be empty")
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from tests.test_utils.ContextBuilder import ContextBuilder
|
||||
from tests.test_utils.testClasses import SerializableClass
|
||||
from azure.durable_functions.models.RetryOptions import RetryOptions
|
||||
from azure.durable_functions.models.OrchestratorState import OrchestratorState
|
||||
from azure.durable_functions.models.DurableOrchestrationContext import DurableOrchestrationContext
|
||||
|
@ -42,6 +43,38 @@ def generator_function(context: DurableOrchestrationContext):
|
|||
|
||||
return outputs
|
||||
|
||||
|
||||
def generator_function_with_serialization(context: DurableOrchestrationContext):
|
||||
"""Orchestrator function for testing retry'ing with serializable input arguments.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
context: DurableOrchestrationContext
|
||||
Durable orchestration context, exposes the Durable API
|
||||
|
||||
Returns
|
||||
-------
|
||||
List[str]:
|
||||
Output of activities, a list of hello'd cities
|
||||
"""
|
||||
|
||||
outputs = []
|
||||
|
||||
retry_options = RETRY_OPTIONS
|
||||
task1 = yield context.call_activity_with_retry(
|
||||
"Hello", retry_options, SerializableClass("Tokyo"))
|
||||
task2 = yield context.call_activity_with_retry(
|
||||
"Hello", retry_options, SerializableClass("Seatlle"))
|
||||
task3 = yield context.call_activity_with_retry(
|
||||
"Hello", retry_options, SerializableClass("London"))
|
||||
|
||||
outputs.append(task1)
|
||||
outputs.append(task2)
|
||||
outputs.append(task3)
|
||||
|
||||
return outputs
|
||||
|
||||
|
||||
def get_context_with_retries_and_corrupted_completion() -> ContextBuilder:
|
||||
"""Get a ContextBuilder whose history contains a late completion event
|
||||
for an event that already failed.
|
||||
|
@ -268,3 +301,17 @@ def test_retries_can_fail():
|
|||
|
||||
expected_error_str = f"{error_msg}{error_label}"
|
||||
assert str.startswith(error_str, expected_error_str)
|
||||
|
||||
def test_retries_with_serializable_input():
|
||||
"""Tests that retried tasks work with serialized input classes."""
|
||||
context = get_context_with_retries()
|
||||
|
||||
result_1 = get_orchestration_state_result(
|
||||
context, generator_function)
|
||||
|
||||
result_2 = get_orchestration_state_result(
|
||||
context, generator_function_with_serialization)
|
||||
|
||||
assert "output" in result_1
|
||||
assert "output" in result_2
|
||||
assert result_1["output"] == result_2["output"]
|
||||
|
|
Загрузка…
Ссылка в новой задаче