зеркало из https://github.com/Azure/pykusto.git
Fix race condition when executing a query before fetch is done (#63)
This commit is contained in:
Родитель
a769c49785
Коммит
1c6163c50e
|
@ -332,6 +332,7 @@ ASALocalRun/
|
|||
/.idea/workspace.xml
|
||||
/.idea/checkstyle-idea.xml
|
||||
/.idea/misc.xml
|
||||
/.idea/shelf/
|
||||
|
||||
# Python venv and installation files
|
||||
/venv
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
from collections import defaultdict
|
||||
from fnmatch import fnmatch
|
||||
from threading import Lock
|
||||
from typing import Union, List, Tuple, Dict, Generator, Optional, Set
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
@ -50,6 +51,8 @@ class PyKustoClient(ItemFetcher):
|
|||
"""
|
||||
__client: KustoClient
|
||||
__cluster_name: str
|
||||
__first_execution: bool
|
||||
__first_execution_lock: Lock
|
||||
|
||||
def __init__(self, client_or_cluster: Union[str, KustoClient], fetch_by_default: bool = True) -> None:
|
||||
"""
|
||||
|
@ -60,6 +63,8 @@ class PyKustoClient(ItemFetcher):
|
|||
a KustoClient is generated with AAD device authentication
|
||||
"""
|
||||
super().__init__(None, fetch_by_default)
|
||||
self.__first_execution = True
|
||||
self.__first_execution_lock = Lock()
|
||||
if isinstance(client_or_cluster, KustoClient):
|
||||
self.__client = client_or_cluster
|
||||
# noinspection PyProtectedMember
|
||||
|
@ -84,6 +89,15 @@ class PyKustoClient(ItemFetcher):
|
|||
return self[name]
|
||||
|
||||
def execute(self, database: str, query: KQL, properties: ClientRequestProperties = None) -> KustoResponse:
|
||||
# The first execution usually triggers an authentication flow. We block all subsequent executions to prevent redundant authentications.
|
||||
# Remove the below block once this is resolved: https://github.com/Azure/azure-kusto-python/issues/208
|
||||
with self.__first_execution_lock:
|
||||
if self.__first_execution:
|
||||
self.__first_execution = False
|
||||
return self.__internal_execute(database, query, properties)
|
||||
return self.__internal_execute(database, query, properties)
|
||||
|
||||
def __internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None) -> KustoResponse:
|
||||
return KustoResponse(self.__client.execute(database, query, properties))
|
||||
|
||||
def get_databases_names(self) -> Generator[str, None, None]:
|
||||
|
|
|
@ -5,7 +5,10 @@ from typing import Callable, Tuple, Any, List
|
|||
from unittest import TestCase
|
||||
from urllib.parse import urljoin
|
||||
|
||||
# noinspection PyProtectedMember
|
||||
from azure.kusto.data._models import KustoResultTable, KustoResultRow
|
||||
# noinspection PyProtectedMember
|
||||
from azure.kusto.data._response import KustoResponseDataSet
|
||||
from azure.kusto.data.request import KustoClient, ClientRequestProperties
|
||||
|
||||
from pykusto.client import Table
|
||||
|
@ -61,27 +64,28 @@ class MockKustoResultTable(KustoResultTable):
|
|||
self.columns = tuple(type('Column', (object,), {'column_name': col, 'column_type': ''}) for col in columns)
|
||||
|
||||
|
||||
def mock_response(rows: Tuple[Any, ...], columns: Tuple[str, ...] = tuple()):
|
||||
# noinspection PyTypeChecker
|
||||
def mock_response(rows: Tuple[Any, ...], columns: Tuple[str, ...] = tuple()) -> KustoResponseDataSet:
|
||||
return type(
|
||||
'KustoResponseDataSet',
|
||||
(object,),
|
||||
'MockKustoResponseDataSet',
|
||||
(KustoResponseDataSet,),
|
||||
{'primary_results': (MockKustoResultTable(rows, columns),)}
|
||||
)
|
||||
|
||||
|
||||
def mock_columns_response(columns: List[Tuple[str, KustoType]] = tuple()) -> Callable:
|
||||
return lambda: mock_response(tuple((c_name, c_type.internal_name) for c_name, c_type in columns), ('ColumnName', 'ColumnType'))
|
||||
def mock_columns_response(columns: List[Tuple[str, KustoType]] = tuple()) -> KustoResponseDataSet:
|
||||
return mock_response(tuple((c_name, c_type.internal_name) for c_name, c_type in columns), ('ColumnName', 'ColumnType'))
|
||||
|
||||
|
||||
def mock_tables_response(tables: List[Tuple[str, List[Tuple[str, KustoType]]]] = tuple()) -> Callable:
|
||||
return lambda: mock_response(
|
||||
def mock_tables_response(tables: List[Tuple[str, List[Tuple[str, KustoType]]]] = tuple()) -> KustoResponseDataSet:
|
||||
return mock_response(
|
||||
tuple((t_name, c_name, c_type.dot_net_name) for t_name, columns in tables for c_name, c_type in columns),
|
||||
('TableName', 'ColumnName', 'ColumnType')
|
||||
)
|
||||
|
||||
|
||||
def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tuple[str, KustoType]]]]]] = tuple()) -> Callable:
|
||||
return lambda: mock_response(
|
||||
def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tuple[str, KustoType]]]]]] = tuple()) -> KustoResponseDataSet:
|
||||
return mock_response(
|
||||
tuple(
|
||||
(d_name, t_name, c_name, c_type.dot_net_name)
|
||||
for d_name, tables in databases
|
||||
|
@ -92,8 +96,8 @@ def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tupl
|
|||
)
|
||||
|
||||
|
||||
def mock_getschema_response(columns: List[Tuple[str, KustoType]] = tuple()) -> Callable:
|
||||
return lambda: mock_response(tuple((c_name, c_type.dot_net_name) for c_name, c_type in columns), ('ColumnName', 'DataType'))
|
||||
def mock_getschema_response(columns: List[Tuple[str, KustoType]] = tuple()) -> KustoResponseDataSet:
|
||||
return mock_response(tuple((c_name, c_type.dot_net_name) for c_name, c_type in columns), ('ColumnName', 'DataType'))
|
||||
|
||||
|
||||
class RecordedQuery:
|
||||
|
@ -118,21 +122,23 @@ class RecordedQuery:
|
|||
# noinspection PyMissingConstructor
|
||||
class MockKustoClient(KustoClient):
|
||||
recorded_queries: List[RecordedQuery]
|
||||
columns_response: Callable
|
||||
tables_response: Callable
|
||||
databases_response: Callable
|
||||
getschema_response: Callable
|
||||
main_response: Callable
|
||||
columns_response: KustoResponseDataSet
|
||||
tables_response: KustoResponseDataSet
|
||||
databases_response: KustoResponseDataSet
|
||||
getschema_response: KustoResponseDataSet
|
||||
main_response: KustoResponseDataSet
|
||||
upon_execute: Callable
|
||||
record_metadata: bool
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cluster="https://test_cluster.kusto.windows.net",
|
||||
columns_response: Callable = mock_columns_response([]),
|
||||
tables_response: Callable = mock_tables_response([]),
|
||||
databases_response: Callable = mock_databases_response([]),
|
||||
getschema_response: Callable = mock_getschema_response([]),
|
||||
main_response: Callable = mock_response(tuple()),
|
||||
columns_response: KustoResponseDataSet = mock_columns_response([]),
|
||||
tables_response: KustoResponseDataSet = mock_tables_response([]),
|
||||
databases_response: KustoResponseDataSet = mock_databases_response([]),
|
||||
getschema_response: KustoResponseDataSet = mock_getschema_response([]),
|
||||
main_response: KustoResponseDataSet = mock_response(tuple()),
|
||||
upon_execute: Callable = None,
|
||||
record_metadata: bool = False
|
||||
):
|
||||
self.recorded_queries = []
|
||||
|
@ -142,21 +148,25 @@ class MockKustoClient(KustoClient):
|
|||
self.databases_response = databases_response
|
||||
self.getschema_response = getschema_response
|
||||
self.main_response = main_response
|
||||
self.upon_execute = upon_execute
|
||||
self.record_metadata = record_metadata
|
||||
|
||||
def execute(self, database: str, rendered_query: str, properties: ClientRequestProperties = None):
|
||||
def execute(self, database: str, rendered_query: str, properties: ClientRequestProperties = None) -> KustoResponseDataSet:
|
||||
recorded_query = RecordedQuery(database, rendered_query, properties)
|
||||
if self.upon_execute is not None:
|
||||
self.upon_execute(recorded_query)
|
||||
metadata_query = True
|
||||
if rendered_query == '.show database schema | project TableName, ColumnName, ColumnType | limit 10000':
|
||||
response = self.tables_response()
|
||||
response = self.tables_response
|
||||
elif rendered_query.startswith('.show table '):
|
||||
response = self.columns_response()
|
||||
response = self.columns_response
|
||||
elif rendered_query.startswith('.show databases schema '):
|
||||
response = self.databases_response()
|
||||
response = self.databases_response
|
||||
elif rendered_query.endswith(' | getschema | project ColumnName, DataType | limit 10000'):
|
||||
response = self.getschema_response()
|
||||
response = self.getschema_response
|
||||
else:
|
||||
metadata_query = False
|
||||
response = self.main_response()
|
||||
response = self.main_response
|
||||
if self.record_metadata or not metadata_query:
|
||||
self.recorded_queries.append(RecordedQuery(database, rendered_query, properties))
|
||||
self.recorded_queries.append(recorded_query)
|
||||
return response
|
||||
|
|
|
@ -121,7 +121,6 @@ class TestClient(TestBase):
|
|||
client1.recorded_queries,
|
||||
)
|
||||
|
||||
|
||||
def test_client_for_cluster(self):
|
||||
client = PyKustoClient('https://help.kusto.windows.net', fetch_by_default=False)
|
||||
self.assertIsInstance(client._PyKustoClient__client, KustoClient)
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
from concurrent.futures import Future
|
||||
from threading import Thread
|
||||
|
||||
from pykusto.client import PyKustoClient, Database
|
||||
from pykusto.expressions import StringColumn, NumberColumn, AnyTypeColumn, BooleanColumn
|
||||
from pykusto.query import Query
|
||||
from pykusto.type_utils import KustoType
|
||||
from test.test_base import TestBase, MockKustoClient, mock_columns_response, RecordedQuery, mock_tables_response, mock_getschema_response, mock_databases_response
|
||||
|
||||
|
@ -29,14 +31,69 @@ class TestClientFetch(TestBase):
|
|||
|
||||
def test_column_fetch_slow(self):
|
||||
mock_response_future = Future()
|
||||
mock_response_future.executed = False
|
||||
|
||||
def upon_execute(query):
|
||||
mock_response_future.result()
|
||||
mock_response_future.executed = True
|
||||
|
||||
try:
|
||||
mock_kusto_client = MockKustoClient(columns_response=lambda: mock_response_future.result(), record_metadata=True)
|
||||
table = PyKustoClient(mock_kusto_client)['test_db']['test_table']
|
||||
mock_kusto_client = MockKustoClient(upon_execute=upon_execute, record_metadata=True)
|
||||
table = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']['test_table']
|
||||
table.refresh()
|
||||
self.assertIsInstance(table['foo'], AnyTypeColumn)
|
||||
self.assertIsInstance(table['bar'], AnyTypeColumn)
|
||||
self.assertIsInstance(table['baz'], AnyTypeColumn)
|
||||
# Make sure above lines were called while the fetch query was still waiting
|
||||
assert not mock_response_future.executed
|
||||
finally:
|
||||
mock_response_future.set_result(mock_columns_response([])())
|
||||
# Return the fetch
|
||||
mock_response_future.set_result(None)
|
||||
|
||||
table.wait_for_items()
|
||||
# Make sure the fetch query was indeed called
|
||||
assert mock_response_future.executed
|
||||
|
||||
def test_query_before_fetch_returned(self):
|
||||
mock_response_future = Future()
|
||||
mock_response_future.returned_queries = []
|
||||
mock_response_future.called = False
|
||||
mock_response_future.executed = False
|
||||
|
||||
def upon_execute(query):
|
||||
if not mock_response_future.called:
|
||||
mock_response_future.called = True
|
||||
mock_response_future.result()
|
||||
mock_response_future.executed = True
|
||||
mock_response_future.returned_queries.append(query)
|
||||
|
||||
try:
|
||||
mock_kusto_client = MockKustoClient(upon_execute=upon_execute, record_metadata=True)
|
||||
table = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']['test_table']
|
||||
table.refresh()
|
||||
|
||||
# Executing a query in a separate thread, because it is supposed to block until the fetch returns
|
||||
query_thread = Thread(target=Query(table).take(5).execute)
|
||||
query_thread.start()
|
||||
|
||||
# Make sure above lines were called while the fetch query was still waiting
|
||||
assert not mock_response_future.executed
|
||||
finally:
|
||||
# Return the fetch
|
||||
mock_response_future.set_result(None)
|
||||
|
||||
table.wait_for_items()
|
||||
query_thread.join()
|
||||
# Make sure the fetch query was indeed called
|
||||
assert mock_response_future.executed
|
||||
# Before the fix the order of returned query was reveresed
|
||||
self.assertEqual(
|
||||
[
|
||||
RecordedQuery('test_db', '.show table test_table | project AttributeName, AttributeType | limit 10000'),
|
||||
RecordedQuery('test_db', 'test_table | take 5'),
|
||||
],
|
||||
mock_response_future.returned_queries,
|
||||
)
|
||||
|
||||
def test_table_fetch(self):
|
||||
mock_kusto_client = MockKustoClient(
|
||||
|
@ -186,4 +243,3 @@ class TestClientFetch(TestBase):
|
|||
client = PyKustoClient(MockKustoClient(), fetch_by_default=False)
|
||||
self.assertEqual(frozenset(), set(client.get_databases_names()))
|
||||
self.assertEqual(frozenset(), set(client.get_databases()))
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче