continued implementation of docstrings in library
This commit is contained in:
Shawn Gaul 2020-01-17 14:54:17 -08:00
Родитель 67e3b0394c
Коммит 1b651593d8
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: C1569ABD11FBB1CB
18 изменённых файлов: 278 добавлений и 121 удалений

5
.flake8 Normal file
Просмотреть файл

@ -0,0 +1,5 @@
[flake8]
exclude = .git, __pycache__, build, dist, .eggs, .github, .local, docs/,
samples, .venv*, .env*, .vscode, .idea, investigations
max-complexity = 10
docstring-convention = numpy

Просмотреть файл

@ -1,6 +1,6 @@
"""
Base module for the Python Durable functions, exposing the different API
components intended for public consumption
"""Base module for the Python Durable functions.
Exposes the different API components intended for public consumption
"""
from .orchestrator import Orchestrator
from .models.DurableOrchestrationClient import DurableOrchestrationClient

Просмотреть файл

@ -1,3 +1,3 @@
"""Constants used to determine the local running context."""
_DEFAULT_LOCAL_HOST: str = "localhost:7071"
_DEFAULT_LOCAL_ORIGIN: str = f"http://{_DEFAULT_LOCAL_HOST}"
DEFAULT_LOCAL_HOST: str = "localhost:7071"
DEFAULT_LOCAL_ORIGIN: str = f"http://{DEFAULT_LOCAL_HOST}"

Просмотреть файл

@ -1,7 +1,10 @@
"""Defines the base interface for Actions that need to be executed."""
from ..models.actions import ActionType
class IAction:
"""Defines the base interface for Actions that need to be executed."""
def __init__(self):
"""Create a new Action object."""
actionType: ActionType

Просмотреть файл

@ -1,6 +1,10 @@
"""Interface for the Orchestration object exposed to the generator function."""
from ..models import DurableOrchestrationContext
class IFunctionContext:
"""Orchestration object exposed to the generator function."""
def __init__(self, df=None):
"""Create a new orchestration context."""
self.df: DurableOrchestrationContext = df

Просмотреть файл

@ -1,8 +0,0 @@
from typing import Callable, List
from ..models import (Task, TaskSet)
class ITaskMethods:
def __init__(self):
self.all: Callable[[List[Task]], TaskSet]
self.any: Callable[[List[Task]], TaskSet]

Просмотреть файл

@ -1,9 +1,8 @@
"""Interfaces for durable functions."""
from .IAction import IAction
from .ITaskMethods import ITaskMethods
from .IFunctionContext import IFunctionContext
__all__ = [
'IAction',
'ITaskMethods',
'IFunctionContext'
]

Просмотреть файл

@ -1,29 +1,18 @@
"""Binding information for durable functions."""
import json
from typing import Dict
class DurableOrchestrationBindings:
"""Binding information.
Provides information relevant to the creation and management of
durable functions.
"""
def __init__(self, client_data: str):
"""Create a new binding object."""
context = json.loads(client_data)
self.task_hub_name: str = context.get('taskHubName')
self.creation_urls: Dict[str, str] = context.get('creationUrls')
self.management_urls: Dict[str, str] = context.get('managementUrls')
'''
{
"taskHubName":"DurableFunctionsHub",
"creationUrls":{
"createNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"createAndWaitOnNewInstancePostUri":"http://localhost:7071/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
},
"managementUrls":{
"id":"INSTANCEID",
"statusQueryGetUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"sendEventPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"terminatePostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"rewindPostUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg==",
"purgeHistoryDeleteUri":"http://localhost:7071/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
}
}
'''

Просмотреть файл

@ -1,3 +1,4 @@
"""Durable Orchestration Client class definition."""
import requests
import json
from typing import List
@ -6,46 +7,70 @@ from azure.durable_functions.models import DurableOrchestrationBindings
class DurableOrchestrationClient:
"""Durable Orchestration Client.
Client for starting, querying, terminating and raising events to
orchestration instances.
"""
def __init__(self, context: str):
self.taskHubName: str
self.uniqueWebhookOrigins: List[str]
self._eventNamePlaceholder: str = "{eventName}"
self._functionNamePlaceholder: str = "{functionName}"
self._instanceIdPlaceholder: str = "[/{instanceId}]"
self._reasonPlaceholder: str = "{text}"
self._createdTimeFromQueryKey: str = "createdTimeFrom"
self._createdTimeToQueryKey: str = "createdTimeTo"
self._runtimeStatusQueryKey: str = "runtimeStatus"
self._showHistoryQueryKey: str = "showHistory"
self._showHistoryOutputQueryKey: str = "showHistoryOutput"
self._showInputQueryKey: str = "showInput"
self._orchestrationBindings: DurableOrchestrationBindings = \
"""Create a new Orchestration Client.
:param context: The object representing the orchestrationClient input
binding of the Azure function that will use this client.
"""
self.task_hub_name: str
self._uniqueWebHookOrigins: List[str]
self._event_name_placeholder: str = "{eventName}"
self._function_name_placeholder: str = "{functionName}"
self._instance_id_placeholder: str = "[/{instanceId}]"
self._reason_placeholder: str = "{text}"
self._created_time_from_query_key: str = "createdTimeFrom"
self._created_time_to_query_key: str = "createdTimeTo"
self._runtime_status_query_key: str = "runtimeStatus"
self._show_history_query_key: str = "showHistory"
self._show_history_output_query_key: str = "showHistoryOutput"
self._show_input_query_key: str = "showInput"
self._orchestration_bindings: DurableOrchestrationBindings = \
DurableOrchestrationBindings(context)
def start_new(self,
orchestration_function_name: str,
instance_id: str,
client_input):
request_url = self.get_start_new_url(
"""Start a new instance of the specified orchestrator function.
If an orchestration instance with the specified ID already exists, the
existing instance will be silently replaced by this new instance.
:param orchestration_function_name: The name of the orchestrator
function to start.
:param instance_id: The ID to use for the new orchestration instance.
If no instanceId is specified, the Durable Functions extension will
generate a random GUID (recommended).
:param client_input: JSON-serializable input value for the orchestrator
function.
:return: The ID of the new orchestration instance.
"""
request_url = self._get_start_new_url(
instance_id,
orchestration_function_name)
result = requests.post(request_url, json=self.get_json_input(
result = requests.post(request_url, json=self._get_json_input(
client_input))
return result
@staticmethod
def get_json_input(client_input):
def _get_json_input(client_input):
return json.dumps(client_input) if client_input is not None else None
def get_start_new_url(self, instance_id, orchestration_function_name):
request_url = self._orchestrationBindings.creation_urls[
def _get_start_new_url(self, instance_id, orchestration_function_name):
request_url = self._orchestration_bindings.creation_urls[
'createNewInstancePostUri'
]
request_url = request_url.replace(self._functionNamePlaceholder,
request_url = request_url.replace(self._function_name_placeholder,
orchestration_function_name)
request_url = request_url.replace(self._instanceIdPlaceholder,
request_url = request_url.replace(self._instance_id_placeholder,
f'/{instance_id}'
if instance_id is not None else '')
return request_url

Просмотреть файл

@ -1,5 +1,7 @@
"""Defines the Durable Orchestration Context Class Object."""
import json
import logging
import datetime
from typing import List, Any, Dict
from dateutil.parser import parse as dt_parse
@ -7,21 +9,25 @@ from dateutil.parser import parse as dt_parse
from . import (RetryOptions)
from .history import HistoryEvent, HistoryEventType
from ..interfaces import IAction
from ..interfaces import ITaskMethods
from ..models.Task import Task
from ..tasks import call_activity_task, task_all, call_activity_with_retry_task
class DurableOrchestrationContext:
"""Context of the durable orchestration execution.
Parameter data for orchestration bindings that can be used to schedule
function-based activities.
"""
def __init__(self,
context_string: str):
context: Dict[str, Any] = json.loads(context_string)
logging.warning(f"!!!Calling orchestrator handle {context}")
self.histories: List[HistoryEvent] = context.get("history")
self.instanceId = context.get("instanceId")
self.isReplaying = context.get("isReplaying")
self.parentInstanceId = context.get("parentInstanceId")
self._histories: List[HistoryEvent] = context.get("history")
self._instance_id = context.get("instanceId")
self._is_replaying = context.get("isReplaying")
self._parent_instance_id = context.get("parentInstanceId")
self.call_activity = lambda n, i: call_activity_task(
state=self.histories,
name=n,
@ -36,21 +42,108 @@ class DurableOrchestrationContext:
self.decision_started_event: HistoryEvent = list(filter(
lambda e_: e_["EventType"] == HistoryEventType.OrchestratorStarted,
self.histories))[0]
self.currentUtcDateTime = \
self._current_utc_datetime = \
dt_parse(self.decision_started_event["Timestamp"])
self.newGuidCounter = 0
self.new_guid_counter = 0
self.actions: List[List[IAction]] = []
self.Task: ITaskMethods
def call_activity(name: str, input_=None) -> Task:
raise NotImplementedError("This is a placeholder.")
def call_activity(self, name: str, input_=None) -> Task:
"""Schedule an activity for execution.
def call_activity_with_retry(
name: str, retry_options: RetryOptions, input_=None) -> Task:
raise NotImplementedError("This is a placeholder.")
:param name: The name of the activity function to call.
:param input_:The JSON-serializable input to pass to the activity
function.
:return: A Durable Task that completes when the called activity
function completes or fails.
"""
raise NotImplementedError("This is a placeholder.")
def call_sub_orchestrator(
name: str, input_=None, instance_id: str = None) -> Task:
raise NotImplementedError("This is a placeholder.")
def call_activity_with_retry(self,
name: str, retry_options: RetryOptions,
input_=None) -> Task:
"""Schedule an activity for execution with retry options.
# TODO: more to port over
:param name: The name of the activity function to call.
:param retry_options: The retry options for the activity function.
:param input_: The JSON-serializable input to pass to the activity
function.
:return: A Durable Task that completes when the called activity
function completes or fails completely.
"""
raise NotImplementedError("This is a placeholder.")
def call_sub_orchestrator(self,
name: str, input_=None,
instance_id: str = None) -> Task:
"""Schedule an orchestration function named `name` for execution.
:param name: The name of the orchestrator function to call.
:param input_: The JSON-serializable input to pass to the orchestrator
function.
:param instance_id: A unique ID to use for the sub-orchestration
instance. If `instanceId` is not specified, the extension will generate
an id in the format `<calling orchestrator instance ID>:<#>`
"""
raise NotImplementedError("This is a placeholder.")
@property
def histories(self):
"""Get running history of tasks that have been scheduled."""
return self._histories
@property
def instance_id(self):
"""Get the ID of the current orchestration instance.
The instance ID is generated and fixed when the orchestrator function
is scheduled. It can be either auto-generated, in which case it is
formatted as a GUID, or it can be user-specified with any format.
:return: The ID of the current orchestration instance.
"""
return self._instance_id
@property
def is_replaying(self):
"""Get the value indicating orchestration replaying itself.
This property is useful when there is logic that needs to run only when
the orchestrator function is _not_ replaying. For example, certain
types of application logging may become too noisy when duplicated as
part of orchestrator function replay. The orchestrator code could check
to see whether the function is being replayed and then issue the log
statements when this value is `false`.
:return: value indicating whether the orchestrator function is
currently replaying
"""
return self._is_replaying
@property
def parent_instance_id(self):
"""Get the ID of the parent orchestration.
The parent instance ID is generated and fixed when the parent
orchestrator function is scheduled. It can be either auto-generated, in
which case it is formatted as a GUID, or it can be user-specified with
any format.
:return: ID of the parent orchestration of the current
sub-orchestration instance
"""
return self._parent_instance_id
@property
def current_utc_datetime(self) -> datetime:
"""Get the current date/time.
This date/time value is derived from the orchestration history. It
always returns the same value at specific points in the orchestrator
function code, making it deterministic and safe for replay.
:return: The current date/time in a way that is safe for use by
orchestrator functions
"""
return self._current_utc_datetime
@current_utc_datetime.setter
def current_utc_datetime(self, value: datetime):
self._current_utc_datetime = value

Просмотреть файл

@ -28,7 +28,7 @@ class Orchestrator:
def __init__(self,
activity_func: Callable[[IFunctionContext], Iterator[Any]]):
"""Base constructor for class.
"""Create a new orchestrator for the user defined generator.
Responsible for orchestrating the execution of the user defined
generator function.
@ -39,6 +39,16 @@ class Orchestrator:
# noinspection PyAttributeOutsideInit
def handle(self, context_string: str):
"""Handle the orchestration of the user defined generator function.
Called each time the durable extension executes an activity and needs
the client to handle the result.
:param context_string: the context of what has been executed by
the durable extension.
:return: the resulting orchestration state, with instructions back to
the durable extension.
"""
self.durable_context = DurableOrchestrationContext(context_string)
activity_context = IFunctionContext(df=self.durable_context)
@ -111,14 +121,18 @@ class Orchestrator:
and dt_parse(e_["Timestamp"]) > last_timestamp),
self.durable_context.histories))
if len(decision_started_events) == 0:
self.durable_context.currentUtcDateTime = None
self.durable_context.current_utc_datetime = None
else:
self.durable_context.decision_started_event = \
decision_started_events[0]
self.durable_context.currentUtcDateTime = dt_parse(
self.durable_context.current_utc_datetime = dt_parse(
self.durable_context.decision_started_event['Timestamp'])
@classmethod
def create(cls, fn):
logging.warning("!!!Calling orchestrator create")
"""Create an instance of the orchestration class.
:param fn: Generator function that needs orchestration
:return: Handle function of the newly created orchestration client
"""
return lambda context: Orchestrator(fn).handle(context)

Просмотреть файл

@ -1,3 +1,4 @@
"""Setup for the durable function module."""
import pathlib
import os
import shutil
@ -5,59 +6,64 @@ import subprocess
import sys
import glob
from setuptools import setup,find_packages
from setuptools import setup, find_packages
from distutils.command import build
class BuildGRPC:
"""Generate gRPC bindings."""
def _gen_grpc():
root = pathlib.Path(os.path.abspath(os.path.dirname(__file__)))
proto_root_dir = \
root / 'azure' / 'durable_functions' / 'grpc' / 'protobuf'
proto_src_dir = proto_root_dir
staging_root_dir = root / 'build' / 'protos'
staging_dir = staging_root_dir
build_dir = staging_dir
def _gen_grpc(self):
root = pathlib.Path(os.path.abspath(os.path.dirname(__file__)))
proto_root_dir = root / 'azure' / 'durable_functions' / 'grpc' / 'protobuf'
proto_src_dir = proto_root_dir
staging_root_dir = root / 'build' / 'protos'
staging_dir = staging_root_dir
build_dir = staging_dir
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
if os.path.exists(build_dir):
shutil.rmtree(build_dir)
shutil.copytree(proto_src_dir, build_dir)
shutil.copytree(proto_src_dir, build_dir)
subprocess.run([
sys.executable, '-m', 'grpc_tools.protoc',
'-I', str(proto_src_dir),
'--python_out', str(staging_root_dir),
'--grpc_python_out', str(staging_root_dir),
os.sep.join((str(proto_src_dir),
'DurableRpc.proto')),
], check=True, stdout=sys.stdout, stderr=sys.stderr,
cwd=staging_root_dir)
subprocess.run([
sys.executable, '-m', 'grpc_tools.protoc',
'-I', str(proto_src_dir),
'--python_out', str(staging_root_dir),
'--grpc_python_out', str(staging_root_dir),
os.sep.join((str(proto_src_dir),
'DurableRpc.proto')),
], check=True, stdout=sys.stdout, stderr=sys.stderr,
cwd=staging_root_dir)
compiled = glob.glob(str(staging_dir / '*.py'))
compiled = glob.glob(str(staging_dir / '*.py'))
if not compiled:
print('grpc_tools.protoc produced no Python files',
file=sys.stderr)
sys.exit(1)
if not compiled:
print('grpc_tools.protoc produced no Python files',
file=sys.stderr)
sys.exit(1)
# Not sure if we need this line that will copy both the proto and py generated
# files in the proto root dir
for f in compiled:
shutil.copy(f, proto_root_dir)
# Not sure if we need this line that will copy both the
# proto and py generated
# files in the proto root dir
for f in compiled:
shutil.copy(f, proto_root_dir)
class build(build.build, BuildGRPC):
class BuildModule(build.build):
"""Used to build the module."""
def run(self, *args, **kwargs):
self._gen_grpc()
"""Execute the build.
:param args:
:param kwargs:
"""
_gen_grpc()
super().run(*args, **kwargs)
setup(
name='azure-functions-durable',
packages=find_packages(exclude=("tests","samples")),
packages=find_packages(exclude=("tests", "samples")),
version='1.0.1ab',
description='Durable Functions Support For Python Functionapp',
license='MIT',
@ -75,7 +81,7 @@ setup(
],
include_package_data=True,
cmdclass={
'build': build
'build': BuildModule
},
test_suite='tests'
)

Просмотреть файл

@ -1,9 +1,14 @@
"""Unit tests for the durable functions library"""
import os
import sys
import unittest
def suite():
"""
:return: configuration for the suite of tests
"""
test_loader = unittest.TestLoader()
test_suite = test_loader.discover(
os.path.dirname(__file__), pattern='test_*.py')

Просмотреть файл

@ -8,6 +8,10 @@ AUTH_CODE = "GBgDKQriGLABxpY/m5qcPd3R2sNafdRPE3/LcUSZEnuvOzTA1qD3Tg=="
def get_binding_string():
"""
:return:
"""
binding_string = '{"taskHubName":"TASK_HUB_NAME","creationUrls":{' \
'"createNewInstancePostUri":"BASE_URL/orchestrators/{' \
'functionName}[/{' \
@ -43,16 +47,29 @@ def get_binding_string():
@pytest.fixture()
def binding_string():
"""
:return:
"""
return get_binding_string()
@pytest.fixture()
def binding_info():
"""
:return:
"""
binding = DurableOrchestrationBindings(get_binding_string())
return binding
def replace_stand_in_bits(binding_string):
"""
:param binding_string:
:return:
"""
binding_string = binding_string.replace("TASK_HUB_NAME", TASK_HUB_NAME)
binding_string = binding_string.replace("BASE_URL", BASE_URL)
binding_string = binding_string.replace("AUTH_CODE", AUTH_CODE)

Просмотреть файл

@ -1,7 +1,11 @@
from tests.fixtures import TASK_HUB_NAME, replace_stand_in_bits, binding_info
def test_extracts_task_hub_name(binding_info):
"""
:param binding_info: Test fixture containing
"""
assert TASK_HUB_NAME == binding_info.task_hub_name

Просмотреть файл

@ -9,19 +9,19 @@ def test_get_start_new_url(binding_string):
client = DurableOrchestrationClient(binding_string)
instance_id = "abc123"
function_name = "myfunction"
start_new_url = client.get_start_new_url(instance_id, function_name)
start_new_url = client._get_start_new_url(instance_id, function_name)
expected_url = replace_stand_in_bits(
f"BASE_URL/orchestrators/{function_name}/{instance_id}?code=AUTH_CODE")
assert expected_url == start_new_url
def test_get_input_returns_none_when_none_supplied():
result = DurableOrchestrationClient.get_json_input(None)
result = DurableOrchestrationClient._get_json_input(None)
assert result is None
def test_get_input_returns_json_string(binding_string):
input_ = json.loads(binding_string)
result = DurableOrchestrationClient.get_json_input(input_)
result = DurableOrchestrationClient._get_json_input(input_)
input_as_string = json.dumps(input_)
assert input_as_string == result

Просмотреть файл

@ -23,17 +23,17 @@ def starting_context():
def test_extracts_is_replaying(starting_context):
assert not starting_context.isReplaying
assert not starting_context.is_replaying
def test_extracts_instance_id(starting_context):
assert "48d0f95957504c2fa579e810a390b938" == starting_context.instanceId
assert "48d0f95957504c2fa579e810a390b938" == starting_context.instance_id
def test_sets_current_utc_datetime(starting_context):
assert \
dt_parse("2019-12-08T23:18:41.3240927Z") == \
starting_context.currentUtcDateTime
starting_context.current_utc_datetime
def test_extracts_histories(starting_context):

Просмотреть файл

@ -1,12 +1,13 @@
""" Validates the constants are set correctly."""
import unittest
from azure.durable_functions.constants import (
_DEFAULT_LOCAL_HOST,
_DEFAULT_LOCAL_ORIGIN)
DEFAULT_LOCAL_HOST,
DEFAULT_LOCAL_ORIGIN)
class TestConstants(unittest.TestCase):
def test_default_local_host(self):
self.assertEqual(_DEFAULT_LOCAL_HOST, "localhost:7071")
self.assertEqual(DEFAULT_LOCAL_HOST, "localhost:7071")
def test_default_local_origin(self):
self.assertEqual(_DEFAULT_LOCAL_ORIGIN, "http://localhost:7071")
self.assertEqual(DEFAULT_LOCAL_ORIGIN, "http://localhost:7071")