Supporting PyStein programming model in the Worker (#965)

This commit is contained in:
gavin-aguiar 2022-04-27 12:23:09 -05:00 коммит произвёл GitHub
Родитель 1312797281
Коммит 81b84102dc
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
47 изменённых файлов: 1800 добавлений и 370 удалений

Просмотреть файл

@ -50,3 +50,8 @@ PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39 = True
# External Site URLs
MODULE_NOT_FOUND_TS_URL = "https://aka.ms/functions-modulenotfound"
# new programming model script file name
SCRIPT_FILE_NAME = "function_app.py"
PYTHON_LANGUAGE_RUNTIME = "python"

Просмотреть файл

@ -18,17 +18,13 @@ from typing import List, Optional
import grpc
from . import bindings
from . import constants
from . import functions
from . import loader
from . import protos
from . import bindings, constants, functions, loader, protos
from .bindings.shared_memory_data_transfer import SharedMemoryManager
from .constants import (PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MAX_37,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_ENABLE_DEBUG_LOGGING)
PYTHON_ENABLE_DEBUG_LOGGING, SCRIPT_FILE_NAME)
from .extension import ExtensionManager
from .logging import disable_console_logging, enable_console_logging
from .logging import enable_debug_logging_recommendation
@ -40,7 +36,6 @@ from .utils.tracing import marshall_exception_trace
from .utils.wrappers import disable_feature_by
from .version import VERSION
_TRUE = "true"
"""In Python 3.6, the current_task method was in the Task class, but got moved
@ -260,13 +255,13 @@ class Dispatcher(metaclass=DispatcherMeta):
resp = await request_handler(request)
self._grpc_resp_queue.put_nowait(resp)
async def _handle__worker_init_request(self, req):
async def _handle__worker_init_request(self, request):
logger.info('Received WorkerInitRequest, '
'python version %s, worker version %s, request ID %s',
sys.version, VERSION, self.request_id)
enable_debug_logging_recommendation()
worker_init_request = req.worker_init_request
worker_init_request = request.worker_init_request
host_capabilities = worker_init_request.capabilities
if constants.FUNCTION_DATA_CACHE in host_capabilities:
val = host_capabilities[constants.FUNCTION_DATA_CACHE]
@ -294,42 +289,93 @@ class Dispatcher(metaclass=DispatcherMeta):
result=protos.StatusResult(
status=protos.StatusResult.Success)))
async def _handle__worker_status_request(self, req):
async def _handle__worker_status_request(self, request):
# Logging is not necessary in this request since the response is used
# for host to judge scale decisions of out-of-proc languages.
# Having log here will reduce the responsiveness of the worker.
return protos.StreamingMessage(
request_id=req.request_id,
request_id=request.request_id,
worker_status_response=protos.WorkerStatusResponse())
async def _handle__function_load_request(self, req):
func_request = req.function_load_request
async def _handle__functions_metadata_request(self, request):
    """Handle a FunctionsMetadataRequest by indexing function_app.py.

    If the new programming-model script (function_app.py) is not present
    in the function app directory, respond with
    use_default_metadata_indexing=True so the host falls back to legacy
    (host-side) indexing.
    """
    metadata_request = request.functions_metadata_request
    directory = metadata_request.function_app_directory
    function_path = os.path.join(directory, SCRIPT_FILE_NAME)

    if not os.path.exists(function_path):
        # Fallback to legacy model
        logger.info(f"{SCRIPT_FILE_NAME} does not exist. "
                    "Switching to host indexing.")
        return protos.StreamingMessage(
            request_id=request.request_id,
            function_metadata_response=protos.FunctionMetadataResponse(
                use_default_metadata_indexing=True,
                result=protos.StatusResult(
                    status=protos.StatusResult.Success)))

    try:
        fx_metadata_results = []
        indexed_functions = loader.index_function_app(function_path)
        if indexed_functions:
            indexed_function_logs: List[str] = []
            for func in indexed_functions:
                function_log = \
                    f"Function Name: {func.get_function_name()} " \
                    "Function Binding: " \
                    f"{[binding.name for binding in func.get_bindings()]}"
                indexed_function_logs.append(function_log)

            logger.info(
                f'Successfully processed FunctionMetadataRequest for '
                f'functions: {" ".join(indexed_function_logs)}')

            fx_metadata_results = loader.process_indexed_function(
                self._functions,
                indexed_functions)
        else:
            logger.warning("No functions indexed. Please refer to the "
                           "documentation.")

        return protos.StreamingMessage(
            request_id=request.request_id,
            function_metadata_response=protos.FunctionMetadataResponse(
                function_metadata_results=fx_metadata_results,
                result=protos.StatusResult(
                    status=protos.StatusResult.Success)))

    except Exception as ex:
        # Fix: use request.request_id (not self.request_id) so the failure
        # response correlates with this metadata request, consistent with
        # the two success paths above.
        return protos.StreamingMessage(
            request_id=request.request_id,
            function_metadata_response=protos.FunctionMetadataResponse(
                result=protos.StatusResult(
                    status=protos.StatusResult.Failure,
                    exception=self._serialize_exception(ex))))
async def _handle__function_load_request(self, request):
func_request = request.function_load_request
function_id = func_request.function_id
function_name = func_request.metadata.name
logger.info(f'Received FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id}'
f'function Name: {function_name}')
try:
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)
if not self._functions.get_function(function_id):
func = loader.load_function(
func_request.metadata.name,
func_request.metadata.directory,
func_request.metadata.script_file,
func_request.metadata.entry_point)
self._functions.add_function(
function_id, func, func_request.metadata)
self._functions.add_function(
function_id, func, func_request.metadata)
ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)
ExtensionManager.function_load_extension(
function_name,
func_request.metadata.directory
)
logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')
logger.info('Successfully processed FunctionLoadRequest, '
f'request ID: {self.request_id}, '
f'function ID: {function_id},'
f'function Name: {function_name}')
return protos.StreamingMessage(
request_id=self.request_id,
@ -347,8 +393,8 @@ class Dispatcher(metaclass=DispatcherMeta):
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))
async def _handle__invocation_request(self, req):
invoc_request = req.invocation_request
async def _handle__invocation_request(self, request):
invoc_request = request.invocation_request
invocation_id = invoc_request.invocation_id
function_id = invoc_request.function_id
@ -361,6 +407,7 @@ class Dispatcher(metaclass=DispatcherMeta):
try:
fi: functions.FunctionInfo = self._functions.get_function(
function_id)
assert fi is not None
function_invocation_logs: List[str] = [
'Received FunctionInvocationRequest',
@ -456,7 +503,7 @@ class Dispatcher(metaclass=DispatcherMeta):
status=protos.StatusResult.Failure,
exception=self._serialize_exception(ex))))
async def _handle__function_environment_reload_request(self, req):
async def _handle__function_environment_reload_request(self, request):
"""Only runs on Linux Consumption placeholder specialization.
"""
try:
@ -464,7 +511,8 @@ class Dispatcher(metaclass=DispatcherMeta):
'request ID: %s', self.request_id)
enable_debug_logging_recommendation()
func_env_reload_request = req.function_environment_reload_request
func_env_reload_request = \
request.function_environment_reload_request
# Import before clearing path cache so that the default
# azure.functions modules is available in sys.modules for
@ -523,7 +571,7 @@ class Dispatcher(metaclass=DispatcherMeta):
request_id=self.request_id,
function_environment_reload_response=failure_response)
async def _handle__close_shared_memory_resources_request(self, req):
async def _handle__close_shared_memory_resources_request(self, request):
"""
Frees any memory maps that were produced as output for a given
invocation.
@ -534,7 +582,7 @@ class Dispatcher(metaclass=DispatcherMeta):
If the cache is not enabled, the worker should free the resources as at
this point the host has read the memory maps and does not need them.
"""
close_request = req.close_shared_memory_resources_request
close_request = request.close_shared_memory_resources_request
map_names = close_request.map_names
# Assign default value of False to all result values.
# If we are successfully able to close a memory map, its result will be

Просмотреть файл

@ -1,16 +1,11 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import functools
import logging
from types import ModuleType
from typing import Any, Callable, List, Optional
import logging
import functools
from .utils.common import (
is_python_version,
get_sdk_from_sys_path,
get_sdk_version
)
from .utils.wrappers import enable_feature_by
from .constants import (
PYTHON_ISOLATE_WORKER_DEPENDENCIES,
PYTHON_ENABLE_WORKER_EXTENSIONS,
@ -18,7 +13,12 @@ from .constants import (
PYTHON_ENABLE_WORKER_EXTENSIONS_DEFAULT_39
)
from .logging import logger, SYSTEM_LOG_PREFIX
from .utils.common import (
is_python_version,
get_sdk_from_sys_path,
get_sdk_version
)
from .utils.wrappers import enable_feature_by
# Extension Hooks
FUNC_EXT_POST_FUNCTION_LOAD = "post_function_load"

Просмотреть файл

@ -2,21 +2,23 @@
# Licensed under the MIT License.
import inspect
import operator
import pathlib
import typing
from . import bindings
from azure.functions import DataType, Function
from . import bindings as bindings_utils
from . import protos
from ._thirdparty import typing_inspect
from .protos import BindingInfo
class ParamTypeInfo(typing.NamedTuple):
    """Pairs a binding's declared type name with the Python type
    annotated on the corresponding function parameter (None when the
    parameter carries no annotation)."""
    binding_name: str
    pytype: typing.Optional[type]
class FunctionInfo(typing.NamedTuple):
func: typing.Callable
name: str
@ -38,63 +40,66 @@ class FunctionLoadError(RuntimeError):
class Registry:
_functions: typing.MutableMapping[str, FunctionInfo]
def __init__(self) -> None:
    # Maps function_id -> FunctionInfo for every registered function.
    self._functions = {}
def get_function(self, function_id: str) -> FunctionInfo:
try:
if function_id in self._functions:
return self._functions[function_id]
except KeyError:
raise RuntimeError(
f'no function with function_id={function_id}') from None
def add_function(self, function_id: str,
func: typing.Callable,
metadata: protos.RpcFunctionMetadata):
func_name = metadata.name
sig = inspect.signature(func)
params = dict(sig.parameters)
annotations = typing.get_type_hints(func)
return None
input_types: typing.Dict[str, ParamTypeInfo] = {}
output_types: typing.Dict[str, ParamTypeInfo] = {}
return_binding_name: typing.Optional[str] = None
return_pytype: typing.Optional[type] = None
@staticmethod
def get_explicit_and_implicit_return(binding_name: str,
binding: BindingInfo,
explicit_return: bool,
implicit_return: bool,
bound_params: dict) -> \
typing.Tuple[bool, bool]:
if binding_name == '$return':
explicit_return = True
elif bindings_utils.has_implicit_output(
binding.type):
implicit_return = True
bound_params[binding_name] = binding
else:
bound_params[binding_name] = binding
return explicit_return, implicit_return
@staticmethod
def get_return_binding(binding_name: str,
binding_type: str,
return_binding_name: str) -> str:
if binding_name == "$return":
return_binding_name = binding_type
assert return_binding_name is not None
elif bindings_utils.has_implicit_output(binding_type):
return_binding_name = binding_type
return return_binding_name
@staticmethod
def validate_binding_direction(binding_name: str,
                               binding_direction: str,
                               func_name: str):
    """Raise FunctionLoadError for unsupported binding directions.

    Two rules are enforced:
      * "inout" bindings are rejected outright;
      * the special "$return" binding must declare direction "out".
    """
    if binding_direction == protos.BindingInfo.inout:
        raise FunctionLoadError(
            func_name,
            '"inout" bindings are not supported')

    if binding_name == '$return' and \
            binding_direction != protos.BindingInfo.out:
        raise FunctionLoadError(
            func_name,
            '"$return" binding must have direction set to "out"')
@staticmethod
def is_context_required(params, bound_params: dict,
annotations: dict,
func_name: str) -> bool:
requires_context = False
has_explicit_return = False
has_implicit_return = False
bound_params = {}
for name, desc in metadata.bindings.items():
if desc.direction == protos.BindingInfo.inout:
raise FunctionLoadError(
func_name,
'"inout" bindings are not supported')
if name == '$return':
if desc.direction != protos.BindingInfo.out:
raise FunctionLoadError(
func_name,
'"$return" binding must have direction set to "out"')
has_explicit_return = True
return_binding_name = desc.type
assert return_binding_name is not None
elif bindings.has_implicit_output(desc.type):
# If the binding specify implicit output binding
# (e.g. orchestrationTrigger, activityTrigger)
# we should enable output even if $return is not specified
has_implicit_return = True
return_binding_name = desc.type
bound_params[name] = desc
else:
bound_params[name] = desc
if 'context' in params and 'context' not in bound_params:
requires_context = True
params.pop('context')
@ -107,7 +112,11 @@ class Registry:
'the "context" parameter is expected to be of '
'type azure.functions.Context, got '
f'{ctx_anno!r}')
return requires_context
@staticmethod
def validate_function_params(params: dict, bound_params: dict,
annotations: dict, func_name: str):
if set(params) - set(bound_params):
raise FunctionLoadError(
func_name,
@ -120,8 +129,11 @@ class Registry:
f'the following parameters are declared in function.json but '
f'not in Python: {set(bound_params) - set(params)!r}')
input_types: typing.Dict[str, ParamTypeInfo] = {}
output_types: typing.Dict[str, ParamTypeInfo] = {}
for param in params.values():
desc = bound_params[param.name]
binding = bound_params[param.name]
param_has_anno = param.name in annotations
param_anno = annotations.get(param.name)
@ -147,7 +159,7 @@ class Registry:
else:
is_param_out = False
is_binding_out = desc.direction == protos.BindingInfo.out
is_binding_out = binding.direction == protos.BindingInfo.out
if is_param_out:
param_anno_args = typing_inspect.get_args(param_anno)
@ -162,15 +174,14 @@ class Registry:
# so if the annotation was func.Out[typing.List[foo]],
# we need to reconstruct it.
if (isinstance(param_py_type, tuple)
and typing_inspect.is_generic_type(param_py_type[0])):
and typing_inspect.is_generic_type(param_py_type[0])):
param_py_type = operator.getitem(
param_py_type[0], *param_py_type[1:])
else:
param_py_type = param_anno
if (param_has_anno and not isinstance(param_py_type, type)
and not typing_inspect.is_generic_type(param_py_type)):
and not typing_inspect.is_generic_type(param_py_type)):
raise FunctionLoadError(
func_name,
f'binding {param.name} has invalid non-type annotation '
@ -191,33 +202,34 @@ class Registry:
'is azure.functions.Out in Python')
if param_has_anno and param_py_type in (str, bytes) and (
not bindings.has_implicit_output(desc.type)):
not bindings_utils.has_implicit_output(binding.type)):
param_bind_type = 'generic'
else:
param_bind_type = desc.type
param_bind_type = binding.type
if param_has_anno:
if is_param_out:
checks_out = bindings.check_output_type_annotation(
checks_out = bindings_utils.check_output_type_annotation(
param_bind_type, param_py_type)
else:
checks_out = bindings.check_input_type_annotation(
checks_out = bindings_utils.check_input_type_annotation(
param_bind_type, param_py_type)
if not checks_out:
if desc.data_type is not protos.BindingInfo.undefined:
if binding.data_type is not DataType(
protos.BindingInfo.undefined):
raise FunctionLoadError(
func_name,
f'{param.name!r} binding type "{desc.type}" '
f'and dataType "{desc.data_type}" in function.json'
f' do not match the corresponding function '
f'parameter\'s Python type '
f'{param.name!r} binding type "{binding.type}" '
f'and dataType "{binding.data_type}" in '
f'function.json do not match the corresponding '
f'function parameter\'s Python type '
f'annotation "{param_py_type.__name__}"')
else:
raise FunctionLoadError(
func_name,
f'type of {param.name} binding in function.json '
f'"{desc.type}" does not match its Python '
f'"{binding.type}" does not match its Python '
f'annotation "{param_py_type.__name__}"')
param_type_info = ParamTypeInfo(param_bind_type, param_py_type)
@ -225,12 +237,18 @@ class Registry:
output_types[param.name] = param_type_info
else:
input_types[param.name] = param_type_info
return input_types, output_types
@staticmethod
def get_function_return_type(annotations: dict, has_explicit_return: bool,
has_implicit_return: bool, binding_name: str,
func_name: str):
return_pytype = None
if has_explicit_return and 'return' in annotations:
return_anno = annotations.get('return')
if (typing_inspect.is_generic_type(return_anno)
and typing_inspect.get_origin(return_anno).__name__ == 'Out'):
if typing_inspect.is_generic_type(
return_anno) and typing_inspect.get_origin(
return_anno).__name__ == 'Out':
raise FunctionLoadError(
func_name,
'return annotation should not be azure.functions.Out')
@ -243,29 +261,152 @@ class Registry:
f'annotation {return_pytype!r}')
if return_pytype is (str, bytes):
return_binding_name = 'generic'
binding_name = 'generic'
if not bindings.check_output_type_annotation(
return_binding_name, return_pytype):
if not bindings_utils.check_output_type_annotation(
binding_name, return_pytype):
raise FunctionLoadError(
func_name,
f'Python return annotation "{return_pytype.__name__}" '
f'does not match binding type "{return_binding_name}"')
f'does not match binding type "{binding_name}"')
if has_implicit_return and 'return' in annotations:
return_pytype = annotations.get('return')
return_type = None
if has_explicit_return or has_implicit_return:
return_type = ParamTypeInfo(return_binding_name, return_pytype)
return_type = ParamTypeInfo(binding_name, return_pytype)
self._functions[function_id] = FunctionInfo(
func=func,
name=func_name,
directory=metadata.directory,
return return_type
def add_func_to_registry_and_return_funcinfo(
        self, function,
        function_name: str,
        function_id: str,
        directory: str,
        requires_context: bool,
        has_explicit_return: bool,
        has_implicit_return: bool,
        input_types: typing.Dict[str, ParamTypeInfo],
        output_types: typing.Dict[str, ParamTypeInfo],
        # NOTE(review): was annotated `str`, but callers pass the result of
        # get_function_return_type, which is a ParamTypeInfo or None.
        return_type: typing.Optional[ParamTypeInfo]):
    """Build a FunctionInfo from the validated pieces, store it in the
    registry under *function_id*, and return it."""
    function_info = FunctionInfo(
        func=function,
        name=function_name,
        directory=directory,
        requires_context=requires_context,
        # Coroutine functions are dispatched on the event loop directly.
        is_async=inspect.iscoroutinefunction(function),
        has_return=has_explicit_return or has_implicit_return,
        input_types=input_types,
        output_types=output_types,
        return_type=return_type)

    self._functions[function_id] = function_info
    return function_info
def add_function(self, function_id: str,
                 func: typing.Callable,
                 metadata: protos.RpcFunctionMetadata):
    """Validate and register a function described by function.json
    metadata (the legacy, host-indexed programming model).

    Inspects the user function's signature and type hints, validates
    every binding's direction and parameters, resolves the return
    binding, and stores the resulting FunctionInfo in the registry.
    """
    func_name = metadata.name
    sig = inspect.signature(func)
    params = dict(sig.parameters)
    annotations = typing.get_type_hints(func)
    return_binding_name: typing.Optional[str] = None
    has_explicit_return = False
    has_implicit_return = False

    bound_params = {}
    for binding_name, binding_info in metadata.bindings.items():
        self.validate_binding_direction(binding_name,
                                        binding_info.direction, func_name)

        # Bug fix: the fourth argument must be has_implicit_return.
        # Previously has_explicit_return was passed twice, so an
        # implicit-output flag set on one binding could be dropped on
        # the next loop iteration.
        has_explicit_return, has_implicit_return = \
            self.get_explicit_and_implicit_return(
                binding_name, binding_info, has_explicit_return,
                has_implicit_return, bound_params)

        return_binding_name = self.get_return_binding(binding_name,
                                                      binding_info.type,
                                                      return_binding_name)

    requires_context = self.is_context_required(params, bound_params,
                                                annotations,
                                                func_name)

    input_types, output_types = self.validate_function_params(params,
                                                              bound_params,
                                                              annotations,
                                                              func_name)

    return_type = \
        self.get_function_return_type(annotations,
                                      has_explicit_return,
                                      has_implicit_return,
                                      return_binding_name,
                                      func_name)

    self.add_func_to_registry_and_return_funcinfo(func, func_name,
                                                  function_id,
                                                  metadata.directory,
                                                  requires_context,
                                                  has_explicit_return,
                                                  has_implicit_return,
                                                  input_types,
                                                  output_types, return_type)
def add_indexed_function(self, function_id: str,
                         function: Function):
    """Validate and register a function discovered by worker indexing
    (the decorator-based programming model) and return its FunctionInfo.

    Mirrors add_function, but the bindings come from the azure.functions
    Function object rather than function.json metadata, and the function
    directory is derived from the file the user function is defined in.
    """
    func = function.get_user_function()
    func_name = function.get_function_name()
    return_binding_name: typing.Optional[str] = None
    has_explicit_return = False
    has_implicit_return = False

    sig = inspect.signature(func)
    params = dict(sig.parameters)
    annotations = typing.get_type_hints(func)
    # Directory of the script that defines the user function.
    func_dir = str(pathlib.Path(inspect.getfile(func)).parent)

    bound_params = {}
    for binding in function.get_bindings():
        self.validate_binding_direction(binding.name,
                                        binding.direction,
                                        func_name)

        has_explicit_return, has_implicit_return = \
            self.get_explicit_and_implicit_return(
                binding.name, binding, has_explicit_return,
                has_implicit_return, bound_params)

        return_binding_name = self.get_return_binding(binding.name,
                                                      binding.type,
                                                      return_binding_name)

    requires_context = self.is_context_required(params, bound_params,
                                                annotations,
                                                func_name)

    input_types, output_types = self.validate_function_params(params,
                                                              bound_params,
                                                              annotations,
                                                              func_name)

    return_type = \
        self.get_function_return_type(annotations,
                                      has_explicit_return,
                                      has_implicit_return,
                                      return_binding_name,
                                      func_name)

    return \
        self.add_func_to_registry_and_return_funcinfo(func, func_name,
                                                      function_id,
                                                      func_dir,
                                                      requires_context,
                                                      has_explicit_return,
                                                      has_implicit_return,
                                                      input_types,
                                                      output_types,
                                                      return_type)

Просмотреть файл

@ -1,8 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""Python functions loader."""
import importlib
import importlib.machinery
import importlib.util
@ -10,10 +8,15 @@ import os
import os.path
import pathlib
import sys
import typing
import uuid
from os import PathLike, fspath
from typing import List, Optional, Dict
from .constants import MODULE_NOT_FOUND_TS_URL
from azure.functions import Function, FunctionApp
from . import protos, functions
from .constants import MODULE_NOT_FOUND_TS_URL, SCRIPT_FILE_NAME, \
PYTHON_LANGUAGE_RUNTIME
from .utils.wrappers import attach_message_to_exception
_AZURE_NAMESPACE = '__app__'
@ -44,6 +47,45 @@ def uninstall() -> None:
pass
# NOTE(review): the parameter was annotated List[Function], but the body
# calls .get_bindings() on the argument itself — it is a single Function.
def build_binding_protos(indexed_function: Function) -> Dict:
    """Map a single indexed function's bindings to
    {binding name: protos.BindingInfo}."""
    binding_protos = {}
    for binding in indexed_function.get_bindings():
        binding_protos[binding.name] = protos.BindingInfo(
            type=binding.type,
            data_type=binding.data_type,
            direction=binding.direction)

    return binding_protos
