pykusto/test/test_base.py

240 строки
10 KiB
Python

import json
import logging
import sys
from concurrent.futures import Future
from threading import Event
from typing import Callable, Tuple, Any, List, Optional, Union
from unittest import TestCase
if sys.version_info[1] < 9:
# noinspection PyProtectedMember
from unittest.case import _AssertLogsContext
else:
# noinspection PyUnresolvedReferences,PyProtectedMember,PyCompatibility
from unittest._log import _AssertLogsContext
from urllib.parse import urljoin
from azure.kusto.data import KustoClient, ClientRequestProperties
# noinspection PyProtectedMember
from azure.kusto.data._models import KustoResultTable, KustoResultRow
from azure.kusto.data.response import KustoResponseDataSet
# noinspection PyProtectedMember
from pykusto._src.client import Table
# noinspection PyProtectedMember
from pykusto._src.expressions import _NumberColumn, _BooleanColumn, _ArrayColumn, _MappingColumn, _StringColumn, _DatetimeColumn, _TimespanColumn, _DynamicColumn
# noinspection PyProtectedMember
from pykusto._src.type_utils import _KustoType
# Naming this variable "test_table" triggers the following bug: https://github.com/pytest-dev/pytest/issues/7378
# noinspection PyTypeChecker
mock_table = Table(
None, "mock_table",
(
_NumberColumn('numField'), _NumberColumn('numField2'), _NumberColumn('numField3'), _NumberColumn('numField4'), _NumberColumn('numField5'), _NumberColumn('numField6'),
_BooleanColumn('boolField'), _ArrayColumn('arrayField'), _ArrayColumn('arrayField2'), _ArrayColumn('arrayField3'), _MappingColumn('mapField'), _StringColumn('stringField'),
_StringColumn('stringField2'), _DatetimeColumn('dateField'), _DatetimeColumn('dateField2'), _DatetimeColumn('dateField3'), _TimespanColumn('timespanField'),
_DynamicColumn('dynamicField')
)
)
class TestBase(TestCase):
@classmethod
def setUpClass(cls) -> None:
logging.basicConfig(
stream=sys.stdout,
level=logging.DEBUG,
format='%(asctime)s %(levelname)5s %(message)s'
)
def setUp(self) -> None:
test_logger.info("Running test: " + self._testMethodName)
def assertRaises(self, expected_exception: BaseException, test_callable: Callable, *args, **kwargs):
"""
This method overrides the one in `unittest.case.TestCase`.
Instead of providing it with an exception type, you provide it with an exception instance that contains
the expected message.
"""
expected_exception_type = type(expected_exception)
expected_exception_message = str(expected_exception)
with super().assertRaises(expected_exception_type) as cm:
test_callable(*args, **kwargs)
self.assertEqual(
expected_exception_message,
str(cm.exception)
)
def assertLogs(self, logger_to_watch=None, level=None) -> 'CustomAssertLogsContext':
"""
This method overrides the one in `unittest.case.TestCase`, and has the same behavior, except for not causing a failure when there are no log messages.
The point is to allow asserting there are no logs.
Get rid of this in Python 3.10, as this was resolved: https://bugs.python.org/issue39385
"""
# noinspection PyArgumentList
return CustomAssertLogsContext(self, logger_to_watch, level)
@staticmethod
def raise_mock_exception():
raise Exception("Mock exception")
# Get rid of this in Python 3.10, as this was resolved: https://bugs.python.org/issue39385
class CustomAssertLogsContext(_AssertLogsContext):
# noinspection PyUnresolvedReferences
def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
# Fool the original exit method to think there is at least one record, to avoid causing a failure
self.watcher.records.append("DUMMY")
result = super().__exit__(exc_type, exc_val, exc_tb)
self.watcher.records.pop()
return result
# noinspection PyMissingConstructor
class MockKustoResultTable(KustoResultTable):
def __init__(self, rows: Tuple[Any, ...], columns: Tuple[str, ...]):
self.kusto_result_rows = tuple(KustoResultRow(columns, row) for row in rows)
self.raw_rows = self.kusto_result_rows
self.columns = tuple(type('Column', (object,), {'column_name': col, 'column_type': ''}) for col in columns)
# noinspection PyTypeChecker
def mock_response(rows: Tuple[Any, ...], columns: Tuple[str, ...] = tuple()) -> Callable[[], KustoResponseDataSet]:
return lambda: type(
'MockKustoResponseDataSet',
(KustoResponseDataSet,),
{'primary_results': (MockKustoResultTable(rows, columns),)}
)
def mock_columns_response(columns: List[Tuple[str, _KustoType]] = tuple()) -> Callable[[], KustoResponseDataSet]:
return mock_response(tuple((c_name, c_type.internal_name) for c_name, c_type in columns), ('ColumnName', 'ColumnType'))
def mock_tables_response(tables: List[Tuple[str, List[Tuple[str, _KustoType]]]] = tuple()) -> Callable[[], KustoResponseDataSet]:
return mock_response(
tuple((t_name, c_name, c_type.dot_net_name) for t_name, columns in tables for c_name, c_type in columns),
('TableName', 'ColumnName', 'ColumnType')
)
def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tuple[str, _KustoType]]]]]] = tuple()) -> Callable[[], KustoResponseDataSet]:
return mock_response(
tuple(
(d_name, t_name, c_name, c_type.dot_net_name)
for d_name, tables in databases
for t_name, columns in tables
for c_name, c_type in columns
),
('DatabaseName', 'TableName', 'ColumnName', 'ColumnType')
)
def mock_getschema_response(columns: List[Tuple[str, _KustoType]] = tuple()) -> Callable[[], KustoResponseDataSet]:
return mock_response(tuple((c_name, c_type.dot_net_name) for c_name, c_type in columns), ('ColumnName', 'DataType'))
class RecordedQuery:
database: str
query: str
properties: ClientRequestProperties
def __init__(self, database: str, query: str, properties: ClientRequestProperties = None):
self.database = database
self.query = query
self.properties = properties
def __repr__(self) -> str:
return json.dumps({'database': self.database, 'query': self.query, 'properties': self.properties})
def __eq__(self, o: 'RecordedQuery') -> bool:
if not isinstance(o, RecordedQuery):
return False
return self.database == o.database and self.query == o.query and self.properties == o.properties
# noinspection PyMissingConstructor
class MockKustoClient(KustoClient):
recorded_queries: List[RecordedQuery]
columns_response: Callable[[], KustoResponseDataSet]
tables_response: Callable[[], KustoResponseDataSet]
databases_response: Callable[[], KustoResponseDataSet]
getschema_response: Callable[[], KustoResponseDataSet]
main_response: Callable[[], KustoResponseDataSet]
record_metadata: bool
block: bool
query_future: Union[None, Future]
blocked_event: Event
def __init__(
self,
cluster="https://test_cluster.kusto.windows.net",
columns_response: Callable[[], KustoResponseDataSet] = mock_columns_response([('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
tables_response: Callable[[], KustoResponseDataSet] = mock_tables_response([
('mock_table', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
('mock_table_2', [('baz', _KustoType.BOOL)]),
]),
databases_response: Callable[[], KustoResponseDataSet] = mock_databases_response(
[('test_db', [('mock_table', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)])])]
),
getschema_response: Callable[[], KustoResponseDataSet] = mock_getschema_response([]),
main_response: Callable[[], KustoResponseDataSet] = mock_response(tuple()),
record_metadata: bool = False,
block: bool = False,
):
self.recorded_queries = []
self._query_endpoint = urljoin(cluster, "/v2/rest/query")
self.columns_response = columns_response
self.tables_response = tables_response
self.databases_response = databases_response
self.getschema_response = getschema_response
self.main_response = main_response
self.record_metadata = record_metadata
if block:
self.block = True
self.query_future = Future()
else:
self.block = False
self.query_future = None
self.blocked_event = Event()
def release(self):
assert self.blocked()
if self.query_future is not None:
self.query_future.set_result(None)
def blocked(self):
return self.blocked_event.is_set()
def wait_until_blocked(self):
self.blocked_event.wait()
def do_not_block_next_requests(self):
self.block = False
def execute(self, database: str, rendered_query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
recorded_query = RecordedQuery(database, rendered_query, properties)
if self.block:
self.blocked_event.set()
self.query_future.result()
self.blocked_event.clear()
metadata_query = True
if rendered_query == '.show database schema | project TableName, ColumnName, ColumnType | limit 10000':
response = self.tables_response
elif rendered_query.startswith('.show table '):
response = self.columns_response
elif rendered_query.startswith('.show databases schema '):
response = self.databases_response
elif rendered_query.endswith(' | getschema | project ColumnName, DataType | limit 10000'):
response = self.getschema_response
else:
metadata_query = False
response = self.main_response
if self.record_metadata or not metadata_query:
self.recorded_queries.append(recorded_query)
return response()
test_logger = logging.getLogger("pykusto_test")