azure-functions-python-worker/azure_functions_worker/bindings/datumdef.py

293 строки
10 KiB
Python

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging
from typing import Any, Optional
import json
from .. import protos
from ..logging import logger
from typing import List
try:
from http.cookies import SimpleCookie
except ImportError:
from Cookie import SimpleCookie
from dateutil import parser
from dateutil.parser import ParserError
from .nullable_converters import to_nullable_bool, to_nullable_string, \
to_nullable_double, to_nullable_timestamp
class Datum:
def __init__(self, value, type):
self.value = value
self.type = type
@property
def python_value(self) -> Any:
if self.value is None or self.type is None:
return None
elif self.type in ('bytes', 'string', 'int', 'double'):
return self.value
elif self.type == 'json':
return json.loads(self.value)
elif self.type == 'collection_string':
return [v for v in self.value.string]
elif self.type == 'collection_bytes':
return [v for v in self.value.bytes]
elif self.type == 'collection_double':
return [v for v in self.value.double]
elif self.type == 'collection_sint64':
return [v for v in self.value.sint64]
else:
return self.value
@property
def python_type(self) -> type:
return type(self.python_value)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.value == other.value and self.type == other.type
def __hash__(self):
return hash((type(self), (self.value, self.type)))
def __repr__(self):
val_repr = repr(self.value)
if len(val_repr) > 10:
val_repr = val_repr[:10] + '...'
return '<Datum {} {}>'.format(self.type, val_repr)
@classmethod
def from_typed_data(cls, td: protos.TypedData):
tt = td.WhichOneof('data')
if tt == 'http':
http = td.http
val = dict(
method=Datum(http.method, 'string'),
url=Datum(http.url, 'string'),
headers={
k: Datum(v, 'string') for k, v in http.headers.items()
},
body=(
Datum.from_typed_data(http.body)
or Datum(type='bytes', value=b'')
),
params={
k: Datum(v, 'string') for k, v in http.params.items()
},
query={
k: Datum(v, 'string') for k, v in http.query.items()
},
)
elif tt == 'string':
val = td.string
elif tt == 'bytes':
val = td.bytes
elif tt == 'json':
val = td.json
elif tt == 'collection_bytes':
val = td.collection_bytes
elif tt == 'collection_string':
val = td.collection_string
elif tt == 'collection_sint64':
val = td.collection_sint64
elif tt == 'model_binding_data':
val = td.model_binding_data
elif tt is None:
return None
else:
raise NotImplementedError(
'unsupported TypeData kind: {!r}'.format(tt)
)
return cls(val, tt)
@classmethod
def from_rpc_shared_memory(
cls,
shmem: protos.RpcSharedMemory,
shmem_mgr) -> Optional['Datum']:
"""
Reads the specified shared memory region and converts the read data
into a datum object of the corresponding type.
"""
if shmem is None:
logger.warning('Cannot read from shared memory. '
'RpcSharedMemory is None.')
return None
mem_map_name = shmem.name
offset = shmem.offset
count = shmem.count
data_type = shmem.type
ret_val = None
if data_type == protos.RpcDataType.bytes:
val = shmem_mgr.get_bytes(mem_map_name, offset, count)
if val is not None:
ret_val = cls(val, 'bytes')
elif data_type == protos.RpcDataType.string:
val = shmem_mgr.get_string(mem_map_name, offset, count)
if val is not None:
ret_val = cls(val, 'string')
if ret_val is not None:
logger.info(
'Read %s bytes from memory map %s for data type %s', count,
mem_map_name, data_type)
return ret_val
return None
@classmethod
def to_rpc_shared_memory(
cls,
datum: 'Datum',
shmem_mgr) -> Optional[protos.RpcSharedMemory]:
"""
Writes the given value to shared memory and returns the corresponding
RpcSharedMemory object which can be sent back to the functions host over
RPC.
"""
if datum.type == 'bytes':
value = datum.value
shared_mem_meta = shmem_mgr.put_bytes(value)
data_type = protos.RpcDataType.bytes
elif datum.type == 'string':
value = datum.value
shared_mem_meta = shmem_mgr.put_string(value)
data_type = protos.RpcDataType.string
else:
raise NotImplementedError(
f'Unsupported datum type ({datum.type}) for shared memory'
)
if shared_mem_meta is None:
logger.warning('Cannot write to shared memory for type: %s',
datum.type)
return None
shmem = protos.RpcSharedMemory(
name=shared_mem_meta.mem_map_name,
offset=0,
count=shared_mem_meta.count_bytes,
type=data_type)
logger.info(
'Wrote %s bytes to memory map %s for data type %s',
shared_mem_meta.count_bytes, shared_mem_meta.mem_map_name,
data_type)
return shmem
def datum_as_proto(datum: Datum) -> protos.TypedData:
if datum.type == 'string':
return protos.TypedData(string=datum.value)
elif datum.type == 'bytes':
return protos.TypedData(bytes=datum.value)
elif datum.type == 'json':
return protos.TypedData(json=datum.value)
elif datum.type == 'http':
return protos.TypedData(http=protos.RpcHttp(
status_code=datum.value['status_code'].value,
headers={
k: v.value
for k, v in datum.value['headers'].items()
},
cookies=parse_to_rpc_http_cookie_list(datum.value.get('cookies')),
enable_content_negotiation=False,
body=datum_as_proto(datum.value['body']),
))
elif datum.type is None:
return None
elif datum.type == 'dict':
# TypedData doesn't support dict, so we return it as json
return protos.TypedData(json=json.dumps(datum.value))
elif datum.type == 'list':
# TypedData doesn't support list, so we return it as json
return protos.TypedData(json=json.dumps(datum.value))
elif datum.type == 'int':
return protos.TypedData(int=datum.value)
elif datum.type == 'double':
return protos.TypedData(double=datum.value)
elif datum.type == 'bool':
# TypedData doesn't support bool, so we return it as an int
return protos.TypedData(int=int(datum.value))
else:
raise NotImplementedError(
'unexpected Datum type: {!r}'.format(datum.type)
)
def parse_to_rpc_http_cookie_list(cookies: Optional[List[SimpleCookie]]):
if cookies is None:
return cookies
rpc_http_cookies = []
for cookie in cookies:
for name, cookie_entity in cookie.items():
rpc_http_cookies.append(
protos.RpcHttpCookie(name=name,
value=cookie_entity.value,
domain=to_nullable_string(
cookie_entity['domain'],
'cookie.domain'),
path=to_nullable_string(
cookie_entity['path'], 'cookie.path'),
expires=to_nullable_timestamp(
parse_cookie_attr_expires(
cookie_entity), 'cookie.expires'),
secure=to_nullable_bool(
bool(cookie_entity['secure']),
'cookie.secure'),
http_only=to_nullable_bool(
bool(cookie_entity['httponly']),
'cookie.httpOnly'),
same_site=parse_cookie_attr_same_site(
cookie_entity),
max_age=to_nullable_double(
cookie_entity['max-age'],
'cookie.maxAge')))
return rpc_http_cookies
def parse_cookie_attr_expires(cookie_entity):
expires = cookie_entity['expires']
if expires is not None and len(expires) != 0:
try:
return parser.parse(expires)
except ParserError:
logging.error(
f"Can not parse value {expires} of expires in the cookie "
f"due to invalid format.")
raise
except OverflowError:
logging.error(
f"Can not parse value {expires} of expires in the cookie "
f"because the parsed date exceeds the largest valid C "
f"integer on your system.")
raise
return None
def parse_cookie_attr_same_site(cookie_entity):
same_site = getattr(protos.RpcHttpCookie.SameSite, "None")
try:
raw_same_site_str = cookie_entity['samesite'].lower()
if raw_same_site_str == 'lax':
same_site = protos.RpcHttpCookie.SameSite.Lax
elif raw_same_site_str == 'strict':
same_site = protos.RpcHttpCookie.SameSite.Strict
elif raw_same_site_str == 'none':
same_site = protos.RpcHttpCookie.SameSite.ExplicitNone
except Exception:
return same_site
return same_site