Added e2e tests for dependency isolation (#919)

* Added e2e tests for dependency isolation

Co-authored-by: Gavin Aguiar <gavin.aguiar@microsoft.com>
Co-authored-by: Varad Meru <vrdmr@users.noreply.github.com>
This commit is contained in:
gavin-aguiar 2021-12-03 11:28:07 -06:00 коммит произвёл GitHub
Родитель a83ac06e37
Коммит 2bc45d1aaf
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
28 изменённых файлов: 2838 добавлений и 123 удалений

Просмотреть файл

@ -0,0 +1,26 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from ._abc import Context, Out
from ._http import HttpRequest
from ._http import HttpResponse
from .meta import get_binding_registry
# Import binding implementations to register them
from . import http # NoQA
__all__ = (
# Functions
'get_binding_registry',
# Generics.
'Context',
'Out',
# Binding rich types, sorted alphabetically.
'HttpRequest',
'HttpResponse',
)
__version__ = '9.9.9'

Просмотреть файл

@ -0,0 +1,423 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import datetime
import io
import typing
T = typing.TypeVar('T')
class Out(abc.ABC, typing.Generic[T]):
"""An interface to set function output parameters."""
@abc.abstractmethod
def set(self, val: T) -> None:
"""Set the value of the output parameter."""
pass
@abc.abstractmethod
def get(self) -> T:
"""Get the value of the output parameter."""
pass
class RpcException:
"""Rpc Exception object."""
@property
@abc.abstractmethod
def source(self) -> str:
"""Source of the exception."""
pass
@property
@abc.abstractmethod
def stack_trace(self) -> str:
"""Stack trace for the exception."""
pass
@property
@abc.abstractmethod
def message(self) -> str:
"""Textual message describing the exception."""
pass
class TraceContext(abc.ABC):
"""Trace context object."""
@property
@abc.abstractmethod
def trace_state(self) -> str:
"""Gets trace state from trace-context."""
pass
@property
@abc.abstractmethod
def trace_parent(self) -> str:
"""Gets trace parent from trace-context."""
pass
@property
@abc.abstractmethod
def attributes(self) -> typing.Dict[str, str]:
"""Gets trace-context attributes."""
pass
class RetryContext(abc.ABC):
"""Retry Context object.
For more information refer: https://aka.ms/azfunc-retries-policies
"""
@property
@abc.abstractmethod
def retry_count(self) -> int:
"""Gets the current retry count from retry-context."""
pass
@property
@abc.abstractmethod
def max_retry_count(self) -> int:
"""Gets the max retry count from retry-context."""
pass
@property
@abc.abstractmethod
def exception(self) -> RpcException:
"""Gets the RpcException"""
pass
class Context(abc.ABC):
"""Function invocation context."""
@property
@abc.abstractmethod
def invocation_id(self) -> str:
"""Function invocation ID."""
pass
@property
@abc.abstractmethod
def function_name(self) -> str:
"""Function name."""
pass
@property
@abc.abstractmethod
def function_directory(self) -> str:
"""Function directory."""
pass
@property
@abc.abstractmethod
def trace_context(self) -> TraceContext:
"""Context for distributed tracing."""
pass
@property
@abc.abstractmethod
def retry_context(self) -> RetryContext:
"""Context for retries to the function."""
pass
class HttpRequest(abc.ABC):
"""HTTP request object."""
@property
@abc.abstractmethod
def method(self) -> str:
"""Request method."""
pass
@property
@abc.abstractmethod
def url(self) -> str:
"""Request URL."""
pass
@property
@abc.abstractmethod
def headers(self) -> typing.Mapping[str, str]:
"""A dictionary containing request headers."""
pass
@property
@abc.abstractmethod
def params(self) -> typing.Mapping[str, str]:
"""A dictionary containing request GET parameters."""
pass
@property
@abc.abstractmethod
def route_params(self) -> typing.Mapping[str, str]:
"""A dictionary containing request route parameters."""
pass
@abc.abstractmethod
def get_body(self) -> bytes:
"""Return request body as bytes."""
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
"""Decode and return request body as JSON.
:raises ValueError:
when the request does not contain valid JSON data.
"""
pass
class HttpResponse(abc.ABC):
@property
@abc.abstractmethod
def status_code(self) -> int:
pass
@property
@abc.abstractmethod
def mimetype(self):
pass
@property
@abc.abstractmethod
def charset(self):
pass
@property
@abc.abstractmethod
def headers(self) -> typing.MutableMapping[str, str]:
pass
@abc.abstractmethod
def get_body(self) -> bytes:
pass
class TimerRequest(abc.ABC):
"""Timer request object."""
@property
@abc.abstractmethod
def past_due(self) -> bool:
"""Whether the timer is past due."""
pass
class InputStream(io.BufferedIOBase, abc.ABC):
"""File-like object representing an input blob."""
@abc.abstractmethod
def read(self, size=-1) -> bytes:
"""Return and read up to *size* bytes.
:param int size:
The number of bytes to read. If the argument is omitted,
``None``, or negative, data is read and returned until
EOF is reached.
:return:
Bytes read from the input stream.
"""
pass
@property
@abc.abstractmethod
def name(self) -> typing.Optional[str]:
"""The name of the blob."""
pass
@property
@abc.abstractmethod
def length(self) -> typing.Optional[int]:
"""The size of the blob in bytes."""
pass
@property
@abc.abstractmethod
def uri(self) -> typing.Optional[str]:
"""The blob's primary location URI."""
pass
class QueueMessage(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> typing.Optional[str]:
pass
@abc.abstractmethod
def get_body(self) -> typing.Union[str, bytes]:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def dequeue_count(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def expiration_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def insertion_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def time_next_visible(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def pop_receipt(self) -> typing.Optional[str]:
pass
class EventGridEvent(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def topic(self) -> str:
pass
@property
@abc.abstractmethod
def subject(self) -> str:
pass
@property
@abc.abstractmethod
def event_type(self) -> str:
pass
@property
@abc.abstractmethod
def event_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def data_version(self) -> str:
pass
class EventGridOutputEvent(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def subject(self) -> str:
pass
@property
@abc.abstractmethod
def event_type(self) -> str:
pass
@property
@abc.abstractmethod
def event_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def data_version(self) -> str:
pass
class Document(abc.ABC):
@classmethod
@abc.abstractmethod
def from_json(cls, json_data: str) -> 'Document':
pass
@classmethod
@abc.abstractmethod
def from_dict(cls, dct: dict) -> 'Document':
pass
@abc.abstractmethod
def __getitem__(self, key):
pass
@abc.abstractmethod
def __setitem__(self, key, value):
pass
@abc.abstractmethod
def to_json(self) -> str:
pass
class DocumentList(abc.ABC):
pass
class EventHubEvent(abc.ABC):
@abc.abstractmethod
def get_body(self) -> bytes:
pass
@property
@abc.abstractmethod
def partition_key(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def sequence_number(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def iothub_metadata(self) -> typing.Optional[typing.Mapping[str, str]]:
pass
@property
@abc.abstractmethod
def enqueued_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def offset(self) -> typing.Optional[str]:
pass
class OrchestrationContext(abc.ABC):
@property
@abc.abstractmethod
def body(self) -> str:
pass

Просмотреть файл

@ -0,0 +1,231 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import collections.abc
import io
import json
import typing
import types
from . import _abc
from ._thirdparty.werkzeug import datastructures as _wk_datastructures
from ._thirdparty.werkzeug import formparser as _wk_parser
from ._thirdparty.werkzeug import http as _wk_http
class BaseHeaders(collections.abc.Mapping):
def __init__(self, source: typing.Optional[typing.Mapping] = None) -> None:
self.__http_headers__: typing.Dict[str, str] = {}
if source is not None:
self.__http_headers__.update(
{k.lower(): v for k, v in source.items()})
def __getitem__(self, key: str) -> str:
return self.__http_headers__[key.lower()]
def __len__(self):
return len(self.__http_headers__)
def __contains__(self, key: typing.Any):
return key.lower() in self.__http_headers__
def __iter__(self):
return iter(self.__http_headers__)
class HttpRequestHeaders(BaseHeaders):
pass
class HttpResponseHeaders(BaseHeaders, collections.abc.MutableMapping):
def __setitem__(self, key: str, value: str):
self.__http_headers__[key.lower()] = value
def __delitem__(self, key: str):
del self.__http_headers__[key.lower()]
class HttpResponse(_abc.HttpResponse):
"""An HTTP response object.
:param str/bytes body:
Optional response body.
:param int status_code:
Response status code. If not specified, defaults to 200.
:param dict headers:
An optional mapping containing response HTTP headers.
:param str mimetype:
An optional response MIME type. If not specified, defaults to
``'text/plain'``.
:param str charset:
Response content text encoding. If not specified, defaults to
``'utf-8'``.
"""
def __init__(self, body=None, *,
status_code=None, headers=None, mimetype=None, charset=None):
if status_code is None:
status_code = 200
self.__status_code = status_code
if mimetype is None:
mimetype = 'text/plain'
self.__mimetype = mimetype
if charset is None:
charset = 'utf-8'
self.__charset = charset
if headers is None:
headers = {}
self.__headers = HttpResponseHeaders(headers)
if body is not None:
self.__set_body(body)
else:
self.__body = b''
@property
def mimetype(self):
"""Response MIME type."""
return self.__mimetype
@property
def charset(self):
"""Response text encoding."""
return self.__charset
@property
def headers(self):
"""A dictionary of response HTTP headers."""
return self.__headers
@property
def status_code(self):
"""Response status code."""
return self.__status_code
def __set_body(self, body):
if isinstance(body, str):
body = body.encode(self.__charset)
if not isinstance(body, (bytes, bytearray)):
raise TypeError(
f'response is expected to be either of '
f'str, bytes, or bytearray, got {type(body).__name__}')
self.__body = bytes(body)
def get_body(self) -> bytes:
"""Response body as a bytes object."""
return self.__body
class HttpRequest(_abc.HttpRequest):
"""An HTTP request object.
:param str method:
HTTP request method name.
:param str url:
HTTP URL.
:param dict headers:
An optional mapping containing HTTP request headers.
:param dict params:
An optional mapping containing HTTP request params.
:param dict route_params:
An optional mapping containing HTTP request route params.
:param bytes body:
HTTP request body.
"""
def __init__(self,
method: str,
url: str, *,
headers: typing.Optional[typing.Mapping[str, str]] = None,
params: typing.Optional[typing.Mapping[str, str]] = None,
route_params: typing.Optional[
typing.Mapping[str, str]] = None,
body: bytes) -> None:
self.__method = method
self.__url = url
self.__headers = HttpRequestHeaders(headers or {})
self.__params = types.MappingProxyType(params or {})
self.__route_params = types.MappingProxyType(route_params or {})
self.__body_bytes = body
self.__form_parsed = False
self.__form = None
self.__files = None
@property
def url(self):
return self.__url
@property
def method(self):
return self.__method.upper()
@property
def headers(self):
return self.__headers
@property
def params(self):
return self.__params
@property
def route_params(self):
return self.__route_params
@property
def form(self):
self._parse_form_data()
return self.__form
@property
def files(self):
self._parse_form_data()
return self.__files
def get_body(self) -> bytes:
return self.__body_bytes
def get_json(self) -> typing.Any:
return json.loads(self.__body_bytes.decode('utf-8'))
def _parse_form_data(self):
if self.__form_parsed:
return
body = self.get_body()
content_type = self.headers.get('Content-Type', '')
content_length = len(body)
mimetype, options = _wk_http.parse_options_header(content_type)
parser = _wk_parser.FormDataParser(
_wk_parser.default_stream_factory,
options.get('charset') or 'utf-8',
'replace',
None,
None,
_wk_datastructures.ImmutableMultiDict,
)
body_stream = io.BytesIO(body)
_, self.__form, self.__files = parser.parse(
body_stream, mimetype, content_length, options
)
self.__form_parsed = True

Просмотреть файл

@ -0,0 +1,89 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import List, Tuple, Optional
from datetime import datetime, timedelta
def try_parse_datetime_with_formats(
datetime_str: str,
datetime_formats: List[str]
) -> Tuple[Optional[datetime], Optional[str], Optional[Exception]]:
"""Try parsing the datetime string with a list of formats
Parameters
----------
datetime_str: str
The datetime string needs to be parsed (e.g. 2018-12-12T03:16:34.2191Z)
datetime_formats: List[str]
A list of datetime formats that the parser would try to match
Returns
-------
dict_obj: A serializable dictionary with enough metadata to reconstruct
`obj`
Exceptions
----------
Tuple[Optional[datetime], Optional[str], Optional[Exception]]:
If the datetime can be successfully parsed, the first element is the
paresd datetime object and the second is the matched format.
If the datetime cannot be parsed, the first and second element will be
None, and the third is the exception from the datetime.strptime()
method.
"""
for fmt in datetime_formats:
try:
dt = datetime.strptime(datetime_str, fmt)
return (dt, fmt, None)
except ValueError as ve:
last_exception = ve
return (None, None, last_exception)
def try_parse_timedelta_with_formats(
timedelta_str: str,
timedelta_formats: List[str]
) -> Tuple[Optional[timedelta], Optional[str], Optional[Exception]]:
"""Try parsing the datetime delta string with a list of formats
Parameters
----------
timedelta_str: str
The timedelta string needs to be parsed (e.g. 12:34:56)
timedelta_formats: List[str]
A list of datetime formats that the parser would try to match
Returns
-------
dict_obj: A serializable dictionary with enough metadata to reconstruct
`obj`
Exceptions
----------
Tuple[Optional[timedelta], Optional[str], Optional[Exception]]:
If the timedelta can be successfully parsed, the first element is the
paresd timedelta object and the second is the matched format.
If the timedelta cannot be parsed, the first and second element will be
None, and the third is the exception from the datetime.strptime()
method.
"""
for fmt in timedelta_formats:
try:
# If singular form %S, %M, %H, will just return the timedelta
if fmt == '%S':
td = timedelta(seconds=int(timedelta_str))
elif fmt == '%M':
td = timedelta(minutes=int(timedelta_str))
elif fmt == '%H':
td = timedelta(hours=int(timedelta_str))
else:
dt = datetime.strptime(timedelta_str, fmt)
td = timedelta(hours=dt.hour,
minutes=dt.minute,
seconds=dt.second)
return (td, fmt, None)
except ValueError as ve:
last_exception = ve
return (None, None, last_exception)

Просмотреть файл

@ -0,0 +1,134 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import typing
from azure.functions import _abc as azf_abc
from azure.functions import _http as azf_http
from . import meta
class HttpRequest(azf_http.HttpRequest):
"""An HTTP request object."""
__body_bytes: typing.Optional[bytes]
__body_str: typing.Optional[str]
def __init__(self,
method: str,
url: str, *,
headers: typing.Mapping[str, str],
params: typing.Mapping[str, str],
route_params: typing.Mapping[str, str],
body_type: str,
body: typing.Union[str, bytes]) -> None:
body_str: typing.Optional[str] = None
body_bytes: typing.Optional[bytes] = None
if isinstance(body, str):
body_str = body
body_bytes = body_str.encode('utf-8')
elif isinstance(body, bytes):
body_bytes = body
else:
raise TypeError(
f'unexpected HTTP request body type: {type(body).__name__}')
super().__init__(method=method, url=url, headers=headers,
params=params, route_params=route_params,
body=body_bytes)
self.__body_type = body_type
self.__body_str = body_str
self.__body_bytes = body_bytes
def get_body(self) -> bytes:
if self.__body_bytes is None:
assert self.__body_str is not None
self.__body_bytes = self.__body_str.encode('utf-8')
return self.__body_bytes
def get_json(self) -> typing.Any:
if self.__body_type in ('json', 'string'):
assert self.__body_str is not None
return json.loads(self.__body_str)
elif self.__body_bytes is not None:
try:
return json.loads(self.__body_bytes.decode('utf-8'))
except ValueError as e:
raise ValueError(
'HTTP request does not contain valid JSON data') from e
else:
raise ValueError(
'Request body cannot be empty in JSON deserialization')
class HttpResponseConverter(meta.OutConverter, binding='http'):
@classmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, (azf_abc.HttpResponse, str))
@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
if isinstance(obj, str):
return meta.Datum(type='string', value=obj)
if isinstance(obj, azf_abc.HttpResponse):
status = obj.status_code
headers = dict(obj.headers)
if 'content-type' not in headers:
if obj.mimetype.startswith('text/'):
ct = f'{obj.mimetype}; charset={obj.charset}'
else:
ct = f'{obj.mimetype}'
headers['content-type'] = ct
body = obj.get_body()
if body is not None:
datum_body = meta.Datum(type='bytes', value=body)
else:
datum_body = meta.Datum(type='bytes', value=b'')
return meta.Datum(
type='http',
value=dict(
status_code=meta.Datum(type='string', value=str(status)),
headers={
n: meta.Datum(type='string', value=h)
for n, h in headers.items()
},
body=datum_body,
)
)
raise NotImplementedError
class HttpRequestConverter(meta.InConverter,
binding='httpTrigger', trigger=True):
@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, azf_abc.HttpRequest)
@classmethod
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> typing.Any:
if data.type != 'http':
raise NotImplementedError
val = data.value
return HttpRequest(
method=val['method'].value,
url=val['url'].value,
headers={n: v.value for n, v in val['headers'].items()},
params={n: v.value for n, v in val['query'].items()},
route_params={n: v.value for n, v in val['params'].items()},
body_type=val['body'].type,
body=val['body'].value,
)

Просмотреть файл

@ -0,0 +1,404 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import collections.abc
import datetime
import json
import re
from typing import Dict, Optional, Union, Tuple, Mapping, Any
from ._thirdparty import typing_inspect
from ._utils import (
try_parse_datetime_with_formats,
try_parse_timedelta_with_formats
)
def is_iterable_type_annotation(annotation: object, pytype: object) -> bool:
is_iterable_anno = (
typing_inspect.is_generic_type(annotation)
and issubclass(typing_inspect.get_origin(annotation),
collections.abc.Iterable)
)
if not is_iterable_anno:
return False
args = typing_inspect.get_args(annotation)
if not args:
return False
if isinstance(pytype, tuple):
return any(isinstance(t, type) and issubclass(t, arg)
for t in pytype for arg in args)
else:
return any(isinstance(pytype, type) and issubclass(pytype, arg)
for arg in args)
class Datum:
def __init__(self, value: Any, type: Optional[str]):
self.value: Any = value
self.type: Optional[str] = type
@property
def python_value(self) -> Any:
if self.value is None or self.type is None:
return None
elif self.type in ('bytes', 'string', 'int', 'double'):
return self.value
elif self.type == 'json':
return json.loads(self.value)
elif self.type == 'collection_string':
return [v for v in self.value.string]
elif self.type == 'collection_bytes':
return [v for v in self.value.bytes]
elif self.type == 'collection_double':
return [v for v in self.value.double]
elif self.type == 'collection_sint64':
return [v for v in self.value.sint64]
else:
return self.value
@property
def python_type(self) -> type:
return type(self.python_value)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.value == other.value and self.type == other.type
def __hash__(self):
return hash((type(self), (self.value, self.type)))
def __repr__(self):
val_repr = repr(self.value)
if len(val_repr) > 10:
val_repr = val_repr[:10] + '...'
return '<Datum {} {}>'.format(self.type, val_repr)
class _ConverterMeta(abc.ABCMeta):
_bindings: Dict[str, type] = {}
def __new__(mcls, name, bases, dct, *,
binding: Optional[str],
trigger: Optional[str] = None):
cls = super().__new__(mcls, name, bases, dct)
cls._trigger = trigger # type: ignore
if binding is None:
return cls
if binding in mcls._bindings:
raise RuntimeError(
f'cannot register a converter for {binding!r} binding: '
f'another converter for this binding has already been '
f'registered')
mcls._bindings[binding] = cls
if trigger is not None:
mcls._bindings[trigger] = cls
return cls
@classmethod
def get(cls, binding_name):
return cls._bindings.get(binding_name)
def has_trigger_support(cls) -> bool:
return cls._trigger is not None # type: ignore
class _BaseConverter(metaclass=_ConverterMeta, binding=None):
@classmethod
def _decode_typed_data(
cls, data: Datum, *,
python_type: Union[type, Tuple[type, ...]],
context: str = 'data') -> Any:
if data is None:
return None
data_type = data.type
if data_type == 'json':
result = json.loads(data.value)
elif data_type == 'string':
result = data.value
elif data_type == 'int':
result = data.value
elif data_type == 'double':
result = data.value
elif data_type == 'collection_bytes':
result = data.value
elif data_type == 'collection_string':
result = data.value
elif data_type == 'collection_sint64':
result = data.value
elif data_type is None:
return None
else:
raise ValueError(
f'unsupported type of {context}: {data_type}')
if not isinstance(result, python_type):
if isinstance(python_type, (tuple, list, dict)):
raise ValueError(
f'unexpected value type in {context}: '
f'{type(result).__name__}, expected one of: '
f'{", ".join(t.__name__ for t in python_type)}')
else:
try:
# Try coercing into the requested type
result = python_type(result)
except (TypeError, ValueError) as e:
raise ValueError(
f'cannot convert value of {context} into '
f'{python_type.__name__}: {e}') from None
return result
@classmethod
def _decode_trigger_metadata_field(
cls, trigger_metadata: Mapping[str, Datum],
field: str, *,
python_type: Union[type, Tuple[type, ...]]) \
-> Any:
data = trigger_metadata.get(field)
if data is None:
return None
else:
return cls._decode_typed_data(
data, python_type=python_type,
context=f'field {field!r} in trigger metadata')
@classmethod
def _parse_datetime_metadata(
cls, trigger_metadata: Mapping[str, Datum],
field: str) -> Optional[datetime.datetime]:
datetime_str = cls._decode_trigger_metadata_field(
trigger_metadata, field, python_type=str)
if datetime_str is None:
return None
else:
return cls._parse_datetime(datetime_str)
@classmethod
def _parse_timedelta_metadata(
cls, trigger_metadata: Mapping[str, Datum],
field: str) -> Optional[datetime.timedelta]:
timedelta_str = cls._decode_trigger_metadata_field(
trigger_metadata, field, python_type=str)
if timedelta_str is None:
return None
else:
return cls._parse_timedelta(timedelta_str)
@classmethod
def _parse_datetime(
cls, datetime_str: Optional[str]) -> Optional[datetime.datetime]:
if not datetime_str:
return None
too_fractional = re.match(
r'(.*\.\d{6})(\d+)(Z|[\+|-]\d{1,2}:\d{1,2}){0,1}', datetime_str)
if too_fractional:
# The supplied value contains seven digits in the
# fractional second part, whereas Python expects
# a maxium of six, so strip it.
# https://github.com/Azure/azure-functions-python-worker/issues/269
datetime_str = too_fractional.group(1) + (
too_fractional.group(3) or '')
# Try parse time
utc_time, utc_time_error = cls._parse_datetime_utc(datetime_str)
if not utc_time_error and utc_time:
return utc_time.replace(tzinfo=datetime.timezone.utc)
local_time, local_time_error = cls._parse_datetime_local(datetime_str)
if not local_time_error and local_time:
return local_time.replace(tzinfo=None)
# Report error
if utc_time_error:
raise utc_time_error
elif local_time_error:
raise local_time_error
else:
return None
@classmethod
def _parse_timedelta(
cls,
timedelta_str: Optional[str]
) -> Optional[datetime.timedelta]:
if not timedelta_str:
return None
# Try parse timedelta
timedelta, td_error = cls._parse_timedelta_internal(timedelta_str)
if timedelta is not None:
return timedelta
# Report error
if td_error:
raise td_error
else:
return None
@classmethod
def _parse_datetime_utc(
cls,
datetime_str: str
) -> Tuple[Optional[datetime.datetime], Optional[Exception]]:
# UTC ISO 8601 assumed
# 2018-08-07T23:17:57.461050Z
utc_formats = [
'%Y-%m-%dT%H:%M:%S+00:00',
'%Y-%m-%dT%H:%M:%S-00:00',
'%Y-%m-%dT%H:%M:%S.%f+00:00',
'%Y-%m-%dT%H:%M:%S.%f-00:00',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%m/%d/%Y %H:%M:%SZ',
'%m/%d/%Y %H:%M:%S.%fZ',
'%m/%d/%Y %H:%M:%S+00:00',
'%m/%d/%Y %H:%M:%S-00:00',
'%m/%d/%Y %H:%M:%S.%f+00:00',
'%m/%d/%Y %H:%M:%S.%f-00:00',
]
dt, _, excpt = try_parse_datetime_with_formats(
datetime_str, utc_formats)
if excpt is not None:
return None, excpt
return dt, None
@classmethod
def _parse_datetime_local(
cls, datetime_str: str
) -> Tuple[Optional[datetime.datetime], Optional[Exception]]:
"""Parse a string into a datetime object, accepts following formats
1. Without fractional seconds (e.g. 2018-08-07T23:17:57)
2. With fractional seconds (e.g. 2018-08-07T23:17:57.461050)
Parameters
----------
datetime_str: str
The string represents a datetime
Returns
-------
Tuple[Optional[datetime.datetime], Optional[Exception]]
If the datetime_str is None, will return None immediately.
If the datetime_str can be parsed correctly, it will return as the
first element in the tuple.
If the datetime_str cannot be parsed with all attempts, it will
return None in the first element, the exception in the second
element.
"""
local_formats = [
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S',
'%m/%d/%YT%H:%M:%S.%f',
'%m/%d/%YT%H:%M:%S'
]
dt, _, excpt = try_parse_datetime_with_formats(
datetime_str, local_formats)
if excpt is not None:
return None, excpt
return dt, None
@classmethod
def _parse_timedelta_internal(
cls, timedelta_str: str
) -> Tuple[Optional[datetime.timedelta], Optional[Exception]]:
"""Parse a string into a timedelta object, accepts following formats
1. HH:MM:SS (e.g. 12:34:56)
2. MM:SS (e.g. 34:56)
3. Pure integer as seconds (e.g. 5819)
Parameters
----------
timedelta_str: str
The string represents a datetime
Returns
-------
Tuple[Optional[datetime.timedelta], Optional[Exception]]
If the timedelta_str is None, will return None immediately.
If the timedelta_str can be parsed correctly, it will return as the
first element in the tuple.
If the timedelta_str cannot be parsed with all attempts, it will
return None in the first element, the exception in the second
element.
"""
timedelta_formats = [
'%H:%M:%S',
'%M:%S',
'%S'
]
td, _, excpt = try_parse_timedelta_with_formats(
timedelta_str, timedelta_formats)
if td is not None:
return td, None
return None, excpt
class InConverter(_BaseConverter, binding=None):
@abc.abstractclassmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
pass
@abc.abstractclassmethod
def decode(cls, data: Datum, *, trigger_metadata) -> Any:
raise NotImplementedError
@abc.abstractclassmethod
def has_implicit_output(cls) -> bool:
return False
class OutConverter(_BaseConverter, binding=None):
@abc.abstractclassmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
pass
@abc.abstractclassmethod
def encode(cls, obj: Any, *,
expected_type: Optional[type]) -> Optional[Datum]:
raise NotImplementedError
def get_binding_registry():
return _ConverterMeta

Просмотреть файл

@ -0,0 +1,26 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from ._abc import Context, Out
from ._http import HttpRequest
from ._http import HttpResponse
from .meta import get_binding_registry
# Import binding implementations to register them
from . import http # NoQA
__all__ = (
# Functions
'get_binding_registry',
# Generics.
'Context',
'Out',
# Binding rich types, sorted alphabetically.
'HttpRequest',
'HttpResponse',
)
__version__ = '1.5.0'

Просмотреть файл

@ -0,0 +1,423 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import datetime
import io
import typing
T = typing.TypeVar('T')
class Out(abc.ABC, typing.Generic[T]):
"""An interface to set function output parameters."""
@abc.abstractmethod
def set(self, val: T) -> None:
"""Set the value of the output parameter."""
pass
@abc.abstractmethod
def get(self) -> T:
"""Get the value of the output parameter."""
pass
class RpcException:
"""Rpc Exception object."""
@property
@abc.abstractmethod
def source(self) -> str:
"""Source of the exception."""
pass
@property
@abc.abstractmethod
def stack_trace(self) -> str:
"""Stack trace for the exception."""
pass
@property
@abc.abstractmethod
def message(self) -> str:
"""Textual message describing the exception."""
pass
class TraceContext(abc.ABC):
"""Trace context object."""
@property
@abc.abstractmethod
def trace_state(self) -> str:
"""Gets trace state from trace-context."""
pass
@property
@abc.abstractmethod
def trace_parent(self) -> str:
"""Gets trace parent from trace-context."""
pass
@property
@abc.abstractmethod
def attributes(self) -> typing.Dict[str, str]:
"""Gets trace-context attributes."""
pass
class RetryContext(abc.ABC):
"""Retry Context object.
For more information refer: https://aka.ms/azfunc-retries-policies
"""
@property
@abc.abstractmethod
def retry_count(self) -> int:
"""Gets the current retry count from retry-context."""
pass
@property
@abc.abstractmethod
def max_retry_count(self) -> int:
"""Gets the max retry count from retry-context."""
pass
@property
@abc.abstractmethod
def exception(self) -> RpcException:
"""Gets the RpcException"""
pass
class Context(abc.ABC):
"""Function invocation context."""
@property
@abc.abstractmethod
def invocation_id(self) -> str:
"""Function invocation ID."""
pass
@property
@abc.abstractmethod
def function_name(self) -> str:
"""Function name."""
pass
@property
@abc.abstractmethod
def function_directory(self) -> str:
"""Function directory."""
pass
@property
@abc.abstractmethod
def trace_context(self) -> TraceContext:
"""Context for distributed tracing."""
pass
@property
@abc.abstractmethod
def retry_context(self) -> RetryContext:
"""Context for retries to the function."""
pass
class HttpRequest(abc.ABC):
"""HTTP request object."""
@property
@abc.abstractmethod
def method(self) -> str:
"""Request method."""
pass
@property
@abc.abstractmethod
def url(self) -> str:
"""Request URL."""
pass
@property
@abc.abstractmethod
def headers(self) -> typing.Mapping[str, str]:
"""A dictionary containing request headers."""
pass
@property
@abc.abstractmethod
def params(self) -> typing.Mapping[str, str]:
"""A dictionary containing request GET parameters."""
pass
@property
@abc.abstractmethod
def route_params(self) -> typing.Mapping[str, str]:
"""A dictionary containing request route parameters."""
pass
@abc.abstractmethod
def get_body(self) -> bytes:
"""Return request body as bytes."""
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
"""Decode and return request body as JSON.
:raises ValueError:
when the request does not contain valid JSON data.
"""
pass
class HttpResponse(abc.ABC):
@property
@abc.abstractmethod
def status_code(self) -> int:
pass
@property
@abc.abstractmethod
def mimetype(self):
pass
@property
@abc.abstractmethod
def charset(self):
pass
@property
@abc.abstractmethod
def headers(self) -> typing.MutableMapping[str, str]:
pass
@abc.abstractmethod
def get_body(self) -> bytes:
pass
class TimerRequest(abc.ABC):
"""Timer request object."""
@property
@abc.abstractmethod
def past_due(self) -> bool:
"""Whether the timer is past due."""
pass
class InputStream(io.BufferedIOBase, abc.ABC):
"""File-like object representing an input blob."""
@abc.abstractmethod
def read(self, size=-1) -> bytes:
"""Return and read up to *size* bytes.
:param int size:
The number of bytes to read. If the argument is omitted,
``None``, or negative, data is read and returned until
EOF is reached.
:return:
Bytes read from the input stream.
"""
pass
@property
@abc.abstractmethod
def name(self) -> typing.Optional[str]:
"""The name of the blob."""
pass
@property
@abc.abstractmethod
def length(self) -> typing.Optional[int]:
"""The size of the blob in bytes."""
pass
@property
@abc.abstractmethod
def uri(self) -> typing.Optional[str]:
"""The blob's primary location URI."""
pass
class QueueMessage(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> typing.Optional[str]:
pass
@abc.abstractmethod
def get_body(self) -> typing.Union[str, bytes]:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def dequeue_count(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def expiration_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def insertion_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def time_next_visible(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def pop_receipt(self) -> typing.Optional[str]:
pass
class EventGridEvent(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def topic(self) -> str:
pass
@property
@abc.abstractmethod
def subject(self) -> str:
pass
@property
@abc.abstractmethod
def event_type(self) -> str:
pass
@property
@abc.abstractmethod
def event_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def data_version(self) -> str:
pass
class EventGridOutputEvent(abc.ABC):
@property
@abc.abstractmethod
def id(self) -> str:
pass
@abc.abstractmethod
def get_json(self) -> typing.Any:
pass
@property
@abc.abstractmethod
def subject(self) -> str:
pass
@property
@abc.abstractmethod
def event_type(self) -> str:
pass
@property
@abc.abstractmethod
def event_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def data_version(self) -> str:
pass
class Document(abc.ABC):
@classmethod
@abc.abstractmethod
def from_json(cls, json_data: str) -> 'Document':
pass
@classmethod
@abc.abstractmethod
def from_dict(cls, dct: dict) -> 'Document':
pass
@abc.abstractmethod
def __getitem__(self, key):
pass
@abc.abstractmethod
def __setitem__(self, key, value):
pass
@abc.abstractmethod
def to_json(self) -> str:
pass
class DocumentList(abc.ABC):
pass
class EventHubEvent(abc.ABC):
@abc.abstractmethod
def get_body(self) -> bytes:
pass
@property
@abc.abstractmethod
def partition_key(self) -> typing.Optional[str]:
pass
@property
@abc.abstractmethod
def sequence_number(self) -> typing.Optional[int]:
pass
@property
@abc.abstractmethod
def iothub_metadata(self) -> typing.Optional[typing.Mapping[str, str]]:
pass
@property
@abc.abstractmethod
def enqueued_time(self) -> typing.Optional[datetime.datetime]:
pass
@property
@abc.abstractmethod
def offset(self) -> typing.Optional[str]:
pass
class OrchestrationContext(abc.ABC):
@property
@abc.abstractmethod
def body(self) -> str:
pass

Просмотреть файл

@ -0,0 +1,231 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import collections.abc
import io
import json
import typing
import types
from . import _abc
from ._thirdparty.werkzeug import datastructures as _wk_datastructures
from ._thirdparty.werkzeug import formparser as _wk_parser
from ._thirdparty.werkzeug import http as _wk_http
class BaseHeaders(collections.abc.Mapping):
def __init__(self, source: typing.Optional[typing.Mapping] = None) -> None:
self.__http_headers__: typing.Dict[str, str] = {}
if source is not None:
self.__http_headers__.update(
{k.lower(): v for k, v in source.items()})
def __getitem__(self, key: str) -> str:
return self.__http_headers__[key.lower()]
def __len__(self):
return len(self.__http_headers__)
def __contains__(self, key: typing.Any):
return key.lower() in self.__http_headers__
def __iter__(self):
return iter(self.__http_headers__)
class HttpRequestHeaders(BaseHeaders):
pass
class HttpResponseHeaders(BaseHeaders, collections.abc.MutableMapping):
def __setitem__(self, key: str, value: str):
self.__http_headers__[key.lower()] = value
def __delitem__(self, key: str):
del self.__http_headers__[key.lower()]
class HttpResponse(_abc.HttpResponse):
"""An HTTP response object.
:param str/bytes body:
Optional response body.
:param int status_code:
Response status code. If not specified, defaults to 200.
:param dict headers:
An optional mapping containing response HTTP headers.
:param str mimetype:
An optional response MIME type. If not specified, defaults to
``'text/plain'``.
:param str charset:
Response content text encoding. If not specified, defaults to
``'utf-8'``.
"""
def __init__(self, body=None, *,
status_code=None, headers=None, mimetype=None, charset=None):
if status_code is None:
status_code = 200
self.__status_code = status_code
if mimetype is None:
mimetype = 'text/plain'
self.__mimetype = mimetype
if charset is None:
charset = 'utf-8'
self.__charset = charset
if headers is None:
headers = {}
self.__headers = HttpResponseHeaders(headers)
if body is not None:
self.__set_body(body)
else:
self.__body = b''
@property
def mimetype(self):
"""Response MIME type."""
return self.__mimetype
@property
def charset(self):
"""Response text encoding."""
return self.__charset
@property
def headers(self):
"""A dictionary of response HTTP headers."""
return self.__headers
@property
def status_code(self):
"""Response status code."""
return self.__status_code
def __set_body(self, body):
if isinstance(body, str):
body = body.encode(self.__charset)
if not isinstance(body, (bytes, bytearray)):
raise TypeError(
f'response is expected to be either of '
f'str, bytes, or bytearray, got {type(body).__name__}')
self.__body = bytes(body)
def get_body(self) -> bytes:
"""Response body as a bytes object."""
return self.__body
class HttpRequest(_abc.HttpRequest):
"""An HTTP request object.
:param str method:
HTTP request method name.
:param str url:
HTTP URL.
:param dict headers:
An optional mapping containing HTTP request headers.
:param dict params:
An optional mapping containing HTTP request params.
:param dict route_params:
An optional mapping containing HTTP request route params.
:param bytes body:
HTTP request body.
"""
def __init__(self,
method: str,
url: str, *,
headers: typing.Optional[typing.Mapping[str, str]] = None,
params: typing.Optional[typing.Mapping[str, str]] = None,
route_params: typing.Optional[
typing.Mapping[str, str]] = None,
body: bytes) -> None:
self.__method = method
self.__url = url
self.__headers = HttpRequestHeaders(headers or {})
self.__params = types.MappingProxyType(params or {})
self.__route_params = types.MappingProxyType(route_params or {})
self.__body_bytes = body
self.__form_parsed = False
self.__form = None
self.__files = None
@property
def url(self):
return self.__url
@property
def method(self):
return self.__method.upper()
@property
def headers(self):
return self.__headers
@property
def params(self):
return self.__params
@property
def route_params(self):
return self.__route_params
@property
def form(self):
self._parse_form_data()
return self.__form
@property
def files(self):
self._parse_form_data()
return self.__files
def get_body(self) -> bytes:
return self.__body_bytes
def get_json(self) -> typing.Any:
return json.loads(self.__body_bytes.decode('utf-8'))
def _parse_form_data(self):
if self.__form_parsed:
return
body = self.get_body()
content_type = self.headers.get('Content-Type', '')
content_length = len(body)
mimetype, options = _wk_http.parse_options_header(content_type)
parser = _wk_parser.FormDataParser(
_wk_parser.default_stream_factory,
options.get('charset') or 'utf-8',
'replace',
None,
None,
_wk_datastructures.ImmutableMultiDict,
)
body_stream = io.BytesIO(body)
_, self.__form, self.__files = parser.parse(
body_stream, mimetype, content_length, options
)
self.__form_parsed = True

Просмотреть файл

@ -0,0 +1,89 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import List, Tuple, Optional
from datetime import datetime, timedelta
def try_parse_datetime_with_formats(
datetime_str: str,
datetime_formats: List[str]
) -> Tuple[Optional[datetime], Optional[str], Optional[Exception]]:
"""Try parsing the datetime string with a list of formats
Parameters
----------
datetime_str: str
The datetime string needs to be parsed (e.g. 2018-12-12T03:16:34.2191Z)
datetime_formats: List[str]
A list of datetime formats that the parser would try to match
Returns
-------
dict_obj: A serializable dictionary with enough metadata to reconstruct
`obj`
Exceptions
----------
Tuple[Optional[datetime], Optional[str], Optional[Exception]]:
If the datetime can be successfully parsed, the first element is the
paresd datetime object and the second is the matched format.
If the datetime cannot be parsed, the first and second element will be
None, and the third is the exception from the datetime.strptime()
method.
"""
for fmt in datetime_formats:
try:
dt = datetime.strptime(datetime_str, fmt)
return (dt, fmt, None)
except ValueError as ve:
last_exception = ve
return (None, None, last_exception)
def try_parse_timedelta_with_formats(
timedelta_str: str,
timedelta_formats: List[str]
) -> Tuple[Optional[timedelta], Optional[str], Optional[Exception]]:
"""Try parsing the datetime delta string with a list of formats
Parameters
----------
timedelta_str: str
The timedelta string needs to be parsed (e.g. 12:34:56)
timedelta_formats: List[str]
A list of datetime formats that the parser would try to match
Returns
-------
dict_obj: A serializable dictionary with enough metadata to reconstruct
`obj`
Exceptions
----------
Tuple[Optional[timedelta], Optional[str], Optional[Exception]]:
If the timedelta can be successfully parsed, the first element is the
paresd timedelta object and the second is the matched format.
If the timedelta cannot be parsed, the first and second element will be
None, and the third is the exception from the datetime.strptime()
method.
"""
for fmt in timedelta_formats:
try:
# If singular form %S, %M, %H, will just return the timedelta
if fmt == '%S':
td = timedelta(seconds=int(timedelta_str))
elif fmt == '%M':
td = timedelta(minutes=int(timedelta_str))
elif fmt == '%H':
td = timedelta(hours=int(timedelta_str))
else:
dt = datetime.strptime(timedelta_str, fmt)
td = timedelta(hours=dt.hour,
minutes=dt.minute,
seconds=dt.second)
return (td, fmt, None)
except ValueError as ve:
last_exception = ve
return (None, None, last_exception)

Просмотреть файл

@ -0,0 +1,134 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
import typing
from azure.functions import _abc as azf_abc
from azure.functions import _http as azf_http
from . import meta
class HttpRequest(azf_http.HttpRequest):
"""An HTTP request object."""
__body_bytes: typing.Optional[bytes]
__body_str: typing.Optional[str]
def __init__(self,
method: str,
url: str, *,
headers: typing.Mapping[str, str],
params: typing.Mapping[str, str],
route_params: typing.Mapping[str, str],
body_type: str,
body: typing.Union[str, bytes]) -> None:
body_str: typing.Optional[str] = None
body_bytes: typing.Optional[bytes] = None
if isinstance(body, str):
body_str = body
body_bytes = body_str.encode('utf-8')
elif isinstance(body, bytes):
body_bytes = body
else:
raise TypeError(
f'unexpected HTTP request body type: {type(body).__name__}')
super().__init__(method=method, url=url, headers=headers,
params=params, route_params=route_params,
body=body_bytes)
self.__body_type = body_type
self.__body_str = body_str
self.__body_bytes = body_bytes
def get_body(self) -> bytes:
if self.__body_bytes is None:
assert self.__body_str is not None
self.__body_bytes = self.__body_str.encode('utf-8')
return self.__body_bytes
def get_json(self) -> typing.Any:
if self.__body_type in ('json', 'string'):
assert self.__body_str is not None
return json.loads(self.__body_str)
elif self.__body_bytes is not None:
try:
return json.loads(self.__body_bytes.decode('utf-8'))
except ValueError as e:
raise ValueError(
'HTTP request does not contain valid JSON data') from e
else:
raise ValueError(
'Request body cannot be empty in JSON deserialization')
class HttpResponseConverter(meta.OutConverter, binding='http'):
@classmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, (azf_abc.HttpResponse, str))
@classmethod
def encode(cls, obj: typing.Any, *,
expected_type: typing.Optional[type]) -> meta.Datum:
if isinstance(obj, str):
return meta.Datum(type='string', value=obj)
if isinstance(obj, azf_abc.HttpResponse):
status = obj.status_code
headers = dict(obj.headers)
if 'content-type' not in headers:
if obj.mimetype.startswith('text/'):
ct = f'{obj.mimetype}; charset={obj.charset}'
else:
ct = f'{obj.mimetype}'
headers['content-type'] = ct
body = obj.get_body()
if body is not None:
datum_body = meta.Datum(type='bytes', value=body)
else:
datum_body = meta.Datum(type='bytes', value=b'')
return meta.Datum(
type='http',
value=dict(
status_code=meta.Datum(type='string', value=str(status)),
headers={
n: meta.Datum(type='string', value=h)
for n, h in headers.items()
},
body=datum_body,
)
)
raise NotImplementedError
class HttpRequestConverter(meta.InConverter,
binding='httpTrigger', trigger=True):
@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, azf_abc.HttpRequest)
@classmethod
def decode(cls, data: meta.Datum, *,
trigger_metadata) -> typing.Any:
if data.type != 'http':
raise NotImplementedError
val = data.value
return HttpRequest(
method=val['method'].value,
url=val['url'].value,
headers={n: v.value for n, v in val['headers'].items()},
params={n: v.value for n, v in val['query'].items()},
route_params={n: v.value for n, v in val['params'].items()},
body_type=val['body'].type,
body=val['body'].value,
)

Просмотреть файл

@ -0,0 +1,404 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import abc
import collections.abc
import datetime
import json
import re
from typing import Dict, Optional, Union, Tuple, Mapping, Any
from ._thirdparty import typing_inspect
from ._utils import (
try_parse_datetime_with_formats,
try_parse_timedelta_with_formats
)
def is_iterable_type_annotation(annotation: object, pytype: object) -> bool:
is_iterable_anno = (
typing_inspect.is_generic_type(annotation)
and issubclass(typing_inspect.get_origin(annotation),
collections.abc.Iterable)
)
if not is_iterable_anno:
return False
args = typing_inspect.get_args(annotation)
if not args:
return False
if isinstance(pytype, tuple):
return any(isinstance(t, type) and issubclass(t, arg)
for t in pytype for arg in args)
else:
return any(isinstance(pytype, type) and issubclass(pytype, arg)
for arg in args)
class Datum:
def __init__(self, value: Any, type: Optional[str]):
self.value: Any = value
self.type: Optional[str] = type
@property
def python_value(self) -> Any:
if self.value is None or self.type is None:
return None
elif self.type in ('bytes', 'string', 'int', 'double'):
return self.value
elif self.type == 'json':
return json.loads(self.value)
elif self.type == 'collection_string':
return [v for v in self.value.string]
elif self.type == 'collection_bytes':
return [v for v in self.value.bytes]
elif self.type == 'collection_double':
return [v for v in self.value.double]
elif self.type == 'collection_sint64':
return [v for v in self.value.sint64]
else:
return self.value
@property
def python_type(self) -> type:
return type(self.python_value)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.value == other.value and self.type == other.type
def __hash__(self):
return hash((type(self), (self.value, self.type)))
def __repr__(self):
val_repr = repr(self.value)
if len(val_repr) > 10:
val_repr = val_repr[:10] + '...'
return '<Datum {} {}>'.format(self.type, val_repr)
class _ConverterMeta(abc.ABCMeta):
_bindings: Dict[str, type] = {}
def __new__(mcls, name, bases, dct, *,
binding: Optional[str],
trigger: Optional[str] = None):
cls = super().__new__(mcls, name, bases, dct)
cls._trigger = trigger # type: ignore
if binding is None:
return cls
if binding in mcls._bindings:
raise RuntimeError(
f'cannot register a converter for {binding!r} binding: '
f'another converter for this binding has already been '
f'registered')
mcls._bindings[binding] = cls
if trigger is not None:
mcls._bindings[trigger] = cls
return cls
@classmethod
def get(cls, binding_name):
return cls._bindings.get(binding_name)
def has_trigger_support(cls) -> bool:
return cls._trigger is not None # type: ignore
class _BaseConverter(metaclass=_ConverterMeta, binding=None):
@classmethod
def _decode_typed_data(
cls, data: Datum, *,
python_type: Union[type, Tuple[type, ...]],
context: str = 'data') -> Any:
if data is None:
return None
data_type = data.type
if data_type == 'json':
result = json.loads(data.value)
elif data_type == 'string':
result = data.value
elif data_type == 'int':
result = data.value
elif data_type == 'double':
result = data.value
elif data_type == 'collection_bytes':
result = data.value
elif data_type == 'collection_string':
result = data.value
elif data_type == 'collection_sint64':
result = data.value
elif data_type is None:
return None
else:
raise ValueError(
f'unsupported type of {context}: {data_type}')
if not isinstance(result, python_type):
if isinstance(python_type, (tuple, list, dict)):
raise ValueError(
f'unexpected value type in {context}: '
f'{type(result).__name__}, expected one of: '
f'{", ".join(t.__name__ for t in python_type)}')
else:
try:
# Try coercing into the requested type
result = python_type(result)
except (TypeError, ValueError) as e:
raise ValueError(
f'cannot convert value of {context} into '
f'{python_type.__name__}: {e}') from None
return result
@classmethod
def _decode_trigger_metadata_field(
cls, trigger_metadata: Mapping[str, Datum],
field: str, *,
python_type: Union[type, Tuple[type, ...]]) \
-> Any:
data = trigger_metadata.get(field)
if data is None:
return None
else:
return cls._decode_typed_data(
data, python_type=python_type,
context=f'field {field!r} in trigger metadata')
@classmethod
def _parse_datetime_metadata(
cls, trigger_metadata: Mapping[str, Datum],
field: str) -> Optional[datetime.datetime]:
datetime_str = cls._decode_trigger_metadata_field(
trigger_metadata, field, python_type=str)
if datetime_str is None:
return None
else:
return cls._parse_datetime(datetime_str)
@classmethod
def _parse_timedelta_metadata(
cls, trigger_metadata: Mapping[str, Datum],
field: str) -> Optional[datetime.timedelta]:
timedelta_str = cls._decode_trigger_metadata_field(
trigger_metadata, field, python_type=str)
if timedelta_str is None:
return None
else:
return cls._parse_timedelta(timedelta_str)
@classmethod
def _parse_datetime(
cls, datetime_str: Optional[str]) -> Optional[datetime.datetime]:
if not datetime_str:
return None
too_fractional = re.match(
r'(.*\.\d{6})(\d+)(Z|[\+|-]\d{1,2}:\d{1,2}){0,1}', datetime_str)
if too_fractional:
# The supplied value contains seven digits in the
# fractional second part, whereas Python expects
# a maxium of six, so strip it.
# https://github.com/Azure/azure-functions-python-worker/issues/269
datetime_str = too_fractional.group(1) + (
too_fractional.group(3) or '')
# Try parse time
utc_time, utc_time_error = cls._parse_datetime_utc(datetime_str)
if not utc_time_error and utc_time:
return utc_time.replace(tzinfo=datetime.timezone.utc)
local_time, local_time_error = cls._parse_datetime_local(datetime_str)
if not local_time_error and local_time:
return local_time.replace(tzinfo=None)
# Report error
if utc_time_error:
raise utc_time_error
elif local_time_error:
raise local_time_error
else:
return None
@classmethod
def _parse_timedelta(
cls,
timedelta_str: Optional[str]
) -> Optional[datetime.timedelta]:
if not timedelta_str:
return None
# Try parse timedelta
timedelta, td_error = cls._parse_timedelta_internal(timedelta_str)
if timedelta is not None:
return timedelta
# Report error
if td_error:
raise td_error
else:
return None
@classmethod
def _parse_datetime_utc(
cls,
datetime_str: str
) -> Tuple[Optional[datetime.datetime], Optional[Exception]]:
# UTC ISO 8601 assumed
# 2018-08-07T23:17:57.461050Z
utc_formats = [
'%Y-%m-%dT%H:%M:%S+00:00',
'%Y-%m-%dT%H:%M:%S-00:00',
'%Y-%m-%dT%H:%M:%S.%f+00:00',
'%Y-%m-%dT%H:%M:%S.%f-00:00',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%m/%d/%Y %H:%M:%SZ',
'%m/%d/%Y %H:%M:%S.%fZ',
'%m/%d/%Y %H:%M:%S+00:00',
'%m/%d/%Y %H:%M:%S-00:00',
'%m/%d/%Y %H:%M:%S.%f+00:00',
'%m/%d/%Y %H:%M:%S.%f-00:00',
]
dt, _, excpt = try_parse_datetime_with_formats(
datetime_str, utc_formats)
if excpt is not None:
return None, excpt
return dt, None
@classmethod
def _parse_datetime_local(
cls, datetime_str: str
) -> Tuple[Optional[datetime.datetime], Optional[Exception]]:
"""Parse a string into a datetime object, accepts following formats
1. Without fractional seconds (e.g. 2018-08-07T23:17:57)
2. With fractional seconds (e.g. 2018-08-07T23:17:57.461050)
Parameters
----------
datetime_str: str
The string represents a datetime
Returns
-------
Tuple[Optional[datetime.datetime], Optional[Exception]]
If the datetime_str is None, will return None immediately.
If the datetime_str can be parsed correctly, it will return as the
first element in the tuple.
If the datetime_str cannot be parsed with all attempts, it will
return None in the first element, the exception in the second
element.
"""
local_formats = [
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S',
'%m/%d/%YT%H:%M:%S.%f',
'%m/%d/%YT%H:%M:%S'
]
dt, _, excpt = try_parse_datetime_with_formats(
datetime_str, local_formats)
if excpt is not None:
return None, excpt
return dt, None
@classmethod
def _parse_timedelta_internal(
cls, timedelta_str: str
) -> Tuple[Optional[datetime.timedelta], Optional[Exception]]:
"""Parse a string into a timedelta object, accepts following formats
1. HH:MM:SS (e.g. 12:34:56)
2. MM:SS (e.g. 34:56)
3. Pure integer as seconds (e.g. 5819)
Parameters
----------
timedelta_str: str
The string represents a datetime
Returns
-------
Tuple[Optional[datetime.timedelta], Optional[Exception]]
If the timedelta_str is None, will return None immediately.
If the timedelta_str can be parsed correctly, it will return as the
first element in the tuple.
If the timedelta_str cannot be parsed with all attempts, it will
return None in the first element, the exception in the second
element.
"""
timedelta_formats = [
'%H:%M:%S',
'%M:%S',
'%S'
]
td, _, excpt = try_parse_timedelta_with_formats(
timedelta_str, timedelta_formats)
if td is not None:
return td, None
return None, excpt
class InConverter(_BaseConverter, binding=None):
@abc.abstractclassmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
pass
@abc.abstractclassmethod
def decode(cls, data: Datum, *, trigger_metadata) -> Any:
raise NotImplementedError
@abc.abstractclassmethod
def has_implicit_output(cls) -> bool:
return False
class OutConverter(_BaseConverter, binding=None):
@abc.abstractclassmethod
def check_output_type_annotation(cls, pytype: type) -> bool:
pass
@abc.abstractclassmethod
def encode(cls, obj: Any, *,
expected_type: Optional[type]) -> Optional[Datum]:
raise NotImplementedError
def get_binding_registry():
return _ConverterMeta

Просмотреть файл

@ -14,7 +14,6 @@ def main(req: func.HttpRequest) -> func.HttpResponse:
customer's dependencies. We have mock a .python_packages/ folder in
this e2e test function app which contains the following stub package:
azure.functions==1.2.1
protobuf==3.9.0
grpc==1.35.0
@ -29,7 +28,6 @@ def main(req: func.HttpRequest) -> func.HttpResponse:
"worker_deps_path": dm._get_worker_deps_path(),
},
"libraries": {
"func.expected.version": "1.2.1",
"func.version": func.__version__,
"func.file": func.__file__,
"proto.expected.version": "3.9.0",

Просмотреть файл

@ -1,121 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import importlib.util
from unittest.case import skipIf
from unittest.mock import patch
from requests import Response
from azure_functions_worker import testutils
from azure_functions_worker.utils.common import is_envvar_true
from azure_functions_worker.constants import PYAZURE_INTEGRATION_TEST
REQUEST_TIMEOUT_SEC = 5
class TestDependencyFunctionsOnDedicated(testutils.WebHostTestCase):
"""Test the dependency manager E2E scneraio via Http Trigger.
The following E2E tests ensures the dependency manager is behaving as
expected. They are tested against the dependency_functions/ folder which
contain a dummy .python_packages/ folder.
"""
project_root = testutils.E2E_TESTS_ROOT / 'dependency_functions'
customer_deps = project_root / '.python_packages' / 'lib' / 'site-packages'
@classmethod
def setUpClass(cls):
os_environ = os.environ.copy()
# Turn on feature flag
os_environ['PYTHON_ISOLATE_WORKER_DEPENDENCIES'] = '1'
# Emulate Python worker in Azure enviroment.
# For how the PYTHONPATH is set in Azure, check prodV3/worker.py.
os_environ['PYTHONPATH'] = str(cls.customer_deps)
cls._patch_environ = patch.dict('os.environ', os_environ)
cls._patch_environ.start()
super().setUpClass()
@classmethod
def tearDownClass(self):
super().tearDownClass()
self._patch_environ.stop()
@classmethod
def get_script_dir(cls):
return cls.project_root
@testutils.retryable_test(3, 5)
def test_dependency_function_should_return_ok(self):
"""The common scenario of general import should return OK in any
circumstances
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
self.assertTrue(r.ok)
@testutils.retryable_test(3, 5)
def test_feature_flag_is_turned_on(self):
"""Since passing the feature flag PYTHON_ISOLATE_WORKER_DEPENDENCIES to
the host, the customer's function should also be able to receive it
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
environments = r.json()['environments']
flag_value = environments['PYTHON_ISOLATE_WORKER_DEPENDENCIES']
self.assertEqual(flag_value, '1')
@testutils.retryable_test(3, 5)
def test_working_directory_resolution(self):
"""Check from the dependency manager and see if the current working
directory is resolved correctly
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
environments = r.json()['environments']
dir = os.path.dirname(__file__)
self.assertEqual(
environments['AzureWebJobsScriptRoot'].lower(),
os.path.join(dir, 'dependency_functions').lower()
)
@skipIf(
is_envvar_true(PYAZURE_INTEGRATION_TEST),
'Integration test expects dependencies derived from core tools folder'
)
@testutils.retryable_test(3, 5)
def test_paths_resolution(self):
"""Dependency manager requires paths to be resolved correctly before
switching to customer's modules. This test is to ensure when the app
is in ready state, check if the paths are in good state.
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
dm = r.json()['dependency_manager']
self.assertEqual(
dm['cx_working_dir'].lower(), str(self.project_root).lower()
)
self.assertEqual(
dm['cx_deps_path'].lower(), str(self.customer_deps).lower()
)
# Should dervie the package location from the built-in azure.functions
azf_spec = importlib.util.find_spec('azure.functions')
self.assertEqual(
dm['worker_deps_path'].lower(),
os.path.abspath(
os.path.join(os.path.dirname(azf_spec.origin), '..', '..')
).lower()
)
@testutils.retryable_test(3, 5)
def test_loading_libraries_from_customers_package(self):
"""Since the Python now loaded the customer's dependencies, the
libraries version should match the ones in .python_packages/ folder
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
libraries = r.json()['libraries']
self.assertEqual(
libraries['proto.expected.version'], libraries['proto.version']
)
self.assertEqual(
libraries['grpc.expected.version'], libraries['grpc.version']
)

Просмотреть файл

@ -0,0 +1,224 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import importlib.util
from unittest.case import skipIf
from unittest.mock import patch
from requests import Response
from azure_functions_worker import testutils
from azure_functions_worker.utils.common import is_envvar_true
from azure_functions_worker.constants import PYAZURE_INTEGRATION_TEST
REQUEST_TIMEOUT_SEC = 5
class TestGRPCandProtobufDependencyIsolationOnDedicated(
testutils.WebHostTestCase):
"""Test the dependency manager E2E scenario via Http Trigger.
The following E2E tests ensures the dependency manager is behaving as
expected. They are tested against the dependency_isolation_functions/
folder which contain a dummy .python_packages_grpc_protobuf folder.
This testcase checks if the customers library version of grpc and protobuf
are being loaded in the functionapp
"""
function_name = 'dependency_isolation_functions'
package_name = '.python_packages_grpc_protobuf'
project_root = testutils.E2E_TESTS_ROOT / function_name
customer_deps = project_root / package_name / 'lib' / 'site-packages'
@classmethod
def setUpClass(cls):
os_environ = os.environ.copy()
# Turn on feature flag
os_environ['PYTHON_ISOLATE_WORKER_DEPENDENCIES'] = '1'
# Emulate Python worker in Azure environment.
# For how the PYTHONPATH is set in Azure, check prodV3/worker.py.
os_environ['PYTHONPATH'] = str(cls.customer_deps)
cls._patch_environ = patch.dict('os.environ', os_environ)
cls._patch_environ.start()
super().setUpClass()
@classmethod
def tearDownClass(self):
super().tearDownClass()
self._patch_environ.stop()
@classmethod
def get_script_dir(cls):
return cls.project_root
@testutils.retryable_test(3, 5)
def test_dependency_function_should_return_ok(self):
"""The common scenario of general import should return OK in any
circumstances
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
self.assertTrue(r.ok)
@testutils.retryable_test(3, 5)
def test_feature_flag_is_turned_on(self):
"""Since passing the feature flag PYTHON_ISOLATE_WORKER_DEPENDENCIES to
the host, the customer's function should also be able to receive it
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
environments = r.json()['environments']
flag_value = environments['PYTHON_ISOLATE_WORKER_DEPENDENCIES']
self.assertEqual(flag_value, '1')
@testutils.retryable_test(3, 5)
def test_working_directory_resolution(self):
"""Check from the dependency manager and see if the current working
directory is resolved correctly
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
environments = r.json()['environments']
dir = os.path.dirname(__file__)
self.assertEqual(
environments['AzureWebJobsScriptRoot'].lower(),
os.path.join(dir, 'dependency_isolation_functions').lower()
)
@skipIf(
is_envvar_true(PYAZURE_INTEGRATION_TEST),
'Integration test expects dependencies derived from core tools folder'
)
@testutils.retryable_test(3, 5)
def test_paths_resolution(self):
"""Dependency manager requires paths to be resolved correctly before
switching to customer's modules. This test is to ensure when the app
is in ready state, check if the paths are in good state.
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
dm = r.json()['dependency_manager']
self.assertEqual(
dm['cx_working_dir'].lower(), str(self.project_root).lower()
)
self.assertEqual(
dm['cx_deps_path'].lower(), str(self.customer_deps).lower()
)
# Should derive the package location from the built-in azure.functions
azf_spec = importlib.util.find_spec('azure.functions')
self.assertEqual(
dm['worker_deps_path'].lower(),
os.path.abspath(
os.path.join(os.path.dirname(azf_spec.origin), '..', '..')
).lower()
)
@testutils.retryable_test(3, 5)
def test_loading_libraries_from_customers_package(self):
"""Since the Python now loaded the customer's dependencies, the
libraries version should match the ones in
.python_packages_grpc_protobuf/ folder
"""
r: Response = self.webhost.request('GET', 'report_dependencies')
libraries = r.json()['libraries']
self.assertEqual(
libraries['proto.expected.version'], libraries['proto.version']
)
self.assertEqual(
libraries['grpc.expected.version'], libraries['grpc.version']
)
class TestOlderVersionOfAzFuncDependencyIsolationOnDedicated(
testutils.WebHostTestCase):
"""Test the dependency manager E2E scenario via Http Trigger.
The following E2E tests ensures the dependency manager is behaving as
expected. They are tested against the dependency_isolation_functions/
folder which contain a dummy .python_packages_azf_older_version folder.
This testcase checks if the customers older library version of azure
functions is being loaded in the functionapp
"""
function_name = 'dependency_isolation_functions'
package_name = '.python_packages_azf_older_version'
project_root = testutils.E2E_TESTS_ROOT / function_name
customer_deps = project_root / package_name / 'lib' / 'site-packages'
expected_azfunc_version = '1.5.0'
@classmethod
def setUpClass(cls):
os_environ = os.environ.copy()
# Turn on feature flag
os_environ['PYTHON_ISOLATE_WORKER_DEPENDENCIES'] = '1'
# Emulate Python worker in Azure environment.
# For how the PYTHONPATH is set in Azure, check prodV3/worker.py.
os_environ['PYTHONPATH'] = str(cls.customer_deps)
cls._patch_environ = patch.dict('os.environ', os_environ)
cls._patch_environ.start()
super().setUpClass()
@classmethod
def tearDownClass(self):
super().tearDownClass()
self._patch_environ.stop()
@classmethod
def get_script_dir(cls):
return cls.project_root
@testutils.retryable_test(3, 5)
def test_loading_libraries_from_customers_package(self):
r: Response = self.webhost.request('GET', 'report_dependencies')
libraries = r.json()['libraries']
self.assertEqual(
self.expected_azfunc_version, libraries['func.version'])
class TestNewerVersionOfAzFuncDependencyIsolationOnDedicated(
testutils.WebHostTestCase):
"""Test the dependency manager E2E scenario via Http Trigger.
The following E2E tests ensures the dependency manager is behaving as
expected. They are tested against the dependency_isolation_functions/
folder which contain a dummy .python_packages_azf_newer_version folder.
This testcase checks if the customers newer library version of azure
functions is being loaded in the functionapp
"""
function_name = 'dependency_isolation_functions'
package_name = '.python_packages_azf_newer_version'
project_root = testutils.E2E_TESTS_ROOT / function_name
customer_deps = project_root / package_name / 'lib' / 'site-packages'
expected_azfunc_version = '9.9.9'
@classmethod
def setUpClass(cls):
os_environ = os.environ.copy()
# Turn on feature flag
os_environ['PYTHON_ISOLATE_WORKER_DEPENDENCIES'] = '1'
# Emulate Python worker in Azure environment.
# For how the PYTHONPATH is set in Azure, check prodV3/worker.py.
os_environ['PYTHONPATH'] = str(cls.customer_deps)
cls._patch_environ = patch.dict('os.environ', os_environ)
cls._patch_environ.start()
super().setUpClass()
@classmethod
def tearDownClass(self):
super().tearDownClass()
self._patch_environ.stop()
@classmethod
def get_script_dir(cls):
return cls.project_root
@testutils.retryable_test(3, 5)
def test_loading_libraries_from_customers_package(self):
r: Response = self.webhost.request('GET', 'report_dependencies')
libraries = r.json()['libraries']
self.assertEqual(
self.expected_azfunc_version, libraries['func.version'])