зеркало из https://github.com/Azure/pykusto.git
263 строки
12 KiB
Python
263 строки
12 KiB
Python
import logging
|
|
from typing import Type
|
|
from unittest.mock import patch
|
|
|
|
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
|
|
from azure.kusto.data.exceptions import KustoError
|
|
|
|
from pykusto import PyKustoClient, column_generator as col, Query, KustoServiceError, RetryConfig, NO_RETRIES, ClientRequestProperties
|
|
# noinspection PyProtectedMember
|
|
from pykusto._src.logger import _logger
|
|
# noinspection PyProtectedMember
|
|
from test.test_base import TestBase, MockKustoClient, RecordedQuery, mock_response
|
|
|
|
|
|
class TestClient(TestBase):
|
|
def test_single_table(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db']['mock_table']
|
|
Query(table).take(5).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')],
|
|
mock_kusto_client.recorded_queries
|
|
)
|
|
|
|
def test_execute_no_table(self):
|
|
self.assertRaises(
|
|
RuntimeError("No table supplied"),
|
|
Query().take(5).execute
|
|
)
|
|
|
|
def test_execute_already_bound(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db']['mock_table']
|
|
|
|
self.assertRaises(
|
|
RuntimeError("This table is already bound to a query"),
|
|
Query(table).take(5).execute,
|
|
table
|
|
)
|
|
|
|
def test_single_table_on_execute(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db']['mock_table']
|
|
Query().take(5).execute(table)
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')],
|
|
mock_kusto_client.recorded_queries,
|
|
)
|
|
|
|
def test_get_table(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db'].get_table('mock_table')
|
|
Query(table).take(5).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')],
|
|
mock_kusto_client.recorded_queries,
|
|
)
|
|
|
|
def test_union_table(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db'].get_table('test_table1', 'test_table2')
|
|
Query(table).take(5).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'union test_table1, test_table2 | take 5')],
|
|
mock_kusto_client.recorded_queries,
|
|
)
|
|
|
|
def test_union_table_with_wildcard(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db']['test_table_*']
|
|
Query(table).take(5).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'union test_table_* | take 5')],
|
|
mock_kusto_client.recorded_queries,
|
|
)
|
|
|
|
def test_default_authentication(self):
|
|
mock_kusto_client = MockKustoClient()
|
|
with patch('pykusto._src.client.PyKustoClient._get_client_for_cluster', lambda s: mock_kusto_client):
|
|
table = PyKustoClient('https://help.kusto.windows.net/')['test_db']['mock_table']
|
|
Query().take(5).execute(table)
|
|
self.assertIs(
|
|
mock_kusto_client,
|
|
table._Table__database._Database__client._PyKustoClient__client,
|
|
)
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')],
|
|
mock_kusto_client.recorded_queries,
|
|
)
|
|
|
|
def test_client_instances(self):
|
|
with patch('pykusto._src.client.PyKustoClient._get_client_for_cluster', MockKustoClient):
|
|
client_1 = PyKustoClient('https://help.kusto.windows.net/')
|
|
client_2 = PyKustoClient('https://help.kusto.windows.net/')
|
|
|
|
self.assertIsNot(
|
|
client_1._PyKustoClient__client,
|
|
client_2._PyKustoClient__client,
|
|
)
|
|
|
|
def test_client_instances_cached(self):
|
|
with patch('pykusto._src.client.PyKustoClient._get_client_for_cluster', MockKustoClient):
|
|
client_1 = PyKustoClient('https://help.kusto.windows.net/', use_global_cache=True)
|
|
client_2 = PyKustoClient('https://help.kusto.windows.net/', use_global_cache=True)
|
|
|
|
self.assertIs(
|
|
client_1._PyKustoClient__client,
|
|
client_2._PyKustoClient__client,
|
|
)
|
|
|
|
def test_client_instances_cached_distinct(self):
|
|
with patch('pykusto._src.client.PyKustoClient._get_client_for_cluster', MockKustoClient):
|
|
client_1 = PyKustoClient('https://help1.kusto.windows.net/', use_global_cache=True)
|
|
client_2 = PyKustoClient('https://help2.kusto.windows.net/', use_global_cache=True)
|
|
|
|
self.assertIsNot(
|
|
client_1._PyKustoClient__client,
|
|
client_2._PyKustoClient__client,
|
|
)
|
|
|
|
def test_cross_cluster_join(self):
|
|
client1 = MockKustoClient("https://one.kusto.windows.net")
|
|
client2 = MockKustoClient("https://two.kusto.windows.net")
|
|
table1 = PyKustoClient(client1)['test_db_1']['test_table_1']
|
|
table2 = PyKustoClient(client2)['test_db_2']['test_table_2']
|
|
Query(table1).take(5).join(Query(table2).take(6)).on(col.foo).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db_1', 'test_table_1 | take 5 | join (cluster("two.kusto.windows.net").database("test_db_2").table("test_table_2") | take 6) on foo')],
|
|
client1.recorded_queries,
|
|
)
|
|
|
|
def test_cross_cluster_join_with_union(self):
|
|
client1 = MockKustoClient("https://one.kusto.windows.net")
|
|
client2 = MockKustoClient("https://two.kusto.windows.net")
|
|
table1 = PyKustoClient(client1)['test_db_1']['test_table_1']
|
|
table2 = PyKustoClient(client2)['test_db_2'].get_table('test_table_2_*')
|
|
Query(table1).take(5).join(Query(table2).take(6)).on(col.foo).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db_1', 'test_table_1 | take 5 | join (union cluster("two.kusto.windows.net").database("test_db_2").table("test_table_2_*") | take 6) on foo')],
|
|
client1.recorded_queries,
|
|
)
|
|
|
|
def test_cross_cluster_join_with_union_2(self):
|
|
client1 = MockKustoClient("https://one.kusto.windows.net")
|
|
client2 = MockKustoClient("https://two.kusto.windows.net")
|
|
table1 = PyKustoClient(client1)['test_db_1']['test_table_1']
|
|
table2 = PyKustoClient(client2)['test_db_2'].get_table('test_table_2_*', 'test_table_3_*')
|
|
Query(table1).take(5).join(Query(table2).take(6)).on(col.foo).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery(
|
|
'test_db_1',
|
|
'test_table_1 | take 5 | join (union cluster("two.kusto.windows.net").database("test_db_2").table("test_table_2_*"), '
|
|
'cluster("two.kusto.windows.net").database("test_db_2").table("test_table_3_*") | take 6) on foo',
|
|
)],
|
|
client1.recorded_queries,
|
|
)
|
|
|
|
def test_client_for_cluster_with_azure_cli_auth(self):
|
|
with self.assertLogs(_logger, logging.INFO) as cm:
|
|
client = PyKustoClient('https://help.kusto.windows.net', fetch_by_default=False)
|
|
self.assertIsInstance(client._PyKustoClient__client, KustoClient)
|
|
self.assertEqual('https://help.kusto.windows.net', client.get_cluster_name())
|
|
self.assertEqual([], cm.output)
|
|
|
|
def test_client_for_cluster_with_aad_device_auth(self):
|
|
with self.assertLogs(_logger, logging.INFO) as cm:
|
|
client = PyKustoClient('https://help.kusto.windows.net', fetch_by_default=False, auth_method=KustoConnectionStringBuilder.with_aad_device_authentication)
|
|
self.assertIsInstance(client._PyKustoClient__client, KustoClient)
|
|
self.assertEqual('https://help.kusto.windows.net', client.get_cluster_name())
|
|
self.assertEqual([], cm.output)
|
|
|
|
def test_request_properties(self):
|
|
properties = ClientRequestProperties()
|
|
properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, False)
|
|
properties.set_parameter('xIntValue', 11)
|
|
|
|
mock_kusto_client = MockKustoClient()
|
|
table = PyKustoClient(mock_kusto_client)['test_db']['mock_table']
|
|
Query(table).take(5).execute(properties=properties)
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5', properties)],
|
|
mock_kusto_client.recorded_queries
|
|
)
|
|
|
|
@staticmethod
|
|
def unreliable_mock_kusto_client(number_of_failures: int, exception_type: Type[Exception] = KustoServiceError):
|
|
TestClient.attempt = 1
|
|
|
|
def main_response():
|
|
if TestClient.attempt <= number_of_failures:
|
|
TestClient.attempt += 1
|
|
raise exception_type("Mock exception for test", None)
|
|
return mock_response(tuple())()
|
|
|
|
return MockKustoClient(main_response=main_response)
|
|
|
|
def retries_base(self, number_of_retries: int):
|
|
mock_kusto_client = self.unreliable_mock_kusto_client(number_of_retries - 1)
|
|
table = PyKustoClient(mock_kusto_client, fetch_by_default=False, retry_config=RetryConfig(number_of_retries, sleep_time=0.1, jitter=0))['test_db']['mock_table']
|
|
with self.assertLogs(_logger, logging.INFO) as cm:
|
|
Query(table).take(5).execute()
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')] * number_of_retries,
|
|
mock_kusto_client.recorded_queries
|
|
)
|
|
self.assertEqual(
|
|
[
|
|
f"INFO:pykusto:Attempt number {i} out of {number_of_retries} failed, "
|
|
f"previous sleep time was 0.1 seconds. Exception: KustoServiceError('Mock exception for test')"
|
|
for i in range(1, number_of_retries)
|
|
],
|
|
cm.output
|
|
)
|
|
|
|
def test_2_retry(self):
|
|
self.retries_base(2)
|
|
|
|
def test_3_retries(self):
|
|
self.retries_base(3)
|
|
|
|
def test_missing_retries(self):
|
|
mock_kusto_client = self.unreliable_mock_kusto_client(1)
|
|
table = PyKustoClient(
|
|
mock_kusto_client, fetch_by_default=False, retry_config=NO_RETRIES
|
|
)['test_db']['mock_table']
|
|
self.assertRaises(
|
|
KustoServiceError("Mock exception for test", None),
|
|
lambda: Query(table).take(5).execute(),
|
|
)
|
|
|
|
def test_non_transient_exception(self):
|
|
mock_kusto_client = self.unreliable_mock_kusto_client(1, KustoError)
|
|
table = PyKustoClient(mock_kusto_client, fetch_by_default=False, retry_config=RetryConfig(2, sleep_time=0.1, jitter=0))['test_db']['mock_table']
|
|
self.assertRaises(
|
|
KustoError("Mock exception for test", None),
|
|
lambda: Query(table).take(5).execute(),
|
|
)
|
|
|
|
def test_override_retry_config_for_query(self):
|
|
num_retries = 2
|
|
num_failed_attempts = num_retries - 1
|
|
retry_sleep_time = 0.1
|
|
|
|
mock_kusto_client = self.unreliable_mock_kusto_client(num_failed_attempts)
|
|
table = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']['mock_table']
|
|
retry_config = RetryConfig(num_retries, sleep_time=retry_sleep_time, jitter=0)
|
|
|
|
with self.assertLogs(_logger, logging.INFO) as cm:
|
|
Query(table).take(5).execute(retry_config=retry_config)
|
|
|
|
self.assertEqual(
|
|
[RecordedQuery('test_db', 'mock_table | take 5')] * num_retries,
|
|
mock_kusto_client.recorded_queries
|
|
)
|
|
self.assertEqual(
|
|
[
|
|
f"INFO:pykusto:Attempt number {num_failed_attempts} out of {num_retries} failed, "
|
|
f"previous sleep time was {retry_sleep_time} seconds. "
|
|
f"Exception: KustoServiceError('Mock exception for test')"
|
|
],
|
|
cm.output
|
|
)
|