2020-06-01 12:56:47 +03:00
import logging
2021-03-16 15:53:04 +03:00
from typing import Type
2020-03-24 16:21:12 +03:00
from unittest . mock import patch
2021-05-13 14:06:13 +03:00
from azure . kusto . data import KustoClient , KustoConnectionStringBuilder
2021-03-16 15:53:04 +03:00
from azure . kusto . data . exceptions import KustoError
2020-03-24 16:21:12 +03:00
2021-03-16 15:53:04 +03:00
from pykusto import PyKustoClient , column_generator as col , Query , KustoServiceError , RetryConfig , NO_RETRIES
2020-07-21 13:10:38 +03:00
# noinspection PyProtectedMember
from pykusto . _src . logger import _logger
2021-05-13 14:06:13 +03:00
# noinspection PyProtectedMember
2021-03-16 15:53:04 +03:00
from test . test_base import TestBase , MockKustoClient , RecordedQuery , mock_response
2020-03-24 16:21:12 +03:00
class TestClient ( TestBase ) :
def test_single_table ( self ) :
mock_kusto_client = MockKustoClient ( )
2020-06-16 13:18:43 +03:00
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] [ ' mock_table ' ]
2020-03-24 16:21:12 +03:00
Query ( table ) . take ( 5 ) . execute ( )
self . assertEqual (
2020-06-16 13:18:43 +03:00
[ RecordedQuery ( ' test_db ' , ' mock_table | take 5 ' ) ] ,
2020-03-24 16:21:12 +03:00
mock_kusto_client . recorded_queries
)
def test_execute_no_table ( self ) :
self . assertRaises (
RuntimeError ( " No table supplied " ) ,
Query ( ) . take ( 5 ) . execute
)
def test_execute_already_bound ( self ) :
mock_kusto_client = MockKustoClient ( )
2020-06-16 13:18:43 +03:00
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] [ ' mock_table ' ]
2020-03-24 16:21:12 +03:00
self . assertRaises (
RuntimeError ( " This table is already bound to a query " ) ,
Query ( table ) . take ( 5 ) . execute ,
table
)
def test_single_table_on_execute ( self ) :
mock_kusto_client = MockKustoClient ( )
2020-06-16 13:18:43 +03:00
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] [ ' mock_table ' ]
2020-03-24 16:21:12 +03:00
Query ( ) . take ( 5 ) . execute ( table )
self . assertEqual (
2020-06-16 13:18:43 +03:00
[ RecordedQuery ( ' test_db ' , ' mock_table | take 5 ' ) ] ,
2020-03-24 16:21:12 +03:00
mock_kusto_client . recorded_queries ,
)
def test_get_table ( self ) :
mock_kusto_client = MockKustoClient ( )
2020-06-16 13:18:43 +03:00
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] . get_table ( ' mock_table ' )
2020-03-24 16:21:12 +03:00
Query ( table ) . take ( 5 ) . execute ( )
self . assertEqual (
2020-06-16 13:18:43 +03:00
[ RecordedQuery ( ' test_db ' , ' mock_table | take 5 ' ) ] ,
2020-03-24 16:21:12 +03:00
mock_kusto_client . recorded_queries ,
)
def test_union_table ( self ) :
mock_kusto_client = MockKustoClient ( )
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] . get_table ( ' test_table1 ' , ' test_table2 ' )
Query ( table ) . take ( 5 ) . execute ( )
self . assertEqual (
[ RecordedQuery ( ' test_db ' , ' union test_table1, test_table2 | take 5 ' ) ] ,
mock_kusto_client . recorded_queries ,
)
def test_union_table_with_wildcard ( self ) :
mock_kusto_client = MockKustoClient ( )
table = PyKustoClient ( mock_kusto_client ) [ ' test_db ' ] [ ' test_table_* ' ]
Query ( table ) . take ( 5 ) . execute ( )
self . assertEqual (
[ RecordedQuery ( ' test_db ' , ' union test_table_* | take 5 ' ) ] ,
mock_kusto_client . recorded_queries ,
)
def test_default_authentication ( self ) :
mock_kusto_client = MockKustoClient ( )
2021-05-13 14:06:13 +03:00
with patch ( ' pykusto._src.client.PyKustoClient._get_client_for_cluster ' , lambda s : mock_kusto_client ) :
2020-06-16 13:18:43 +03:00
table = PyKustoClient ( ' https://help.kusto.windows.net/ ' ) [ ' test_db ' ] [ ' mock_table ' ]
2020-03-24 16:21:12 +03:00
Query ( ) . take ( 5 ) . execute ( table )
self . assertIs (
mock_kusto_client ,
table . _Table__database . _Database__client . _PyKustoClient__client ,
)
self . assertEqual (
2020-06-16 13:18:43 +03:00
[ RecordedQuery ( ' test_db ' , ' mock_table | take 5 ' ) ] ,
2020-03-24 16:21:12 +03:00
mock_kusto_client . recorded_queries ,
)
2020-05-31 08:46:06 +03:00
def test_client_instances ( self ) :
2020-07-21 13:10:38 +03:00
with patch ( ' pykusto._src.client.PyKustoClient._get_client_for_cluster ' , MockKustoClient ) :
2020-05-31 08:46:06 +03:00
client_1 = PyKustoClient ( ' https://help.kusto.windows.net/ ' )
client_2 = PyKustoClient ( ' https://help.kusto.windows.net/ ' )
self . assertIsNot (
client_1 . _PyKustoClient__client ,
client_2 . _PyKustoClient__client ,
)
def test_client_instances_cached ( self ) :
2020-07-21 13:10:38 +03:00
with patch ( ' pykusto._src.client.PyKustoClient._get_client_for_cluster ' , MockKustoClient ) :
2020-05-31 08:46:06 +03:00
client_1 = PyKustoClient ( ' https://help.kusto.windows.net/ ' , use_global_cache = True )
client_2 = PyKustoClient ( ' https://help.kusto.windows.net/ ' , use_global_cache = True )
self . assertIs (
client_1 . _PyKustoClient__client ,
client_2 . _PyKustoClient__client ,
)
def test_client_instances_cached_distinct ( self ) :
2020-07-21 13:10:38 +03:00
with patch ( ' pykusto._src.client.PyKustoClient._get_client_for_cluster ' , MockKustoClient ) :
2020-05-31 08:46:06 +03:00
client_1 = PyKustoClient ( ' https://help1.kusto.windows.net/ ' , use_global_cache = True )
client_2 = PyKustoClient ( ' https://help2.kusto.windows.net/ ' , use_global_cache = True )
self . assertIsNot (
client_1 . _PyKustoClient__client ,
client_2 . _PyKustoClient__client ,
)
2020-03-24 16:21:12 +03:00
def test_cross_cluster_join ( self ) :
client1 = MockKustoClient ( " https://one.kusto.windows.net " )
client2 = MockKustoClient ( " https://two.kusto.windows.net " )
table1 = PyKustoClient ( client1 ) [ ' test_db_1 ' ] [ ' test_table_1 ' ]
table2 = PyKustoClient ( client2 ) [ ' test_db_2 ' ] [ ' test_table_2 ' ]
Query ( table1 ) . take ( 5 ) . join ( Query ( table2 ) . take ( 6 ) ) . on ( col . foo ) . execute ( )
self . assertEqual (
[ RecordedQuery ( ' test_db_1 ' , ' test_table_1 | take 5 | join (cluster( " two.kusto.windows.net " ).database( " test_db_2 " ).table( " test_table_2 " ) | take 6) on foo ' ) ] ,
client1 . recorded_queries ,
)
def test_cross_cluster_join_with_union ( self ) :
client1 = MockKustoClient ( " https://one.kusto.windows.net " )
client2 = MockKustoClient ( " https://two.kusto.windows.net " )
table1 = PyKustoClient ( client1 ) [ ' test_db_1 ' ] [ ' test_table_1 ' ]
table2 = PyKustoClient ( client2 ) [ ' test_db_2 ' ] . get_table ( ' test_table_2_* ' )
Query ( table1 ) . take ( 5 ) . join ( Query ( table2 ) . take ( 6 ) ) . on ( col . foo ) . execute ( )
self . assertEqual (
[ RecordedQuery ( ' test_db_1 ' , ' test_table_1 | take 5 | join (union cluster( " two.kusto.windows.net " ).database( " test_db_2 " ).table( " test_table_2_* " ) | take 6) on foo ' ) ] ,
client1 . recorded_queries ,
)
def test_cross_cluster_join_with_union_2 ( self ) :
client1 = MockKustoClient ( " https://one.kusto.windows.net " )
client2 = MockKustoClient ( " https://two.kusto.windows.net " )
table1 = PyKustoClient ( client1 ) [ ' test_db_1 ' ] [ ' test_table_1 ' ]
table2 = PyKustoClient ( client2 ) [ ' test_db_2 ' ] . get_table ( ' test_table_2_* ' , ' test_table_3_* ' )
Query ( table1 ) . take ( 5 ) . join ( Query ( table2 ) . take ( 6 ) ) . on ( col . foo ) . execute ( )
self . assertEqual (
[ RecordedQuery (
' test_db_1 ' ,
' test_table_1 | take 5 | join (union cluster( " two.kusto.windows.net " ).database( " test_db_2 " ).table( " test_table_2_* " ), '
' cluster( " two.kusto.windows.net " ).database( " test_db_2 " ).table( " test_table_3_* " ) | take 6) on foo ' ,
) ] ,
client1 . recorded_queries ,
)
2020-06-01 12:56:47 +03:00
def test_client_for_cluster_with_azure_cli_auth ( self ) :
2021-05-13 14:06:13 +03:00
with self . assertLogs ( _logger , logging . INFO ) as cm :
2020-06-01 12:56:47 +03:00
client = PyKustoClient ( ' https://help.kusto.windows.net ' , fetch_by_default = False )
self . assertIsInstance ( client . _PyKustoClient__client , KustoClient )
self . assertEqual ( ' https://help.kusto.windows.net ' , client . get_cluster_name ( ) )
self . assertEqual ( [ ] , cm . output )
2021-05-13 14:06:13 +03:00
def test_client_for_cluster_with_aad_device_auth ( self ) :
with self . assertLogs ( _logger , logging . INFO ) as cm :
client = PyKustoClient ( ' https://help.kusto.windows.net ' , fetch_by_default = False , auth_method = KustoConnectionStringBuilder . with_aad_device_authentication )
2020-06-01 12:56:47 +03:00
self . assertIsInstance ( client . _PyKustoClient__client , KustoClient )
self . assertEqual ( ' https://help.kusto.windows.net ' , client . get_cluster_name ( ) )
2021-05-13 14:06:13 +03:00
self . assertEqual ( [ ] , cm . output )
2021-03-16 15:53:04 +03:00
@staticmethod
def unreliable_mock_kusto_client ( number_of_failures : int , exception_type : Type [ Exception ] = KustoServiceError ) :
TestClient . attempt = 1
def main_response ( ) :
if TestClient . attempt < = number_of_failures :
TestClient . attempt + = 1
raise exception_type ( " Mock exception for test " , None )
return mock_response ( tuple ( ) ) ( )
return MockKustoClient ( main_response = main_response )
def retries_base ( self , number_of_retries : int ) :
mock_kusto_client = self . unreliable_mock_kusto_client ( number_of_retries - 1 )
table = PyKustoClient ( mock_kusto_client , fetch_by_default = False , retry_config = RetryConfig ( number_of_retries , sleep_time = 0.1 , jitter = 0 ) ) [ ' test_db ' ] [ ' mock_table ' ]
with self . assertLogs ( _logger , logging . INFO ) as cm :
Query ( table ) . take ( 5 ) . execute ( )
self . assertEqual (
[ RecordedQuery ( ' test_db ' , ' mock_table | take 5 ' ) ] * number_of_retries ,
mock_kusto_client . recorded_queries
)
self . assertEqual (
[
f " INFO:pykusto:Attempt number { i } out of { number_of_retries } failed, "
2021-05-13 14:06:13 +03:00
f " previous sleep time was 0.1 seconds. Exception: KustoServiceError( ' Mock exception for test ' ) "
2021-03-16 15:53:04 +03:00
for i in range ( 1 , number_of_retries )
] ,
cm . output
)
def test_2_retry ( self ) :
self . retries_base ( 2 )
def test_3_retries ( self ) :
self . retries_base ( 3 )
def test_missing_retries ( self ) :
mock_kusto_client = self . unreliable_mock_kusto_client ( 1 )
table = PyKustoClient (
mock_kusto_client , fetch_by_default = False , retry_config = NO_RETRIES
) [ ' test_db ' ] [ ' mock_table ' ]
self . assertRaises (
KustoServiceError ( " Mock exception for test " , None ) ,
lambda : Query ( table ) . take ( 5 ) . execute ( ) ,
)
def test_non_transient_exception ( self ) :
mock_kusto_client = self . unreliable_mock_kusto_client ( 1 , KustoError )
table = PyKustoClient ( mock_kusto_client , fetch_by_default = False , retry_config = RetryConfig ( 2 , sleep_time = 0.1 , jitter = 0 ) ) [ ' test_db ' ] [ ' mock_table ' ]
self . assertRaises (
KustoError ( " Mock exception for test " , None ) ,
lambda : Query ( table ) . take ( 5 ) . execute ( ) ,
)