def process_indexed_function(functions_registry: functions.Registry,
                             indexed_functions: List[Function]):
    """Register each indexed function and build its RpcFunctionMetadata.

    Every function is assigned a freshly generated UUID as its
    function_id, added to *functions_registry*, and converted into the
    RpcFunctionMetadata proto the host expects in the
    FunctionMetadataResponse. Returns the list of metadata protos.
    """
    fx_metadata_results = []
    for indexed_function in indexed_functions:
        function_id = str(uuid.uuid4())
        function_info = functions_registry.add_indexed_function(
            function_id,
            function=indexed_function)

        binding_protos = build_binding_protos(indexed_function)

        function_metadata = protos.RpcFunctionMetadata(
            name=function_info.name,
            function_id=function_id,
            managed_dependency_enabled=False,  # only enabled for PowerShell
            directory=function_info.directory,
            script_file=indexed_function.function_script_file,
            entry_point=function_info.name,
            is_proxy=False,  # not supported in V4
            language=PYTHON_LANGUAGE_RUNTIME,
            bindings=binding_protos,
            raw_bindings=indexed_function.get_raw_bindings())

        fx_metadata_results.append(function_metadata)

    return fx_metadata_results
@attach_message_to_exception(
expt_type=ImportError,
message=f'Please check the requirements.txt file for the missing module. '
@ -51,7 +93,7 @@ def uninstall() -> None:
f' guide: {MODULE_NOT_FOUND_TS_URL} '
)
def load_function(name: str, directory: str, script_file: str,
entry_point: typing.Optional[str]):
entry_point: Optional[str]):
dir_path = pathlib.Path(directory)
script_path = pathlib.Path(script_file) if script_file else pathlib.Path(
_DEFAULT_SCRIPT_FILENAME)
@ -93,3 +135,27 @@ def load_function(name: str, directory: str, script_file: str,
f'present in {rel_script_path}')
return func
@attach_message_to_exception(
    expt_type=ImportError,
    message=f'Troubleshooting Guide: {MODULE_NOT_FOUND_TS_URL}'
)
def index_function_app(function_path: str) -> List[Function]:
    """Import the function app script and return the functions it defines.

    Imports the module named after *function_path*'s stem (the script's
    directory is expected to already be on sys.path), locates exactly one
    module-level FunctionApp instance, and returns its functions.

    Raises ValueError when zero or more than one FunctionApp instance is
    found in the module.
    """
    module_name = pathlib.Path(function_path).stem
    imported_module = importlib.import_module(module_name)

    app: Optional[FunctionApp] = None
    # dir() instead of __dir__(), and a single getattr per attribute
    # (the original looked each name up twice).
    for attr_name in dir(imported_module):
        attr = getattr(imported_module, attr_name, None)
        if isinstance(attr, FunctionApp):
            if app is not None:
                raise ValueError(
                    "Multiple instances of FunctionApp are defined")
            app = attr

    if app is None:
        raise ValueError("Could not find instance of FunctionApp in "
                         f"{SCRIPT_FILE_NAME}.")

    return app.get_functions()

