Add support for using in PySpark notebook (#162)

This commit is contained in:
Yonatan Most 2021-07-06 14:30:22 +03:00 коммит произвёл GitHub
Родитель c42dea7d4b
Коммит 73fc10b0c3
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 320 добавлений и 31 удалений

1
.github/workflows/runtests.yml поставляемый
Просмотреть файл

@ -22,7 +22,6 @@ jobs:
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
# TODO: test all relevant versions
python-version: ${{ matrix.python-version }}
- name: Install dependencies
# According to the internet using 'python -m pip' instead of 'pip' can prevent some issues

Просмотреть файл

@ -24,8 +24,10 @@
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="1">
<list size="3">
<item index="0" class="java.lang.String" itemvalue="azure" />
<item index="1" class="java.lang.String" itemvalue="pandas" />
<item index="2" class="java.lang.String" itemvalue="numpy" />
</list>
</value>
</option>

Просмотреть файл

@ -8,6 +8,7 @@ from ._src.client import *
from ._src.enums import *
from ._src.expressions import *
from ._src.functions import *
from ._src.pyspark_client import *
from ._src.query import *
__version__ = 'dev' # Version number is managed in the 'release' branch

Просмотреть файл

@ -1,13 +1,11 @@
from collections import defaultdict
from fnmatch import fnmatch
from threading import Lock
from typing import Union, List, Tuple, Dict, Generator, Optional, Set, Type, Callable
from typing import Union, List, Tuple, Dict, Generator, Optional, Set, Type, Callable, Iterable
from urllib.parse import urlparse
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
# noinspection PyProtectedMember
from azure.kusto.data._models import KustoResultRow as _KustoResultRow
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data.response import KustoResponseDataSet
@ -65,20 +63,20 @@ class KustoResponse:
def __init__(self, response: KustoResponseDataSet):
self.__response = response
def get_rows(self) -> List[_KustoResultRow]:
def get_rows(self) -> List[Iterable]:
return self.__response.primary_results[0].rows
@staticmethod
def is_row_valid(row: _KustoResultRow) -> bool:
def is_row_valid(row: Iterable) -> bool:
for field in row:
if field is None or (isinstance(field, str) and len(field.strip()) == 0):
return False
return True
def get_valid_rows(self) -> Generator[_KustoResultRow, None, None]:
def get_valid_rows(self) -> Generator[Tuple, None, None]:
for row in self.get_rows():
if self.is_row_valid(row):
yield row
yield tuple(row)
def to_dataframe(self) -> pd.DataFrame:
return dataframe_from_result_table(self.__response.primary_results[0])
@ -103,20 +101,26 @@ class PyKustoClient(_ItemFetcher):
def __init__(
self, client_or_cluster: Union[str, KustoClient], fetch_by_default: bool = True, use_global_cache: bool = False,
retry_config: RetryConfig = NO_RETRIES,
auth_method: Callable[[str], KustoConnectionStringBuilder] = KustoConnectionStringBuilder.with_az_cli_authentication,
auth_method: Optional[Callable[[str], KustoConnectionStringBuilder]] = KustoConnectionStringBuilder.with_az_cli_authentication,
) -> None:
"""
Create a new handle to Kusto cluster. The value of "fetch_by_default" is used for current instance, and also passed on to database instances.
Create a new handle to a Kusto cluster. The value of "fetch_by_default" is used for current instance, and also passed on to database instances.
:param client_or_cluster: Either a KustoClient instance, or a cluster name. In case a cluster name is provided, a KustoClient is generated using Azure CLI authentication,
falling back to AAD device authentication if needed.
:param client_or_cluster: Either a KustoClient instance, or a cluster URL. In case a cluster URL is provided, a KustoClient is generated using the provided auth_method.
:param use_global_cache: If true, share a global client cache between all instances. Provided for convenience during development, not recommended for general use.
:param retry_config: An instance of RetryConfig which instructs the client how to perform retries in case of failure. The default is NO_RETRIES.
:param auth_method: A method that returns a KustoConnectionStringBuilder for authentication. The default is 'KustoConnectionStringBuilder.with_az_cli_authentication'.
A popular alternative is 'KustoConnectionStringBuilder.with_aad_device_authentication'
"""
super().__init__(None, fetch_by_default)
self.__first_execution = True
self.__first_execution_lock = Lock()
self.__retry_config = retry_config
self.__auth_method = auth_method
self._internal_init(client_or_cluster, use_global_cache)
self._refresh_if_needed()
def _internal_init(self, client_or_cluster: Union[str, KustoClient], use_global_cache: bool):
if isinstance(client_or_cluster, KustoClient):
self.__client = client_or_cluster
# noinspection PyProtectedMember
@ -125,7 +129,6 @@ class PyKustoClient(_ItemFetcher):
else:
self.__cluster_name = client_or_cluster
self.__client = (self._cached_get_client_for_cluster if use_global_cache else self._get_client_for_cluster)()
self._refresh_if_needed()
def __repr__(self) -> str:
return f'PyKustoClient({self.__cluster_name})'
@ -147,10 +150,10 @@ class PyKustoClient(_ItemFetcher):
with self.__first_execution_lock:
if self.__first_execution:
self.__first_execution = False
return self.__internal_execute(database, query, properties, retry_config)
return self.__internal_execute(database, query, properties, retry_config)
return self._internal_execute(database, query, properties, retry_config)
return self._internal_execute(database, query, properties, retry_config)
def __internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
resolved_retry_config = self.__retry_config if retry_config is None else retry_config
return KustoResponse(resolved_retry_config.retry(lambda: self.__client.execute(database, query, properties)))

Просмотреть файл

@ -0,0 +1,114 @@
from importlib.util import find_spec
from typing import Dict, Callable, Union, Tuple
import numpy as np
import pandas as pd
from azure.kusto.data import ClientRequestProperties, KustoClient
from pykusto import PyKustoClient, NO_RETRIES, KustoResponse, KQL, RetryConfig
from .logger import _logger
class DataframeBasedKustoResponse(KustoResponse):
    """
    Adapter which exposes a dataframe through the :class:`KustoResponse` interface.

    In PySpark, Kusto results are returned as dataframes, so the dataframe is wrapped with this object for compatibility with :class:`PyKustoClient`.
    """
    __dataframe: pd.DataFrame

    # The parent constructor is intentionally not invoked: only the dataframe is stored.
    # noinspection PyMissingConstructor
    def __init__(self, dataframe: pd.DataFrame):
        """
        :param dataframe: The dataframe holding the query results.
        """
        self.__dataframe = dataframe

    def get_rows(self) -> np.ndarray:
        """
        :return: The contents of the wrapped dataframe, as produced by its ``to_numpy()`` method.
        """
        rows = self.__dataframe.to_numpy()
        return rows

    def to_dataframe(self) -> pd.DataFrame:
        """
        :return: The wrapped dataframe itself (the same object, not a copy).
        """
        return self.__dataframe
class PySparkKustoClient(PyKustoClient):
    """
    Handle to a Kusto cluster, to be used inside a PySpark notebook.

    Queries are executed through a Spark DataFrameReader rather than through a KustoClient. Authentication is done either via a pre-configured linked service, or via
    device authentication.
    """

    def __init__(self, cluster: str, linked_service: str = None, fetch_by_default: bool = True) -> None:
        """
        Create a new handle to a Kusto cluster. The value of "fetch_by_default" is used for current instance, and also passed on to database instances.

        :param cluster: a cluster URL.
        :param linked_service: If provided, the connection to Kusto will be made via a pre-configured link (used only for Synapse). Otherwise, device authentication will be used
                               (tested only for Synapse, but should work for any PySpark notebook).
        :param fetch_by_default: Passed on unchanged to the parent constructor.
        """
        # Must be assigned before the parent constructor runs, because the parent constructor calls _internal_init, which reads this attribute
        self.__linked_service = linked_service
        # Positional arguments following fetch_by_default are: use_global_cache=False, retry_config=NO_RETRIES, auth_method=None
        super().__init__(cluster, fetch_by_default, False, NO_RETRIES, None)

    def _internal_init(self, client_or_cluster: Union[str, KustoClient], use_global_cache: bool):
        """
        Obtain the Spark session and context, and prepare the DataFrameReader format and options according to the authentication mode.

        :param client_or_cluster: Must be a string (the cluster), a KustoClient instance is not accepted by this class.
        :param use_global_cache: Not used by this implementation.
        """
        assert isinstance(client_or_cluster, str), "PySparkKustoClient must be initialized with a cluster name"
        self.__cluster_name = client_or_cluster
        # Option values are stored as zero-argument callables, so that they can be re-evaluated for every query (see get_options)
        self.__options: Dict[str, Callable[[], str]] = {}
        self.__kusto_session, self.__spark_context = self.__get_spark_session_and_context()
        if self.__linked_service is None:
            # Connect via device authentication
            self.refresh_device_auth()
            self.__format = 'com.microsoft.kusto.spark.datasource'
            self.option('kustoCluster', self.__cluster_name)
        else:
            # Connect via pre-configured link
            self.__format = 'com.microsoft.kusto.spark.synapse.datasource'
            self.option('spark.synapse.linkedService', self.__linked_service)

    def __repr__(self) -> str:
        # The inherited __repr__ reads the parent's private (name-mangled) cluster name, which is never assigned for this class because _internal_init is overridden.
        # Without this override, repr() of an instance raises AttributeError.
        return f'PySparkKustoClient({self.__cluster_name})'

    def refresh_device_auth(self) -> None:
        """
        Run device authentication sequence, called in the client constructor. Call this method again if you need to re-authenticate.

        The device code message is written to the package logger at INFO level.
        """
        assert self.__linked_service is None, "Device authentication can be used only when a linked_service was not provided to the client constructor"
        # noinspection PyProtectedMember
        device_auth = self.__spark_context._jvm.com.microsoft.kusto.spark.authentication.DeviceAuthentication(self.__cluster_name, "common")
        _logger.info(device_auth.getDeviceCodeMessage())
        # The method itself is stored (not its result), so it is called each time the options are resolved
        self.option('accessToken', device_auth.acquireToken)

    # noinspection PyUnresolvedReferences,PyPackageRequirements
    @staticmethod
    def __get_spark_session_and_context() -> Tuple['pyspark.sql.session.SparkSession', 'pyspark.context.SparkContext']:  # noqa: F821  # pragma: no cover
        """
        Get (or create) the Spark session and Spark context.

        :raises RuntimeError: If the pyspark package cannot be found.
        """
        if find_spec('pyspark') is None:
            raise RuntimeError("pyspark package not found. PySparkKustoClient can only be used inside a PySpark notebook")
        # Imported here rather than at module level, so that this module can be imported when pyspark is not installed
        from pyspark.sql import SparkSession
        from pyspark.context import SparkContext
        return SparkSession.builder.appName("kustoPySpark").getOrCreate(), SparkContext.getOrCreate()

    def option(self, key: str, value: Union[str, Callable[[], str]]) -> 'PySparkKustoClient':
        """
        Add an option to the underlying DataFrameReader. All authentication related options are already handled by this class, but use this method if you need any other options.

        :param key: The option key.
        :param value: Either an option value, or a callable to generate the option value.
        :return: This instance for chained calls.
        """
        if isinstance(value, str):
            # Wrap plain strings, so that every stored value is a callable
            self.__options[key] = lambda: value
        else:
            self.__options[key] = value
        return self

    def clear_option(self, key: str) -> 'PySparkKustoClient':
        """
        Clear an option from the underlying DataFrameReader. Clearing a key which was never set is not an error.

        :param key: The option key to clear.
        :return: This instance for chained calls.
        """
        self.__options.pop(key, None)
        return self

    def get_options(self) -> Dict[str, str]:
        """
        Get the options set for the underlying DataFrameReader.

        :return: A new dictionary, with each option value produced by calling its stored callable.
        """
        return {key: value_producer() for key, value_producer in self.__options.items()}

    def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
        """
        Execute a query by loading it through the Spark DataFrameReader.

        :param database: Sent as the 'kustoDatabase' option.
        :param query: Sent as the 'kustoQuery' option.
        :param properties: Accepted for compatibility with the parent signature, not used by this implementation.
        :param retry_config: Accepted for compatibility with the parent signature, not used by this implementation.
        :return: The loaded dataframe, wrapped in a :class:`DataframeBasedKustoResponse`.
        """
        resolved_options = self.get_options()
        resolved_options['kustoDatabase'] = database
        resolved_options['kustoQuery'] = query
        kusto_read_session = self.__kusto_session.read.format(self.__format)
        for key, value in resolved_options.items():
            kusto_read_session = kusto_read_session.option(key, value)
        return DataframeBasedKustoResponse(kusto_read_session.load())

Просмотреть файл

@ -3,14 +3,8 @@ import logging
import sys
from concurrent.futures import Future
from threading import Event
from typing import Callable, Tuple, Any, List, Optional, Union
from typing import Callable, Tuple, Any, List, Optional, Union, Type
from unittest import TestCase
if sys.version_info[1] < 9:
# noinspection PyProtectedMember
from unittest.case import _AssertLogsContext
else:
# noinspection PyUnresolvedReferences,PyProtectedMember,PyCompatibility
from unittest._log import _AssertLogsContext
from urllib.parse import urljoin
from azure.kusto.data import KustoClient, ClientRequestProperties
@ -25,6 +19,13 @@ from pykusto._src.expressions import _NumberColumn, _BooleanColumn, _ArrayColumn
# noinspection PyProtectedMember
from pykusto._src.type_utils import _KustoType
if sys.version_info[1] < 9:
# noinspection PyProtectedMember
from unittest.case import _AssertLogsContext
else:
# noinspection PyUnresolvedReferences,PyProtectedMember,PyCompatibility
from unittest._log import _AssertLogsContext
# Naming this variable "test_table" triggers the following bug: https://github.com/pytest-dev/pytest/issues/7378
# noinspection PyTypeChecker
mock_table = Table(
@ -79,6 +80,9 @@ class TestBase(TestCase):
def raise_mock_exception():
raise Exception("Mock exception")
def assertType(self, obj: Any, expected_type: Type):
    """
    Assert that the exact type of the given object equals the expected type (an instance of a subclass does not pass).
    """
    actual_type = type(obj)
    self.assertEqual(actual_type, expected_type)
# Get rid of this in Python 3.10, as this was resolved: https://bugs.python.org/issue39385
class CustomAssertLogsContext(_AssertLogsContext):
@ -236,4 +240,14 @@ class MockKustoClient(KustoClient):
return response()
def nested_attribute_dict(attributes: str, value: Any) -> Any:
    """
    Used to mock chained lists of attribute references.

    :param attributes: Dot-separated attribute names, for example 'a.b.c'.
    :param value: The object to place at the end of the attribute chain.
    :return: A dynamically created class, named after the first attribute with a 'Wrapper' suffix, through which the attribute chain leads to the given value.
    """
    wrapped = value
    # Work from the innermost attribute outwards, so that each new class can refer to the level beneath it
    for attribute_name in attributes.split('.')[::-1]:
        wrapped = type(f'{attribute_name}Wrapper', (), {attribute_name: wrapped})
    return wrapped
test_logger = logging.getLogger("pykusto_test")

Просмотреть файл

@ -1,5 +1,5 @@
from threading import Thread, Lock
from typing import Any, Type, Callable, List
from typing import Any, Callable, List
from unittest.mock import patch
from pykusto import PyKustoClient, Query
@ -18,9 +18,6 @@ class TestClientFetch(TestBase):
query_thread: Thread = None
query_results: List = []
def assertType(self, obj: Any, expected_type: Type):
self.assertEqual(type(obj), expected_type)
@staticmethod
def query_in_background(query: Callable[[], Any]):
with background_query_lock:

159
test/test_pyspark_client.py Normal file
Просмотреть файл

@ -0,0 +1,159 @@
import logging
from unittest.mock import patch
import pandas as pd
from pykusto import Query, PySparkKustoClient
# noinspection PyProtectedMember
from pykusto._src.expressions import _StringColumn, _NumberColumn
# noinspection PyProtectedMember
from pykusto._src.logger import _logger
from test.test_base import TestBase, nested_attribute_dict
class MockDataFrameReader:
    """
    Stand-in for a Spark DataFrameReader: records the format and the options it is given, and returns a pre-set dataframe when loading.
    """

    def __init__(self, dataframe_to_return: pd.DataFrame) -> None:
        self.dataframe_to_return = dataframe_to_return
        self.recorded_options = {}
        self.recorded_format = None

    def format(self, the_format: str) -> 'MockDataFrameReader':
        """
        Record the format. Setting a format more than once fails an assertion.

        :return: This instance for chained calls.
        """
        assert self.recorded_format is None, "Trying to set format twice"
        self.recorded_format = the_format
        return self

    def option(self, key: str, value: str) -> 'MockDataFrameReader':
        """
        Record an option. Setting the same key more than once fails an assertion.

        :return: This instance for chained calls.
        """
        assert key not in self.recorded_options, f"Trying to set option '{key}' twice"
        self.recorded_options[key] = value
        return self

    def load(self) -> pd.DataFrame:
        """
        :return: The dataframe provided to the constructor.
        """
        return self.dataframe_to_return
class MockSparkSession:
    """
    Stand-in for a Spark session, exposing only a "read" attribute which holds a :class:`MockDataFrameReader`.
    """

    def __init__(self, dataframe_to_return: pd.DataFrame) -> None:
        reader = MockDataFrameReader(dataframe_to_return)
        self.read = reader
# Method names are in camel case because they mimic the methods called on the real device authentication object
# noinspection PyPep8Naming,PyMethodMayBeStatic
class MockDeviceAuthentication:
    """
    Stand-in for the device authentication object: provides a fixed device code message and a pre-set token.
    """

    def __init__(self, mock_token: str):
        self.mock_token = mock_token

    def getDeviceCodeMessage(self):
        """
        :return: A fixed message, which tests expect to find in the log.
        """
        message = "To sign in, use a lubricated goat to open the pod bay doors."
        return message

    def acquireToken(self):
        """
        :return: The token provided to the constructor.
        """
        return self.mock_token
class MockSparkContext:
    """
    Stand-in for a Spark context, whose "_jvm" attribute chain leads to a factory of :class:`MockDeviceAuthentication` objects.
    """

    def __init__(self, mock_token: str):
        self.mock_token = mock_token

        # Accepts two positional arguments and ignores both. The token is read from this instance at call time.
        def create_device_authentication(s1, s2):
            return MockDeviceAuthentication(self.mock_token)

        self._jvm = nested_attribute_dict('com.microsoft.kusto.spark.authentication.DeviceAuthentication', create_device_authentication)
class TestClient(TestBase):
    """
    Tests for :class:`PySparkKustoClient`, using mock objects in place of the Spark session and the Spark context.
    """
    # Patch target: the private static method which obtains the Spark session and context, in its name-mangled form
    SPARK_GETTER = 'pykusto._src.pyspark_client.PySparkKustoClient._PySparkKustoClient__get_spark_session_and_context'

    def test_linked_service(self):
        expected_df = pd.DataFrame((['foo', 10], ['bar', 20], ['baz', 30]), columns=('stringField', 'numField'))
        mock_spark_session = MockSparkSession(expected_df)

        # The patch is needed only while the client is being constructed
        with patch(self.SPARK_GETTER, lambda s: (mock_spark_session, None)):
            client = PySparkKustoClient('https://help.kusto.windows.net/', linked_service='MockLinkedKusto', fetch_by_default=False)

        actual_df = Query(client['test_db']['mock_table']).take(5).to_dataframe()
        reader = mock_spark_session.read
        expected_options = {
            'spark.synapse.linkedService': 'MockLinkedKusto',
            'kustoDatabase': 'test_db',
            'kustoQuery': 'mock_table | take 5',
        }
        self.assertTrue(expected_df.equals(actual_df))
        self.assertEqual('com.microsoft.kusto.spark.synapse.datasource', reader.recorded_format)
        self.assertEqual(expected_options, reader.recorded_options)

    def test_linked_service_with_extra_options(self):
        expected_df = pd.DataFrame((['foo', 10], ['bar', 20], ['baz', 30]), columns=('stringField', 'numField'))
        mock_spark_session = MockSparkSession(expected_df)

        with patch(self.SPARK_GETTER, lambda s: (mock_spark_session, None)):
            client = PySparkKustoClient('https://help.kusto.windows.net/', linked_service='MockLinkedKusto', fetch_by_default=False)

        # An option which was set and then cleared must not reach the reader
        client.option('alsoMake', 'coffee').option('performanceLevel', 'awesome').clear_option('alsoMake')
        actual_df = Query(client['test_db']['mock_table']).take(5).to_dataframe()
        reader = mock_spark_session.read
        expected_options = {
            'spark.synapse.linkedService': 'MockLinkedKusto',
            'kustoDatabase': 'test_db',
            'kustoQuery': 'mock_table | take 5',
            'performanceLevel': 'awesome',
        }
        self.assertTrue(expected_df.equals(actual_df))
        self.assertEqual('com.microsoft.kusto.spark.synapse.datasource', reader.recorded_format)
        self.assertEqual(expected_options, reader.recorded_options)

    def test_device_auth(self):
        expected_df = pd.DataFrame((['foo', 10], ['bar', 20], ['baz', 30]), columns=('stringField', 'numField'))
        mock_spark_session = MockSparkSession(expected_df)
        mock_spark_context = MockSparkContext('MOCK_TOKEN')

        with patch(self.SPARK_GETTER, lambda s: (mock_spark_session, mock_spark_context)):
            with self.assertLogs(_logger, logging.INFO) as cm:
                client = PySparkKustoClient('https://help.kusto.windows.net/', fetch_by_default=False)

        # The device code message is logged while the client is being constructed
        self.assertEqual(["INFO:pykusto:To sign in, use a lubricated goat to open the pod bay doors."], cm.output)
        actual_df = Query(client['test_db']['mock_table']).take(5).to_dataframe()
        reader = mock_spark_session.read
        expected_options = {
            'kustoCluster': 'https://help.kusto.windows.net/',
            'accessToken': 'MOCK_TOKEN',
            'kustoDatabase': 'test_db',
            'kustoQuery': 'mock_table | take 5',
        }
        self.assertTrue(expected_df.equals(actual_df))
        self.assertEqual('com.microsoft.kusto.spark.datasource', reader.recorded_format)
        self.assertEqual(expected_options, reader.recorded_options)

    def test_linked_service_with_fetch(self):
        # The dataframe returned by the mock reader plays the role of the schema query results
        schema_rows = (
            ['test_db', 'mock_table', 'stringField', 'System.String'],
            ['test_db', 'mock_table', 'numField', 'System.Int32'],
        )
        expected_df = pd.DataFrame(schema_rows, columns=('DatabaseName', 'TableName', 'ColumnName', 'ColumnType'))
        mock_spark_session = MockSparkSession(expected_df)

        with patch(self.SPARK_GETTER, lambda s: (mock_spark_session, None)):
            client = PySparkKustoClient('https://help.kusto.windows.net/', linked_service='MockLinkedKusto')
            client.wait_for_items()

        mock_table = client.test_db.mock_table
        self.assertType(mock_table.stringField, _StringColumn)
        self.assertType(mock_table.numField, _NumberColumn)
        reader = mock_spark_session.read
        expected_options = {
            'spark.synapse.linkedService': 'MockLinkedKusto',
            'kustoDatabase': '',
            'kustoQuery': '.show databases schema | project DatabaseName, TableName, ColumnName, ColumnType | limit 100000',
        }
        self.assertEqual('com.microsoft.kusto.spark.synapse.datasource', reader.recorded_format)
        self.assertEqual(expected_options, reader.recorded_options)

Просмотреть файл

@ -469,6 +469,6 @@ class TestQuery(TestBase):
))
client.wait_for_items()
table = client.test_db.mock_table
self.assertTrue(
pd.DataFrame(rows, columns=columns).equals(Query(table).take(10).to_dataframe())
)
actual_df = Query(table).take(10).to_dataframe()
expected_df = pd.DataFrame(rows, columns=columns)
self.assertTrue(expected_df.equals(actual_df))