Просмотреть файл

@ -1,10 +1,10 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from typing import Optional
import logging
import logging.handlers
import sys
from typing import Optional
# Logging Prefixes
CONSOLE_LOG_PREFIX = "LanguageWorkerConsoleLog"

Просмотреть файл

@ -28,4 +28,6 @@ from .FunctionRpc_pb2 import ( # NoQA
RpcSharedMemory,
RpcDataType,
CloseSharedMemoryResourcesRequest,
CloseSharedMemoryResourcesResponse)
CloseSharedMemoryResourcesResponse,
FunctionsMetadataRequest,
FunctionMetadataResponse)

Просмотреть файл

@ -67,7 +67,7 @@ message StreamingMessage {
// Worker logs a message back to the host
RpcLog rpc_log = 2;
FunctionEnvironmentReloadRequest function_environment_reload_request = 25;
FunctionEnvironmentReloadResponse function_environment_reload_response = 26;
@ -78,14 +78,20 @@ message StreamingMessage {
// Worker indexing message types
FunctionsMetadataRequest functions_metadata_request = 29;
FunctionMetadataResponses function_metadata_responses = 30;
FunctionMetadataResponse function_metadata_response = 30;
// Host sends required metadata to worker to load functions
FunctionLoadRequestCollection function_load_request_collection = 31;
// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;
}
}
// Process.Start required info
// connection details
// protocol type
// protocol version
// protocol version
// Worker sends the host information identifying itself
message StartStream {
@ -93,7 +99,7 @@ message StartStream {
string worker_id = 2;
}
// Host requests the worker to initialize itself
// Host requests the worker to initialize itself
message WorkerInitRequest {
// version of the host sending init request
string host_version = 1;
@ -107,6 +113,9 @@ message WorkerInitRequest {
// Full path of worker.config.json location
string worker_directory = 4;
// base directory for function app
string function_app_directory = 5;
}
// Worker responds with the result of initializing itself
@ -181,7 +190,7 @@ message WorkerActionResponse {
Restart = 0;
Reload = 1;
}
// action for this response
Action action = 1;
@ -220,7 +229,17 @@ message CloseSharedMemoryResourcesResponse {
map<string, bool> close_map_results = 1;
}
// Host tells the worker to load a Function
// Host tells the worker to load a list of Functions
message FunctionLoadRequestCollection {
repeated FunctionLoadRequest function_load_requests = 1;
}
// Host gets the list of function load responses
message FunctionLoadResponseCollection {
repeated FunctionLoadResponse function_load_responses = 1;
}
// Load request of a single Function
message FunctionLoadRequest {
// unique function identifier (avoid name collisions, facilitate reload case)
string function_id = 1;
@ -252,7 +271,7 @@ message RpcFunctionMetadata {
// base directory for the Function
string directory = 1;
// Script file specified
string script_file = 2;
@ -273,6 +292,12 @@ message RpcFunctionMetadata {
// Raw binding info
repeated string raw_bindings = 10;
// unique function identifier (avoid name collisions, facilitate reload case)
string function_id = 13;
// A flag indicating if managed dependency is enabled or not
bool managed_dependency_enabled = 14;
}
// Host tells worker it is ready to receive metadata
@ -282,12 +307,15 @@ message FunctionsMetadataRequest {
}
// Worker sends function metadata back to host
message FunctionMetadataResponses {
message FunctionMetadataResponse {
// list of function indexing responses
repeated FunctionLoadRequest function_load_requests_results = 1;
repeated RpcFunctionMetadata function_metadata_results = 1;
// status of overall metadata request
StatusResult result = 2;
// if set to true then host will perform indexing
bool use_default_metadata_indexing = 3;
}
// Host requests worker to invoke a Function
@ -464,7 +492,7 @@ message BindingInfo {
DataType data_type = 4;
}
// Used to send logs back to the Host
// Used to send logs back to the Host
message RpcLog {
// Matching ILogger semantics
// https://github.com/aspnet/Logging/blob/9506ccc3f3491488fe88010ef8b9eb64594abf95/src/Microsoft.Extensions.Logging/Logger.cs
@ -481,8 +509,9 @@ message RpcLog {
// Category of the log. Defaults to User if not specified.
enum RpcLogCategory {
User = 0;
System = 1;
User = 0;
System = 1;
CustomMetric = 2;
}
// Unique id for invocation (if exists)
@ -504,14 +533,17 @@ message RpcLog {
// Exception (if exists)
RpcException exception = 6;
// json serialized property bag, or could use a type scheme like map<string, TypedData>
// json serialized property bag
string properties = 7;
// Category of the log. Either user(default) or system.
// Category of the log. Either user(default), system, or custom metric.
RpcLogCategory log_category = 8;
// strongly-typed (ish) property bag
map<string, TypedData> propertiesMap = 9;
}
// Encapsulates an Exception
// Encapsulates an Exception
message RpcException {
// Source of the exception
string source = 3;
@ -565,7 +597,7 @@ message RpcHttpCookie {
// TODO - solidify this or remove it
message RpcHttp {
string method = 1;
string url = 2;
string url = 2;
map<string,string> headers = 3;
TypedData body = 4;
map<string,string> params = 10;

Просмотреть файл

@ -33,22 +33,21 @@ import uuid
import grpc
import requests
from azure_functions_worker import dispatcher
from azure_functions_worker import protos
from azure_functions_worker._thirdparty import aio_compat
from azure_functions_worker.bindings.shared_memory_data_transfer \
import FileAccessorFactory
from azure_functions_worker.bindings.shared_memory_data_transfer \
import SharedMemoryConstants as consts
from . import dispatcher
from . import protos
from .constants import (
from azure_functions_worker.constants import (
PYAZURE_WEBHOST_DEBUG,
PYAZURE_WORKER_DIR,
PYAZURE_INTEGRATION_TEST,
FUNCTIONS_WORKER_SHARED_MEMORY_DATA_TRANSFER_ENABLED,
UNIX_SHARED_MEMORY_DIRECTORIES
)
from .utils.common import is_envvar_true, get_app_setting
from azure_functions_worker.utils.common import is_envvar_true, get_app_setting
PROJECT_ROOT = pathlib.Path(__file__).parent.parent
TESTS_ROOT = PROJECT_ROOT / 'tests'
@ -499,6 +498,18 @@ class _MockWebHost:
return r
async def get_functions_metadata(self):
    """Send a FunctionsMetadataRequest for this host's scripts directory
    to the worker and return the function_metadata_response message."""
    r = await self.communicate(
        protos.StreamingMessage(
            functions_metadata_request=protos.FunctionsMetadataRequest(
                function_app_directory=str(self._scripts_dir)
            )
        ),
        wait_for='function_metadata_response'
    )
    return r
async def load_function(self, name):
if name not in self._available_functions:
raise RuntimeError(f'cannot load function {name}')
@ -714,13 +725,14 @@ class _MockWebHostController:
def start_mockhost(*, script_root=FUNCS_PATH):
tests_dir = TESTS_ROOT
scripts_dir = tests_dir / script_root
scripts_dir = TESTS_ROOT / script_root
if not (scripts_dir.exists() and scripts_dir.is_dir()):
raise RuntimeError(
f'invalid script_root argument: '
f'{scripts_dir} directory does not exist')
sys.path.append(str(scripts_dir))
return _MockWebHostController(scripts_dir)
@ -765,6 +777,7 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None):
testconfig.read(WORKER_CONFIG)
hostexe_args = []
os.environ['AzureWebJobsFeatureFlags'] = 'EnableWorkerIndexing'
# If we want to use core-tools
coretools_exe = os.environ.get('CORE_TOOLS_EXE_PATH')
@ -836,7 +849,8 @@ def popen_webhost(*, stdout, stderr, script_root=FUNCS_PATH, port=None):
'languageWorkers:python:workerDirectory': str(worker_path),
'host:logger:consoleLoggingMode': 'always',
'AZURE_FUNCTIONS_ENVIRONMENT': 'development',
'AzureWebJobsSecretStorageType': 'files'
'AzureWebJobsSecretStorageType': 'files',
'FUNCTIONS_WORKER_RUNTIME': 'python'
}
# In E2E Integration mode, we should use the core tools worker
@ -898,39 +912,6 @@ def start_webhost(*, script_dir=None, stdout=None):
time.sleep(10) # Giving host some time to start fully.
addr = f'http://{LOCALHOST}:{port}'
health_check_endpoint = f'{addr}/api/ping'
host_out = ""
if stdout is not None and hasattr(stdout,
"readable") and stdout.readable():
host_out = stdout.readlines(100)
for _ in range(5):
try:
r = requests.get(health_check_endpoint,
params={'code': 'testFunctionKey'})
# Give the host a bit more time to settle
time.sleep(2)
if 200 <= r.status_code < 300:
# Give the host a bit more time to settle
time.sleep(1)
break
else:
print(f'Failed to ping {health_check_endpoint}, status code: '
f'{r.status_code}', flush=True)
except requests.exceptions.ConnectionError:
pass
time.sleep(1)
else:
proc.terminate()
try:
proc.wait(20)
except subprocess.TimeoutExpired:
proc.kill()
raise RuntimeError('could not start the webworker in time. Please'
f' check the log file for details: {stdout.name} \n'
f' Captured WebHost stdout:\n{host_out}')
return _WebHostProxy(proc, addr)
@ -985,7 +966,6 @@ def _symlink_dir(src, dst):
def _setup_func_app(app_root):
extensions = app_root / 'bin'
ping_func = app_root / 'ping'
host_json = app_root / 'host.json'
extensions_csproj_file = app_root / 'extensions.csproj'
@ -997,18 +977,16 @@ def _setup_func_app(app_root):
with open(extensions_csproj_file, 'w') as f:
f.write(EXTENSION_CSPROJ_TEMPLATE)
_symlink_dir(TESTS_ROOT / 'common' / 'ping', ping_func)
_symlink_dir(EXTENSIONS_PATH, extensions)
def _teardown_func_app(app_root):
extensions = app_root / 'bin'
ping_func = app_root / 'ping'
host_json = app_root / 'host.json'
extensions_csproj_file = app_root / 'extensions.csproj'
extensions_obj_file = app_root / 'obj'
for path in (extensions, ping_func, host_json, extensions_csproj_file,
for path in (extensions, host_json, extensions_csproj_file,
extensions_obj_file):
remove_path(path)

Просмотреть файл

@ -7,6 +7,7 @@
"supportedArchitectures":["X64", "X86"],
"extensions":[".py"],
"defaultExecutablePath":"python",
"defaultWorkerPath":"%FUNCTIONS_WORKER_RUNTIME_VERSION%/{os}/{architecture}/worker.py"
"defaultWorkerPath":"%FUNCTIONS_WORKER_RUNTIME_VERSION%/{os}/{architecture}/worker.py",
"workerIndexing": "true"
}
}

Просмотреть файл

@ -3,6 +3,7 @@
"language":"python",
"extensions":[".py"],
"defaultExecutablePath":"python",
"defaultWorkerPath":"worker.py"
"defaultWorkerPath":"worker.py",
"workerIndexing": "true"
}
}

Просмотреть файл

@ -0,0 +1,393 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import hashlib
import io
import json
import random
import string
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="blob_trigger")
@app.blob_trigger(arg_name="file",
                  path="python-worker-tests/test-blob-trigger.txt",
                  connection="AzureWebJobsStorage")
@app.write_blob(arg_name="$return",
                path="python-worker-tests/test-blob-triggered.txt",
                connection="AzureWebJobsStorage")
def blob_trigger(file: func.InputStream) -> str:
    """Blob trigger: echo the triggering blob's metadata and content.

    The JSON result is written to the output blob bound to ``$return``.
    """
    payload = {
        'name': file.name,
        'length': file.length,
        'content': file.read().decode('utf-8')
    }
    return json.dumps(payload)
@app.function_name(name="get_blob_as_bytes")
@app.route(route="get_blob_as_bytes")
@app.read_blob(arg_name="file",
               path="python-worker-tests/test-bytes.txt",
               data_type="BINARY",
               connection="AzureWebJobsStorage")
def get_blob_as_bytes(req: func.HttpRequest, file: bytes) -> str:
    """HTTP trigger: read a blob bound as raw bytes and return it as text.

    data_type="BINARY" makes the worker hand the blob to the function
    as ``bytes`` rather than ``func.InputStream``.
    """
    assert isinstance(file, bytes)
    return file.decode('utf-8')
@app.function_name(name="get_blob_as_bytes_return_http_response")
@app.route(route="get_blob_as_bytes_return_http_response")
@app.read_blob(arg_name="file",
               path="python-worker-tests/shmem-test-bytes.txt",
               data_type="BINARY",
               connection="AzureWebJobsStorage")
def get_blob_as_bytes_return_http_response(req: func.HttpRequest, file: bytes) \
        -> func.HttpResponse:
    """
    Read a blob (bytes) and respond back (in HTTP response) with the number of
    bytes read and the MD5 digest of the content.
    """
    assert isinstance(file, bytes)

    digest = hashlib.md5(file).hexdigest()
    body = json.dumps(
        {
            'content_size': len(file),
            'content_md5': digest
        },
        indent=2)

    return func.HttpResponse(body=body,
                             mimetype="application/json",
                             status_code=200)
@app.function_name(name="get_blob_as_bytes_stream_return_http_response")
@app.route(route="get_blob_as_bytes_stream_return_http_response")
@app.read_blob(arg_name="file",
               path="python-worker-tests/shmem-test-bytes.txt",
               data_type="BINARY",
               connection="AzureWebJobsStorage")
def get_blob_as_bytes_stream_return_http_response(req: func.HttpRequest,
                                                  file: func.InputStream) \
        -> func.HttpResponse:
    """
    Read a blob (as azf.InputStream) and respond back (in HTTP response) with
    the number of bytes read and the MD5 digest of the content.
    """
    # Drain the stream first: size and digest require the full content.
    file_bytes = file.read()

    content_size = len(file_bytes)
    content_md5 = hashlib.md5(file_bytes).hexdigest()

    response_dict = {
        'content_size': content_size,
        'content_md5': content_md5
    }

    response_body = json.dumps(response_dict, indent=2)

    return func.HttpResponse(
        body=response_body,
        mimetype="application/json",
        status_code=200
    )
@app.function_name(name="get_blob_as_str")
@app.route(route="get_blob_as_str")
@app.read_blob(arg_name="file",
               path="python-worker-tests/test-str.txt",
               data_type="STRING",
               connection="AzureWebJobsStorage")
def get_blob_as_str(req: func.HttpRequest, file: str) -> str:
    """HTTP trigger: read a blob bound as a string and return it verbatim.

    data_type="STRING" makes the worker hand the blob to the function
    as ``str`` rather than ``func.InputStream``.
    """
    assert isinstance(file, str)
    return file
@app.function_name(name="get_blob_as_str_return_http_response")
@app.route(route="get_blob_as_str_return_http_response")
@app.read_blob(arg_name="file",
               path="python-worker-tests/shmem-test-bytes.txt",
               data_type="STRING",
               connection="AzureWebJobsStorage")
def get_blob_as_str_return_http_response(req: func.HttpRequest,
                                         file: str) -> func.HttpResponse:
    """
    Read a blob (string) and respond back (in HTTP response) with the number of
    characters read and the MD5 digest of the utf-8 encoded content.
    """
    assert isinstance(file, str)

    # The digest is taken over the utf-8 encoding of the string content.
    encoded = file.encode('utf-8')
    digest = hashlib.md5(encoded).hexdigest()

    body = json.dumps(
        {
            'num_chars': len(file),
            'content_md5': digest
        },
        indent=2)

    return func.HttpResponse(body=body,
                             mimetype="application/json",
                             status_code=200)
@app.function_name(name="get_blob_bytes")
@app.route(route="get_blob_bytes")
@app.read_blob(arg_name="file",
               path="python-worker-tests/test-bytes.txt",
               connection="AzureWebJobsStorage")
def get_blob_bytes(req: func.HttpRequest, file: func.InputStream) -> str:
    """HTTP trigger: stream a blob (default InputStream binding) as utf-8."""
    return file.read().decode('utf-8')
@app.function_name(name="get_blob_filelike")
@app.route(route="get_blob_filelike")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-filelike.txt",
connection="AzureWebJobsStorage")
def get_blob_filelike(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="get_blob_return")
@app.route(route="get_blob_return")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-return.txt",
connection="AzureWebJobsStorage")
def get_blob_return(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="get_blob_str")
@app.route(route="get_blob_str")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-str.txt",
connection="AzureWebJobsStorage")
def get_blob_str(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="get_blob_triggered")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-blob-triggered.txt",
connection="AzureWebJobsStorage")
@app.route(route="get_blob_triggered")
def get_blob_triggered(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="put_blob_as_bytes_return_http_response")
@app.write_blob(arg_name="file",
                path="python-worker-tests/shmem-test-bytes-out.txt",
                data_type="BINARY",
                connection="AzureWebJobsStorage")
@app.route(route="put_blob_as_bytes_return_http_response")
def put_blob_as_bytes_return_http_response(req: func.HttpRequest,
                                           file: func.Out[
                                               bytes]) -> func.HttpResponse:
    """
    Write a blob (bytes) and respond back (in HTTP response) with the number of
    bytes written and the MD5 digest of the content.
    The number of bytes to write are specified in the input HTTP request.
    """
    content_size = int(req.params['content_size'])

    # 'no_random_input' requests a constant 0x01 payload; generating random
    # bytes for very large sizes would be slow.
    if 'no_random_input' in req.params:
        payload = b'\x01' * content_size
    else:
        payload = bytearray(
            random.getrandbits(8) for _ in range(content_size))

    digest = hashlib.md5(payload).hexdigest()
    file.set(payload)

    body = json.dumps(
        {
            'content_size': content_size,
            'content_md5': digest
        },
        indent=2)

    return func.HttpResponse(body=body,
                             mimetype="application/json",
                             status_code=200)
@app.function_name(name="put_blob_as_str_return_http_response")
@app.write_blob(arg_name="file",
                path="python-worker-tests/shmem-test-str-out.txt",
                data_type="STRING",
                connection="AzureWebJobsStorage")
@app.route(route="put_blob_as_str_return_http_response")
def put_blob_as_str_return_http_response(req: func.HttpRequest, file: func.Out[
        str]) -> func.HttpResponse:
    """
    Write a blob (string) and respond back (in HTTP response) with the number of
    characters written and the MD5 digest of the utf-8 encoded content.
    The number of characters to write are specified in the input HTTP request.
    """
    num_chars = int(req.params['num_chars'])

    # Random alphanumeric payload; digest is over the utf-8 encoding.
    content = ''.join(random.choices(string.ascii_uppercase + string.digits,
                                     k=num_chars))
    content_bytes = content.encode('utf-8')
    content_size = len(content_bytes)
    content_md5 = hashlib.md5(content_bytes).hexdigest()
    file.set(content)
    response_dict = {
        'num_chars': num_chars,
        'content_size': content_size,
        'content_md5': content_md5
    }
    response_body = json.dumps(response_dict, indent=2)
    return func.HttpResponse(
        body=response_body,
        mimetype="application/json",
        status_code=200
    )
@app.function_name(name="put_blob_bytes")
@app.write_blob(arg_name="file",
path="python-worker-tests/test-bytes.txt",
connection="AzureWebJobsStorage")
@app.route(route="put_blob_bytes")
def put_blob_bytes(req: func.HttpRequest, file: func.Out[bytes]) -> str:
file.set(req.get_body())
return 'OK'
@app.function_name(name="put_blob_filelike")
@app.write_blob(arg_name="file",
path="python-worker-tests/test-filelike.txt",
connection="AzureWebJobsStorage")
@app.route(route="put_blob_filelike")
def put_blob_filelike(req: func.HttpRequest,
file: func.Out[io.StringIO]) -> str:
file.set(io.StringIO('filelike'))
return 'OK'
@app.function_name(name="put_blob_return")
@app.write_blob(arg_name="$return",
path="python-worker-tests/test-return.txt",
connection="AzureWebJobsStorage")
@app.route(route="put_blob_return", binding_arg_name="resp")
def put_blob_return(req: func.HttpRequest,
resp: func.Out[func.HttpResponse]) -> str:
return 'FROM RETURN'
@app.function_name(name="put_blob_str")
@app.write_blob(arg_name="file",
path="python-worker-tests/test-str.txt",
connection="AzureWebJobsStorage")
@app.route(route="put_blob_str")
def put_blob_str(req: func.HttpRequest, file: func.Out[str]) -> str:
file.set(req.get_body())
return 'OK'
@app.function_name(name="put_blob_trigger")
@app.write_blob(arg_name="file",
path="python-worker-tests/test-blob-trigger.txt",
connection="AzureWebJobsStorage")
@app.route(route="put_blob_trigger")
def put_blob_trigger(req: func.HttpRequest, file: func.Out[str]) -> str:
file.set(req.get_body())
return 'OK'
def _generate_content_and_digest(content_size):
content = bytearray(random.getrandbits(8) for _ in range(content_size))
content_md5 = hashlib.md5(content).hexdigest()
return content, content_md5
@app.function_name(name="put_get_multiple_blobs_as_bytes_return_http_response")
@app.read_blob(arg_name="inputfile1",
               data_type="BINARY",
               path="python-worker-tests/shmem-test-bytes-1.txt",
               connection="AzureWebJobsStorage")
@app.read_blob(arg_name="inputfile2",
               data_type="BINARY",
               path="python-worker-tests/shmem-test-bytes-2.txt",
               connection="AzureWebJobsStorage")
@app.write_blob(arg_name="outputfile1",
                path="python-worker-tests/shmem-test-bytes-out-1.txt",
                data_type="BINARY",
                connection="AzureWebJobsStorage")
@app.write_blob(arg_name="outputfile2",
                path="python-worker-tests/shmem-test-bytes-out-2.txt",
                data_type="BINARY",
                connection="AzureWebJobsStorage")
@app.route(route="put_get_multiple_blobs_as_bytes_return_http_response")
def put_get_multiple_blobs_as_bytes_return_http_response(
        req: func.HttpRequest,
        inputfile1: bytes,
        inputfile2: bytes,
        outputfile1: func.Out[bytes],
        outputfile2: func.Out[bytes]) -> func.HttpResponse:
    """
    Read two blobs (bytes) and respond back (in HTTP response) with the number
    of bytes read from each blob and the MD5 digest of the content of each.
    Write two blobs (bytes) and respond back (in HTTP response) with the number
    bytes written in each blob and the MD5 digest of the content of each.
    The number of bytes to write are specified in the input HTTP request.
    """
    # Sizes/digests of the two input blobs, as delivered by the bindings.
    input_content_size_1 = len(inputfile1)
    input_content_size_2 = len(inputfile2)

    input_content_md5_1 = hashlib.md5(inputfile1).hexdigest()
    input_content_md5_2 = hashlib.md5(inputfile2).hexdigest()

    # Requested output sizes come from the HTTP query string.
    output_content_size_1 = int(req.params['output_content_size_1'])
    output_content_size_2 = int(req.params['output_content_size_2'])

    output_content_1, output_content_md5_1 = \
        _generate_content_and_digest(output_content_size_1)
    output_content_2, output_content_md5_2 = \
        _generate_content_and_digest(output_content_size_2)

    outputfile1.set(output_content_1)
    outputfile2.set(output_content_2)

    response_dict = {
        'input_content_size_1': input_content_size_1,
        'input_content_size_2': input_content_size_2,
        'input_content_md5_1': input_content_md5_1,
        'input_content_md5_2': input_content_md5_2,
        'output_content_size_1': output_content_size_1,
        'output_content_size_2': output_content_size_2,
        'output_content_md5_1': output_content_md5_1,
        'output_content_md5_2': output_content_md5_2
    }

    response_body = json.dumps(response_dict, indent=2)

    return func.HttpResponse(
        body=response_body,
        mimetype="application/json",
        status_code=200
    )

Просмотреть файл

@ -0,0 +1,47 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions as func
app = func.FunctionApp()
@app.route()
@app.read_cosmos_db_documents(
arg_name="docs", database_name="test",
collection_name="items",
id="cosmosdb-input-test",
connection_string_setting="AzureWebJobsCosmosDBConnectionString")
def cosmosdb_input(req: func.HttpRequest, docs: func.DocumentList) -> str:
return func.HttpResponse(docs[0].to_json(), mimetype='application/json')
@app.cosmos_db_trigger(
    arg_name="docs", database_name="test",
    collection_name="items",
    lease_collection_name="leases",
    connection_string_setting="AzureWebJobsCosmosDBConnectionString",
    create_lease_collection_if_not_exists=True)
@app.write_blob(arg_name="$return", connection="AzureWebJobsStorage",
                path="python-worker-tests/test-cosmosdb-triggered.txt")
def cosmosdb_trigger(docs: func.DocumentList) -> str:
    """Cosmos DB change-feed trigger: persist the first changed document.

    The JSON of docs[0] is written to the blob bound to ``$return`` so a
    companion HTTP function can assert on it.
    """
    return docs[0].to_json()
@app.route()
@app.read_blob(arg_name="file", connection="AzureWebJobsStorage",
path="python-worker-tests/test-cosmosdb-triggered.txt")
def get_cosmosdb_triggered(req: func.HttpRequest,
file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.route()
@app.write_cosmos_db_documents(
arg_name="doc", database_name="test",
collection_name="items",
create_if_not_exists=True,
connection_string_setting="AzureWebJobsCosmosDBConnectionString")
def put_document(req: func.HttpRequest, doc: func.Out[func.Document]):
doc.set(func.Document.from_json(req.get_body()))
return 'OK'

Просмотреть файл

@ -0,0 +1,108 @@
import json
import os
import typing
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
import azure.functions as func
app = func.FunctionApp()
# An HttpTrigger for generating an EventHub event via the EventHub output binding
@app.function_name(name="eventhub_output")
@app.route(route="eventhub_output")
@app.write_event_hub_message(arg_name="event",
event_hub_name="python-worker-ci-eventhub-one",
connection="AzureWebJobsEventHubConnectionString")
def eventhub_output(req: func.HttpRequest, event: func.Out[str]):
event.set(req.get_body().decode('utf-8'))
return 'OK'
# This is an actual EventHub trigger which will convert the event data
# into a storage blob.
@app.function_name(name="eventhub_trigger")
@app.event_hub_message_trigger(arg_name="event",
event_hub_name="python-worker-ci-eventhub-one",
connection="AzureWebJobsEventHubConnectionString"
)
@app.write_blob(arg_name="$return",
path="python-worker-tests/test-eventhub-triggered.txt",
connection="AzureWebJobsStorage")
def eventhub_trigger(event: func.EventHubEvent) -> bytes:
return event.get_body()
# Retrieve the event data from storage blob and return it as Http response
@app.function_name(name="get_eventhub_triggered")
@app.route(route="get_eventhub_triggered")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-eventhub-triggered.txt",
connection="AzureWebJobsStorage")
def get_eventhub_triggered(req: func.HttpRequest,
file: func.InputStream) -> str:
return file.read().decode('utf-8')
# Retrieve the event data from storage blob and return it as Http response
@app.function_name(name="get_metadata_triggered")
@app.route(route="get_metadata_triggered")
@app.read_blob(arg_name="file",
               path="python-worker-tests/test-metadata-triggered.txt",
               connection="AzureWebJobsStorage")
async def get_metadata_triggered(req: func.HttpRequest,
                                 file: func.InputStream) -> func.HttpResponse:
    """Return the stored event-metadata blob as a JSON HTTP response.

    Fix: the return annotation said ``str`` but the body constructs and
    returns ``func.HttpResponse``; the annotation now matches the value.
    """
    return func.HttpResponse(body=file.read().decode('utf-8'),
                             status_code=200,
                             mimetype='application/json')
# An HttpTrigger for generating an EventHub event via the azure-eventhub SDK.
# Events generated from azure-eventhub contain the full metadata.
@app.function_name(name="metadata_output")
@app.route(route="metadata_output")
async def metadata_output(req: func.HttpRequest):
    """Publish an EventHub event using the azure-eventhub SDK directly.

    Unlike the output binding, events sent through the SDK carry full
    broker metadata, which the companion trigger asserts on.
    """
    # Parse event metadata from http request
    json_string = req.get_body().decode('utf-8')
    event_dict = json.loads(json_string)

    # Create an EventHub Client and event batch
    client = EventHubProducerClient.from_connection_string(
        os.getenv('AzureWebJobsEventHubConnectionString'),
        eventhub_name='python-worker-ci-eventhub-one-metadata')

    # Generate new event based on http request with full metadata
    event_data_batch = await client.create_batch()
    event_data_batch.add(EventData(event_dict.get('body')))

    # Send out event into event hub
    try:
        await client.send_batch(event_data_batch)
    finally:
        # Always release the AMQP connection, even if sending failed.
        await client.close()

    return 'OK'
@app.function_name(name="metadata_trigger")
@app.event_hub_message_trigger(
    arg_name="event",
    event_hub_name="python-worker-ci-eventhub-one-metadata",
    connection="AzureWebJobsEventHubConnectionString")
@app.write_blob(arg_name="$return",
                path="python-worker-tests/test-metadata-triggered.txt",
                connection="AzureWebJobsStorage")
async def metadata_trigger(event: func.EventHubEvent) -> str:
    """Capture an EventHub event plus its broker metadata into a blob.

    Fix: the return annotation said ``bytes`` but ``json.dumps`` returns
    ``str``; the annotation now matches the returned value.
    """
    event_dict: typing.Mapping[str, typing.Any] = {
        'body': event.get_body().decode('utf-8'),
        # Uncomment this when the EnqueuedTimeUtc is fixed in azure-functions
        # 'enqueued_time': event.enqueued_time.isoformat(),
        'partition_key': event.partition_key,
        'sequence_number': event.sequence_number,
        'offset': event.offset,
        'metadata': event.metadata
    }
    return json.dumps(event_dict)

Просмотреть файл

@ -0,0 +1,34 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
import azure.functions as func
app = func.FunctionApp()
@app.route(route="default_template")
def default_template(req: func.HttpRequest) -> func.HttpResponse:
    """Default HTTP template: personalized greeting when a name is supplied.

    The name is taken from the query string, falling back to a JSON body.
    """
    logging.info('Python HTTP trigger function processed a request.')

    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        message = (f"Hello, {name}. This HTTP triggered function "
                   f"executed successfully.")
        return func.HttpResponse(message)

    return func.HttpResponse(
        "This HTTP triggered function executed successfully. "
        "Pass a name in the query string or in the request body for a"
        " personalized response.",
        status_code=200
    )

Просмотреть файл

@ -0,0 +1,185 @@
import json
import logging
import typing
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="get_queue_blob")
@app.route(route="get_queue_blob")
@app.read_blob(arg_name="file",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-blob.txt")
def get_queue_blob(req: func.HttpRequest, file: func.InputStream) -> str:
return json.dumps({
'queue': json.loads(file.read().decode('utf-8'))
})
@app.function_name(name="get_queue_blob_message_return")
@app.route(route="get_queue_blob_message_return")
@app.read_blob(arg_name="file",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-blob-message-return.txt")
def get_queue_blob_message_return(req: func.HttpRequest,
file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="get_queue_blob_return")
@app.route(route="get_queue_blob_return")
@app.read_blob(arg_name="file",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-blob-return.txt")
def get_queue_blob_return(req: func.HttpRequest, file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="get_queue_untyped_blob_return")
@app.route(route="get_queue_untyped_blob_return")
@app.read_blob(arg_name="file",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-untyped-blob-return.txt")
def get_queue_untyped_blob_return(req: func.HttpRequest,
file: func.InputStream) -> str:
return file.read().decode('utf-8')
@app.function_name(name="put_queue")
@app.route(route="put_queue")
@app.write_queue(arg_name="msg",
connection="AzureWebJobsStorage",
queue_name="testqueue")
def put_queue(req: func.HttpRequest, msg: func.Out[str]):
msg.set(req.get_body())
return 'OK'
@app.function_name(name="put_queue_message_return")
@app.route(route="put_queue_message_return", binding_arg_name="resp")
@app.write_queue(arg_name="$return",
                 connection="AzureWebJobsStorage",
                 queue_name="testqueue-message-return")
def main(req: func.HttpRequest, resp: func.Out[str]) -> bytes:
    """Enqueue the HTTP request body via the ``$return`` queue binding.

    NOTE(review): the annotation says ``bytes`` but a ``func.QueueMessage``
    is returned — presumably the worker converts it for the queue output
    binding; confirm before tightening the annotation.
    """
    return func.QueueMessage(body=req.get_body())
@app.function_name("put_queue_multiple_out")
@app.route(route="put_queue_multiple_out", binding_arg_name="resp")
@app.write_queue(arg_name="msg",
connection="AzureWebJobsStorage",
queue_name="testqueue-return-multiple-outparam")
def put_queue_multiple_out(req: func.HttpRequest,
resp: func.Out[func.HttpResponse],
msg: func.Out[func.QueueMessage]) -> None:
data = req.get_body().decode()
msg.set(func.QueueMessage(body=data))
resp.set(func.HttpResponse(body='HTTP response: {}'.format(data)))
@app.function_name("put_queue_return")
@app.route(route="put_queue_return", binding_arg_name="resp")
@app.write_queue(arg_name="$return",
connection="AzureWebJobsStorage",
queue_name="testqueue-return")
def put_queue_return(req: func.HttpRequest, resp: func.Out[str]) -> bytes:
return req.get_body()
@app.function_name(name="put_queue_multiple_return")
@app.route(route="put_queue_multiple_return")
@app.write_queue(arg_name="msgs",
connection="AzureWebJobsStorage",
queue_name="testqueue-return-multiple")
def put_queue_multiple_return(req: func.HttpRequest,
msgs: func.Out[typing.List[str]]):
msgs.set(['one', 'two'])
@app.function_name(name="put_queue_untyped_return")
@app.route(route="put_queue_untyped_return", binding_arg_name="resp")
@app.write_queue(arg_name="$return",
connection="AzureWebJobsStorage",
queue_name="testqueue-untyped-return")
def put_queue_untyped_return(req: func.HttpRequest,
resp: func.Out[str]) -> bytes:
return func.QueueMessage(body=req.get_body())
@app.function_name(name="queue_trigger")
@app.queue_trigger(arg_name="msg",
                   queue_name="testqueue",
                   connection="AzureWebJobsStorage")
@app.write_blob(arg_name="$return",
                connection="AzureWebJobsStorage",
                path="python-worker-tests/test-queue-blob.txt")
def queue_trigger(msg: func.QueueMessage) -> str:
    """Queue trigger: dump the message body and metadata to a blob as JSON.

    Datetime metadata fields may be absent, hence the None guards.
    """
    result = json.dumps({
        'id': msg.id,
        'body': msg.get_body().decode('utf-8'),
        'expiration_time': (msg.expiration_time.isoformat()
                            if msg.expiration_time else None),
        'insertion_time': (msg.insertion_time.isoformat()
                           if msg.insertion_time else None),
        'time_next_visible': (msg.time_next_visible.isoformat()
                              if msg.time_next_visible else None),
        'pop_receipt': msg.pop_receipt,
        'dequeue_count': msg.dequeue_count
    })

    return result
@app.function_name(name="queue_trigger_message_return")
@app.queue_trigger(arg_name="msg",
queue_name="testqueue-message-return",
connection="AzureWebJobsStorage")
@app.write_blob(arg_name="$return",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-blob-message-return.txt")
def queue_trigger_message_return(msg: func.QueueMessage) -> bytes:
return msg.get_body()
@app.function_name(name="queue_trigger_return")
@app.queue_trigger(arg_name="msg",
queue_name="testqueue-return",
connection="AzureWebJobsStorage")
@app.write_blob(arg_name="$return",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-blob-return.txt")
def queue_trigger_return(msg: func.QueueMessage) -> bytes:
return msg.get_body()
@app.function_name(name="queue_trigger_return_multiple")
@app.queue_trigger(arg_name="msg",
queue_name="testqueue-return-multiple",
connection="AzureWebJobsStorage")
def queue_trigger_return_multiple(msg: func.QueueMessage) -> None:
logging.info('trigger on message: %s', msg.get_body().decode('utf-8'))
@app.function_name(name="queue_trigger_untyped")
@app.queue_trigger(arg_name="msg",
queue_name="testqueue-untyped-return",
connection="AzureWebJobsStorage")
@app.write_blob(arg_name="$return",
connection="AzureWebJobsStorage",
path="python-worker-tests/test-queue-untyped-blob-return.txt")
def queue_trigger_untyped(msg: str) -> str:
return msg
@app.function_name(name="put_queue_return_multiple")
@app.route(route="put_queue_return_multiple", binding_arg_name="resp")
@app.write_queue(arg_name="msgs",
connection="AzureWebJobsStorage",
queue_name="testqueue-return-multiple")
def put_queue_return_multiple(req: func.HttpRequest,
resp: func.Out[str],
msgs: func.Out[typing.List[str]]):
msgs.set(['one', 'two'])

Просмотреть файл

@ -1,7 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions as azf
def main(req: azf.HttpRequest) -> bytes:
return req.get_body()

Просмотреть файл

@ -1,18 +0,0 @@
{
"scriptFile": "__init__.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"direction": "out",
"name": "$return",
"queueName": "testqueue-return",
"connection": "AzureWebJobsServiceBusConnectionString",
"type": "serviceBus"
}
]
}

Просмотреть файл

@ -0,0 +1,55 @@
import json
import azure.functions as func
app = func.FunctionApp()
@app.route(route="put_message")
@app.write_service_bus_queue(
arg_name="msg",
connection="AzureWebJobsServiceBusConnectionString",
queue_name="testqueue")
def put_message(req: func.HttpRequest, msg: func.Out[str]):
msg.set(req.get_body().decode('utf-8'))
return 'OK'
@app.route(route="get_servicebus_triggered")
@app.read_blob(arg_name="file",
path="python-worker-tests/test-servicebus-triggered.txt",
connection="AzureWebJobsStorage")
def get_servicebus_triggered(req: func.HttpRequest,
file: func.InputStream) -> str:
return func.HttpResponse(
file.read().decode('utf-8'), mimetype='application/json')
@app.service_bus_queue_trigger(
    arg_name="msg",
    connection="AzureWebJobsServiceBusConnectionString",
    queue_name="testqueue")
@app.write_blob(arg_name="$return",
                path="python-worker-tests/test-servicebus-triggered.txt",
                connection="AzureWebJobsStorage")
def servicebus_trigger(msg: func.ServiceBusMessage) -> str:
    """Service Bus trigger: dump the message and its metadata to a blob.

    Datetime metadata fields may be absent, hence the None guards.
    """
    result = json.dumps({
        'message_id': msg.message_id,
        'body': msg.get_body().decode('utf-8'),
        'content_type': msg.content_type,
        'delivery_count': msg.delivery_count,
        'expiration_time': (msg.expiration_time.isoformat() if
                            msg.expiration_time else None),
        'label': msg.label,
        'partition_key': msg.partition_key,
        'reply_to': msg.reply_to,
        'reply_to_session_id': msg.reply_to_session_id,
        'scheduled_enqueue_time': (msg.scheduled_enqueue_time.isoformat() if
                                   msg.scheduled_enqueue_time else None),
        'session_id': msg.session_id,
        'time_to_live': msg.time_to_live,
        'to': msg.to,
        'user_properties': msg.user_properties,
    })

    return result

Просмотреть файл

@ -159,3 +159,11 @@ class TestBlobFunctions(testutils.WebHostTestCase):
except AssertionError:
if try_no == max_retries - 1:
raise
class TestBlobFunctionsStein(TestBlobFunctions):
    """Re-run the blob E2E suite against the PyStein (decorator) test app."""

    @classmethod
    def get_script_dir(cls):
        # Same tests, but the functions are defined in a single
        # function_app.py using the new programming model.
        return testutils.E2E_TESTS_FOLDER / 'blob_functions' / \
            'blob_functions_stein'

Просмотреть файл

@ -80,3 +80,11 @@ class TestCosmosDBFunctions(testutils.WebHostTestCase):
raise
else:
break
class TestCosmosDBFunctionsStein(TestCosmosDBFunctions):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'cosmosdb_functions' / \
'cosmosdb_functions_stein'

Просмотреть файл

@ -3,6 +3,7 @@
import json
import time
from datetime import datetime
from dateutil import parser, tz
from azure_functions_worker import testutils
@ -97,3 +98,11 @@ class TestEventHubFunctions(testutils.WebHostTestCase):
self.assertIsNone(sys_props['PartitionKey'])
self.assertGreaterEqual(sys_props['SequenceNumber'], 0)
self.assertIsNotNone(sys_props['Offset'])
class TestEventHubFunctionsStein(TestEventHubFunctions):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'eventhub_functions' / \
'eventhub_functions_stein'

Просмотреть файл

@ -4,6 +4,7 @@ import os
from unittest.mock import patch
import requests
from azure_functions_worker import testutils
REQUEST_TIMEOUT_SEC = 5
@ -105,3 +106,11 @@ class TestHttpFunctions(testutils.WebHostTestCase):
params={'checkHealth': '1'},
timeout=REQUEST_TIMEOUT_SEC)
self.assertTrue(r.ok)
class TestHttpFunctionsStein(TestHttpFunctions):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'http_functions' /\
'http_functions_stein'

Просмотреть файл

@ -91,3 +91,11 @@ class TestQueueFunctions(testutils.WebHostTestCase):
f"Returned status code {r.status_code}, "
"not in the 200-300 range.")
self.assertEqual(r.text, 'HTTP response: foo')
class TestQueueFunctionsStein(TestQueueFunctions):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'queue_functions' / \
'queue_functions_stein'

Просмотреть файл

@ -36,3 +36,11 @@ class TestServiceBusFunctions(testutils.WebHostTestCase):
raise
else:
break
class TestServiceBusFunctionsStein(TestServiceBusFunctions):
@classmethod
def get_script_dir(cls):
return testutils.E2E_TESTS_FOLDER / 'servicebus_functions' / \
'servicebus_functions_stein'

Просмотреть файл

@ -5,8 +5,7 @@ import sys
import shutil
import asyncio
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
async def vertify_nested_namespace_import():

Просмотреть файл

@ -0,0 +1,8 @@
import azure.functions as func
app = func.FunctionApp()
@app.route()
def main():
pass

Просмотреть файл

@ -0,0 +1,8 @@
import azure.functions as func
app = func.FunctionApp()
@app.route()
def main(req: func.HttpRequest):
pass

Просмотреть файл

@ -0,0 +1,320 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import asyncio
import hashlib
import json
import logging
import sys
import time
from urllib.request import urlopen
import azure.functions as func
app = func.FunctionApp()
logger = logging.getLogger("my-function")
@app.route(route="return_str")
def return_str(req: func.HttpRequest) -> str:
    """Minimal HTTP function: always answer the literal 'Hello World!'."""
    return 'Hello World!'
@app.route(route="accept_json")
def accept_json(req: func.HttpRequest):
    """Echo the request (method, url, headers, params, body) back as JSON."""
    snapshot = {
        'method': req.method,
        'url': req.url,
        'headers': dict(req.headers),
        'params': dict(req.params),
        'get_body': req.get_body().decode(),
        'get_json': req.get_json()
    }
    return json.dumps(snapshot)
async def nested():
    """Trigger and log a handled ZeroDivisionError from a child task."""
    try:
        1 / 0
    except ZeroDivisionError:
        logger.error('and another error', exc_info=True)
@app.route(route="async_logging")
async def async_logging(req: func.HttpRequest):
    """Log around awaits; a nested task must still carry the invocation id."""
    logger.info('hello %s', 'info')
    await asyncio.sleep(0.1)
    # The nested child task verifies invocation_id is still logged correctly.
    await asyncio.ensure_future(nested())
    await asyncio.sleep(0.1)
    return 'OK-async'
@app.route(route="async_return_str")
async def async_return_str(req: func.HttpRequest):
    """Async handler that returns a plain string after a short await."""
    await asyncio.sleep(0.1)
    return 'Hello Async World!'
@app.route(route="debug_logging")
def debug_logging(req: func.HttpRequest):
    """Emit one record per severity on the root logger, then return."""
    # Same call order as before: critical, info, warning, debug, error.
    for level in ('critical', 'info', 'warning', 'debug', 'error'):
        getattr(logging, level)(f'logging {level}', exc_info=True)
    return 'OK-debug'
@app.route(route="debug_user_logging")
def debug_user_logging(req: func.HttpRequest):
    """Lower the user logger to DEBUG and emit one record per severity."""
    logger.setLevel(logging.DEBUG)
    # Root-logger critical first, then the user logger at each level,
    # preserving the original call order.
    logging.critical('logging critical', exc_info=True)
    for level in ('info', 'warning', 'debug', 'error'):
        getattr(logger, level)(f'logging {level}', exc_info=True)
    return 'OK-user-debug'
# Attempt to log info into system log from customer code
disguised_logger = logging.getLogger('azure_functions_worker')


async def parallelly_print():
    """Write to stdout after a short delay."""
    await asyncio.sleep(0.1)
    print('parallelly_print')


async def parallelly_log_info():
    """Info record on the root logger."""
    await asyncio.sleep(0.2)
    logging.info('parallelly_log_info at root logger')


async def parallelly_log_warning():
    """Warning record on the root logger."""
    await asyncio.sleep(0.3)
    logging.warning('parallelly_log_warning at root logger')


async def parallelly_log_error():
    """Error record on the root logger."""
    await asyncio.sleep(0.4)
    logging.error('parallelly_log_error at root logger')


async def parallelly_log_exception():
    """Raise, catch, and log an exception with its traceback."""
    await asyncio.sleep(0.5)
    try:
        raise Exception('custom exception')
    except Exception:
        logging.exception('parallelly_log_exception at root logger',
                          exc_info=sys.exc_info())


async def parallelly_log_custom():
    """Info record on the module's custom logger."""
    await asyncio.sleep(0.6)
    logger.info('parallelly_log_custom at custom_logger')


async def parallelly_log_system():
    """Info record on a logger that impersonates the worker's system logger."""
    await asyncio.sleep(0.7)
    disguised_logger.info('parallelly_log_system at disguised_logger')
@app.route(route="hijack_current_event_loop")
async def hijack_current_event_loop(req: func.HttpRequest) -> func.HttpResponse:
    """Schedule several logging tasks plus a low-level future on the current
    event loop and wait for all of them, proving user code can share the
    worker's loop without mixing user and system log streams."""
    loop = asyncio.get_event_loop()
    # Create multiple tasks and schedule it into one asyncio.wait blocker
    task_print: asyncio.Task = loop.create_task(parallelly_print())
    task_info: asyncio.Task = loop.create_task(parallelly_log_info())
    task_warning: asyncio.Task = loop.create_task(parallelly_log_warning())
    task_error: asyncio.Task = loop.create_task(parallelly_log_error())
    task_exception: asyncio.Task = loop.create_task(parallelly_log_exception())
    task_custom: asyncio.Task = loop.create_task(parallelly_log_custom())
    task_disguise: asyncio.Task = loop.create_task(parallelly_log_system())

    # Create an awaitable future and occupy the current event loop resource
    # (resolved via call_soon_threadsafe rather than awaited directly).
    future = loop.create_future()
    loop.call_soon_threadsafe(future.set_result, 'callsoon_log')

    # WaitAll
    await asyncio.wait([task_print, task_info, task_warning, task_error,
                        task_exception, task_custom, task_disguise, future])

    # Log asyncio low-level future result
    logging.info(future.result())
    return 'OK-hijack-current-event-loop'
@app.route(route="no_return")
def no_return(req: func.HttpRequest):
    """Deliberately return nothing (implicit None) after logging."""
    logger.info('hi')
@app.route(route="no_return_returns")
def no_return_returns(req):
    """Return a value despite the route name suggesting otherwise."""
    return 'ABC'
@app.route(route="print_logging")
def print_logging(req: func.HttpRequest):
    """Print a query-supplied message, optionally to stderr, flushed, and/or
    prefixed so the host treats it as a console (system) log."""
    params = req.params
    message = params.get('message', '')
    flush_required = params.get('flush') == 'true'
    is_console_log = params.get('console') == 'true'
    is_stderr = params.get('is_stderr') == 'true'

    # Adding LanguageWorkerConsoleLog will make function host to treat
    # this as system log and will be propagated to kusto
    prefix = 'LanguageWorkerConsoleLog' if is_console_log else ''
    stream = sys.stderr if is_stderr else sys.stdout
    print(f'{prefix} {message}'.strip(), file=stream, flush=flush_required)
    return 'OK-print-logging'
@app.route(route="raw_body_bytes")
def raw_body_bytes(req: func.HttpRequest) -> func.HttpResponse:
    """Echo the raw request body and report its length in a header."""
    body = req.get_body()
    return func.HttpResponse(body=body, status_code=200,
                             headers={'body-len': str(len(body))})
@app.route(route="remapped_context")
def remapped_context(req: func.HttpRequest):
    """Return just the HTTP method of the incoming request."""
    return req.method
@app.route(route="return_bytes")
def return_bytes(req: func.HttpRequest):
    """Return raw bytes — expected to fail, since the worker does not
    auto-convert "bytes" to "http"."""
    return b'Hello World!'
@app.route(route="return_context")
def return_context(req: func.HttpRequest, context: func.Context):
    """Serialize selected invocation-context fields to a JSON string."""
    details = {
        'method': req.method,
        'ctx_func_name': context.function_name,
        'ctx_func_dir': context.function_directory,
        'ctx_invocation_id': context.invocation_id,
        'ctx_trace_context_Traceparent': context.trace_context.Traceparent,
        'ctx_trace_context_Tracestate': context.trace_context.Tracestate,
    }
    return json.dumps(details)
@app.route(route="return_http")
def return_http(req: func.HttpRequest):
    """Return an explicit HTML response."""
    return func.HttpResponse('<h1>Hello World™</h1>', mimetype='text/html')
@app.route(route="return_http_404")
def return_http_404(req: func.HttpRequest):
    """Return an explicit 404 response with a short body."""
    return func.HttpResponse('bye', status_code=404)
@app.route(route="return_http_auth_admin", auth_level=func.AuthLevel.ADMIN)
def return_http_auth_admin(req: func.HttpRequest):
    """Same HTML response as return_http, but gated at ADMIN auth level."""
    return func.HttpResponse('<h1>Hello World™</h1>', mimetype='text/html')
@app.route(route="return_http_no_body")
def return_http_no_body(req: func.HttpRequest):
    """Return an empty HttpResponse (no body)."""
    return func.HttpResponse()
@app.route(route="return_http_redirect")
def return_http_redirect(req: func.HttpRequest):
    """Redirect (302) to return_http, forwarding the 'code' query param."""
    location = 'return_http?code={}'.format(req.params['code'])
    return func.HttpResponse(status_code=302,
                             headers={'location': location})
@app.route(route="return_out", binding_arg_name="foo")
def return_out(req: func.HttpRequest, foo: func.Out[func.HttpResponse]):
    """Write the response through an Out binding instead of returning it."""
    foo.set(func.HttpResponse(body='hello', status_code=201))
@app.route(route="return_request")
def return_request(req: func.HttpRequest):
    """Echo method/url/headers/params plus a SHA-256 digest of the body."""
    params = dict(req.params)
    params.pop('code', None)  # strip the function key, if supplied
    body = req.get_body()
    echo = {
        'method': req.method,
        'url': req.url,
        'headers': dict(req.headers),
        'params': params,
        'get_body': body.decode(),
        'body_hash': hashlib.sha256(body).hexdigest(),
    }
    return json.dumps(echo)
@app.route(route="return_route_params/{param1}/{param2}")
def return_route_params(req: func.HttpRequest) -> str:
    """Return the matched route parameters as a JSON object."""
    return json.dumps(dict(req.route_params))
@app.route(route="sync_logging")
def main(req: func.HttpRequest):
    """Log a handled error synchronously, sleep briefly, then return."""
    try:
        1 / 0
    except ZeroDivisionError:
        logger.error('a gracefully handled error', exc_info=True)
        logger.error('a gracefully handled critical error', exc_info=True)
    time.sleep(0.05)
    return 'OK-sync'
@app.route(route="unhandled_error")
def unhandled_error(req: func.HttpRequest):
    """Raise an unhandled ZeroDivisionError on every invocation."""
    1 / 0
@app.route(route="unhandled_urllib_error")
def unhandled_urllib_error(req: func.HttpRequest) -> str:
    """Fetch the URL in the 'img' query param; bad values raise unhandled."""
    image_url = req.params.get('img')
    urlopen(image_url).read()
class UnserializableException(Exception):
    """Exception whose own string conversion raises, so it can never be
    serialized into an error message."""

    def __str__(self):
        # Stringifying this exception fails on purpose.
        raise RuntimeError('cannot serialize me')
@app.route(route="unhandled_unserializable_error")
def unhandled_unserializable_error(req: func.HttpRequest) -> str:
    """Raise an exception whose __str__ itself fails."""
    raise UnserializableException('foo')
async def try_log():
    """Emit a single info record from a user-driven event loop."""
    logger.info("try_log")
@app.route(route="user_event_loop")
def user_event_loop(req: func.HttpRequest) -> func.HttpResponse:
    """Create and drive a private event loop inside a sync function."""
    loop = asyncio.SelectorEventLoop()
    asyncio.set_event_loop(loop)
    # This line should throw an asyncio RuntimeError exception
    loop.run_until_complete(try_log())
    loop.close()
    return 'OK-user-event-loop'

Просмотреть файл

@ -1,21 +0,0 @@
{
"scriptFile": "main.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
],
"retry": {
"strategy": "exponentialBackoff",
"maxRetryCount": 3,
"minimumInterval": "00:00:01",
"maximumInterval": "00:00:05"
}
}

Просмотреть файл

@ -1,13 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions
import logging
logger = logging.getLogger('my function')
def main(req: azure.functions.HttpRequest, context: azure.functions.Context):
logger.info(f'Current retry count: {context.retry_context.retry_count}')
logger.info(f'Max retry count: {context.retry_context.max_retry_count}')
raise Exception("Testing retries")

Просмотреть файл

@ -1,20 +0,0 @@
{
"scriptFile": "main.py",
"bindings": [
{
"type": "httpTrigger",
"direction": "in",
"name": "req"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
],
"retry": {
"strategy": "fixedDelay",
"maxRetryCount": 3,
"delayInterval": "00:00:01"
}
}

Просмотреть файл

@ -1,13 +0,0 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import azure.functions
import logging
logger = logging.getLogger('my function')
def main(req: azure.functions.HttpRequest, context: azure.functions.Context):
logger.info(f'Current retry count: {context.retry_context.retry_count}')
logger.info(f'Max retry count: {context.retry_context.max_retry_count}')
raise Exception("Testing retries")

Просмотреть файл

@ -5,8 +5,7 @@ import sys
import shutil
import asyncio
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
async def verify_path_imports():

Просмотреть файл

@ -1,7 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestMockHost(testutils.AsyncTestCase):
@ -10,7 +9,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__missing_py_param(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('missing_py_param')
self.assertEqual(r.response.function_id, func_id)
@ -26,7 +24,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__missing_json_param(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('missing_json_param')
self.assertEqual(r.response.function_id, func_id)
@ -42,7 +39,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__wrong_param_dir(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('wrong_param_dir')
self.assertEqual(r.response.function_id, func_id)
@ -57,7 +53,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__bad_out_annotation(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('bad_out_annotation')
self.assertEqual(r.response.function_id, func_id)
@ -72,7 +67,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__wrong_binding_dir(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('wrong_binding_dir')
self.assertEqual(r.response.function_id, func_id)
@ -88,7 +82,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__invalid_context_param(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_context_param')
self.assertEqual(r.response.function_id, func_id)
@ -103,7 +96,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__syntax_error(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('syntax_error')
self.assertEqual(r.response.function_id, func_id)
@ -115,7 +107,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__module_not_found_error(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('module_not_found_error')
self.assertEqual(r.response.function_id, func_id)
@ -128,7 +119,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__import_error(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('import_error')
self.assertEqual(r.response.function_id, func_id)
@ -145,7 +135,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__inout_param(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('inout_param')
self.assertEqual(r.response.function_id, func_id)
@ -160,7 +149,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__return_param_in(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('return_param_in')
self.assertEqual(r.response.function_id, func_id)
@ -175,7 +163,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__invalid_return_anno(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_return_anno')
self.assertEqual(r.response.function_id, func_id)
@ -191,7 +178,6 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__invalid_return_anno_non_type(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function(
'invalid_return_anno_non_type')
@ -207,55 +193,54 @@ class TestMockHost(testutils.AsyncTestCase):
async def test_load_broken__invalid_http_trigger_anno(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_http_trigger_anno')
self.assertEqual(r.response.function_id, func_id)
self.assertEqual(r.response.result.status,
protos.StatusResult.Failure)
self.assertRegex(
self.assertEqual(
r.response.result.exception.message,
r'.*cannot load the invalid_http_trigger_anno function'
r'.*type of req binding .* "httpTrigger" '
r'does not match its Python annotation "int"')
'FunctionLoadError: cannot load the invalid_http_trigger_anno '
'function: \'req\' binding type "httpTrigger" and dataType "0"'
' in function.json do not match the corresponding function'
' parameter\'s Python type annotation "int"')
async def test_load_broken__invalid_out_anno(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_out_anno')
self.assertEqual(r.response.function_id, func_id)
self.assertEqual(r.response.result.status,
protos.StatusResult.Failure)
self.assertRegex(
self.assertEqual(
r.response.result.exception.message,
r'.*cannot load the invalid_out_anno function'
r'.*type of ret binding .* "http" '
r'does not match its Python annotation "HttpRequest"')
'FunctionLoadError: cannot load the invalid_out_anno function: '
'\'ret\' binding type "http" and dataType "0" in function.json'
' do not match the corresponding function parameter\'s Python'
' type annotation "HttpRequest"')
async def test_load_broken__invalid_in_anno(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_in_anno')
self.assertEqual(r.response.function_id, func_id)
self.assertEqual(r.response.result.status,
protos.StatusResult.Failure)
self.assertRegex(
self.assertEqual(
r.response.result.exception.message,
r'.*cannot load the invalid_in_anno function'
r'.*type of req binding .* "httpTrigger" '
r'does not match its Python annotation "HttpResponse"')
'FunctionLoadError: cannot load the invalid_in_anno function:'
' \'req\' binding type "httpTrigger" and dataType "0" in '
'function.json do not match the corresponding function '
'parameter\'s Python type annotation "HttpResponse"')
async def test_load_broken__invalid_in_anno_non_type(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('invalid_in_anno_non_type')
self.assertEqual(r.response.function_id, func_id)
@ -266,3 +251,15 @@ class TestMockHost(testutils.AsyncTestCase):
r.response.result.exception.message,
r'.*cannot load the invalid_in_anno_non_type function: '
r'binding req has invalid non-type annotation 123')
async def test_import_module_troubleshooting_url(self):
async with testutils.start_mockhost(
script_root=self.broken_funcs_dir) as host:
func_id, r = await host.load_function('missing_module')
self.assertEqual(r.response.result.status,
protos.StatusResult.Failure)
self.assertRegex(
r.response.result.exception.message,
r'.*ModuleNotFoundError')

Просмотреть файл

@ -7,8 +7,7 @@ import unittest
from typing import Optional, Tuple
from unittest.mock import patch
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT, \
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT, \
PYTHON_THREADPOOL_THREAD_COUNT_MAX_37, PYTHON_THREADPOOL_THREAD_COUNT_MIN
@ -16,6 +15,12 @@ from azure_functions_worker.constants import PYTHON_THREADPOOL_THREAD_COUNT, \
SysVersionInfo = col.namedtuple("VersionInfo", ["major", "minor", "micro",
"releaselevel", "serial"])
DISPATCHER_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / 'dispatcher_functions'
DISPATCHER_STEIN_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / \
'dispatcher_functions' / \
'dispatcher_functions_stein'
DISPATCHER_STEIN_INVALID_FUNCTIONS_DIR = testutils.UNIT_TESTS_FOLDER / \
'broken_functions' / \
'invalid_stein'
class TestThreadPoolSettingsPython37(testutils.AsyncTestCase):
@ -31,6 +36,7 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase):
Ref:
NEW_TYPING = sys.version_info[:3] >= (3, 7, 0) # PEP 560
"""
def setUp(self):
self._ctrl = testutils.start_mockhost(
script_root=DISPATCHER_FUNCTIONS_DIR)
@ -121,7 +127,7 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase):
"""
# Configure thread pool max worker
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT:
f'{self._allowed_max_workers}'})
f'{self._allowed_max_workers}'})
async with self._ctrl as host:
await self._check_if_function_is_ok(host)
await self._assert_workers_threadpool(self._ctrl, host,
@ -171,7 +177,7 @@ class TestThreadPoolSettingsPython37(testutils.AsyncTestCase):
with patch('azure_functions_worker.dispatcher.logger'):
# Configure thread pool max worker to an invalid value
os.environ.update({PYTHON_THREADPOOL_THREAD_COUNT:
f'{self._over_max_workers}'})
f'{self._over_max_workers}'})
async with self._ctrl as host:
await self._check_if_function_is_ok(host)
@ -516,3 +522,59 @@ class TestThreadPoolSettingsPython310(TestThreadPoolSettingsPython39):
os.environ.update(self._pre_env)
self.mock_os_cpu.stop()
self.mock_version_info.stop()
class TestDispatcherStein(testutils.AsyncTestCase):
    """Dispatcher tests for the stein (decorator-based) programming model."""

    def setUp(self):
        # Mock host pointed at a stein-model app, pinned to Python 3.9.
        self._ctrl = testutils.start_mockhost(
            script_root=DISPATCHER_STEIN_FUNCTIONS_DIR)
        self._pre_env = dict(os.environ)
        self.mock_version_info = patch(
            'azure_functions_worker.dispatcher.sys.version_info',
            SysVersionInfo(3, 9, 0, 'final', 0))
        self.mock_version_info.start()

    def tearDown(self):
        # Restore the environment captured in setUp and undo the patch.
        os.environ.clear()
        os.environ.update(self._pre_env)
        self.mock_version_info.stop()

    async def test_dispatcher_functions_metadata_request(self):
        """A metadata request must succeed with worker-side indexing
        (use_default_metadata_indexing is False for stein apps)."""
        async with self._ctrl as host:
            r = await host.get_functions_metadata()
            self.assertIsInstance(r.response, protos.FunctionMetadataResponse)
            self.assertFalse(r.response.use_default_metadata_indexing)
            self.assertEqual(r.response.result.status,
                             protos.StatusResult.Success)
class TestDispatcherSteinLegacyFallback(testutils.AsyncTestCase):
    """Dispatcher tests verifying fallback to legacy (host-side) indexing
    when the app uses the old function.json layout."""

    def setUp(self):
        # Mock host pointed at a legacy-layout app, pinned to Python 3.9.
        self._ctrl = testutils.start_mockhost(
            script_root=DISPATCHER_FUNCTIONS_DIR)
        self._pre_env = dict(os.environ)
        self.mock_version_info = patch(
            'azure_functions_worker.dispatcher.sys.version_info',
            SysVersionInfo(3, 9, 0, 'final', 0))
        self.mock_version_info.start()

    def tearDown(self):
        # Restore the environment captured in setUp and undo the patch.
        os.environ.clear()
        os.environ.update(self._pre_env)
        self.mock_version_info.stop()

    async def test_dispatcher_functions_metadata_request_legacy_fallback(self):
        """A metadata request must succeed with host-side indexing
        (use_default_metadata_indexing is True for legacy apps)."""
        async with self._ctrl as host:
            r = await host.get_functions_metadata()
            self.assertIsInstance(r.response, protos.FunctionMetadataResponse)
            self.assertTrue(r.response.use_default_metadata_indexing)
            self.assertEqual(r.response.result.status,
                             protos.StatusResult.Success)

Просмотреть файл

@ -1,16 +1,18 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import hashlib
import pathlib
import filecmp
import typing
import hashlib
import os
import pathlib
import typing
import pytest
from azure_functions_worker import testutils
from azure_functions_worker.testutils import WebHostTestCase
class TestHttpFunctions(testutils.WebHostTestCase):
class TestHttpFunctions(WebHostTestCase):
@classmethod
def get_script_dir(cls):
@ -137,7 +139,6 @@ class TestHttpFunctions(testutils.WebHostTestCase):
self.assertEqual(data['method'], 'GET')
self.assertEqual(data['ctx_func_name'], 'return_context')
self.assertIn('return_context', data['ctx_func_dir'])
self.assertIn('ctx_invocation_id', data)
self.assertIn('ctx_trace_context_Tracestate', data)
self.assertIn('ctx_trace_context_Traceparent', data)
@ -313,10 +314,6 @@ class TestHttpFunctions(testutils.WebHostTestCase):
def check_log_user_event_loop_error(self, host_out: typing.List[str]):
self.assertIn('try_log', host_out)
def test_import_module_troubleshooting_url(self):
r = self.webhost.request('GET', 'missing_module/')
self.assertEqual(r.status_code, 500)
def check_log_import_module_troubleshooting_url(self,
host_out: typing.List[str]):
self.assertIn("Exception: ModuleNotFoundError: "
@ -359,11 +356,11 @@ class TestHttpFunctions(testutils.WebHostTestCase):
def test_print_to_console_stderr(self):
r = self.webhost.request('GET', 'print_logging?console=true'
'&message=Secret42&is_stderr=true')
'&message=Secret42&is_stderr=true')
self.assertEqual(r.status_code, 200)
self.assertEqual(r.text, 'OK-print-logging')
def check_log_print_to_console_stderr(self, host_out: typing.List[str],):
def check_log_print_to_console_stderr(self, host_out: typing.List[str], ):
# System logs stderr should not exist in host_out
self.assertNotIn('Secret42', host_out)
@ -386,25 +383,18 @@ class TestHttpFunctions(testutils.WebHostTestCase):
# System logs should not exist in host_out
self.assertNotIn('parallelly_log_system at disguised_logger', host_out)
def test_retry_context_fixed_delay(self):
r = self.webhost.request('GET', 'http_retries_fixed_delay')
class TestHttpFunctionsStein(TestHttpFunctions):
@classmethod
def get_script_dir(cls):
return testutils.UNIT_TESTS_FOLDER / 'http_functions' / \
'http_functions_stein'
def test_no_return(self):
r = self.webhost.request('GET', 'no_return')
self.assertEqual(r.status_code, 500)
def check_log_retry_context_fixed_delay(self, host_out: typing.List[str]):
self.assertIn('Current retry count: 1', host_out)
self.assertIn('Current retry count: 2', host_out)
self.assertIn('Current retry count: 3', host_out)
self.assertNotIn('Current retry count: 4', host_out)
self.assertIn('Max retry count: 3', host_out)
def test_retry_context_exponential_backoff(self):
r = self.webhost.request('GET', 'http_retries_exponential_backoff')
self.assertEqual(r.status_code, 500)
def check_log_retry_context_exponential_backoff(self,
host_out: typing.List[str]):
self.assertIn('Current retry count: 1', host_out)
self.assertIn('Current retry count: 2', host_out)
self.assertIn('Current retry count: 3', host_out)
self.assertNotIn('Current retry count: 4', host_out)
self.assertIn('Max retry count: 3', host_out)
def test_no_return_returns(self):
r = self.webhost.request('GET', 'no_return_returns')
self.assertEqual(r.status_code, 200)

Просмотреть файл

@ -11,8 +11,7 @@ from azure_functions_worker.bindings.shared_memory_data_transfer \
import SharedMemoryMap
from azure_functions_worker.bindings.shared_memory_data_transfer \
import SharedMemoryConstants as consts
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
@skipIf(sys.platform == 'darwin', 'MacOS M1 machines do not correctly test the'

Просмотреть файл

@ -1,7 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestDurableFunctions(testutils.AsyncTestCase):

Просмотреть файл

@ -2,8 +2,7 @@
# Licensed under the MIT License.
import json
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestEventHubMockFunctions(testutils.AsyncTestCase):

Просмотреть файл

@ -1,7 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestGenericFunctions(testutils.AsyncTestCase):

Просмотреть файл

@ -1,7 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestMockHost(testutils.AsyncTestCase):

Просмотреть файл

@ -2,7 +2,7 @@
# Licensed under the MIT License.
from unittest.mock import patch, call
from azure_functions_worker import testutils, protos
from azure_functions_worker import protos, testutils
from azure_functions_worker.logging import is_system_log_category

Просмотреть файл

@ -2,8 +2,7 @@
# Licensed under the MIT License.
import json
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
class TestTimerFunctions(testutils.AsyncTestCase):

Просмотреть файл

@ -7,8 +7,7 @@ import tempfile
import typing
import unittest
from azure_functions_worker import protos
from azure_functions_worker import testutils
from azure_functions_worker import protos, testutils
from azure_functions_worker.utils.common import is_python_version