Add option for installing without dependencies which are not needed in PySpark (#171)

1. Extract `PyKustoClientBase` class which is inherited by `PyKustoClient` and `PySparkKustoClient`.
2. Remove dependency of `PySparkKustoClient` on azure-kust-data package.
3. Add `pyspark` installation option, for installing without dependencies which are not needed in PySpark.
4. Replace `tests_require` (which is deprecated) with `extras_require`.
5. Small improvement to test coverage exclusion style.
This commit is contained in:
Yonatan Most 2021-08-08 20:00:02 +03:00 коммит произвёл GitHub
Родитель 8a627e4fe7
Коммит edd11ddbf1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 612 добавлений и 388 удалений

7
.coveragerc Normal file
Просмотреть файл

@ -0,0 +1,7 @@
[report]
# Lines to exclude from tests coverage requirement
exclude_lines =
# Exclude lines ad-hoc by adding this text in a comment
pragma: no cover
# Exclude abstract methods
raise NotImplementedError()

2
.github/workflows/pythonpublish.yml поставляемый
Просмотреть файл

@ -1,4 +1,4 @@
# This workflows will upload a Python Package using Twine when a release is created
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries
name: Upload Python Package

2
.github/workflows/runtests.yml поставляемый
Просмотреть файл

@ -29,7 +29,7 @@ jobs:
python -m pip install --upgrade pip
# For some reason transitive dependencies are not installed properly, unless we first explicitly install 'azure-kusto-data'. Hence this ugly hack.
python -m pip install `grep -oP "azure-kusto-data==[^']+" setup.py`
python setup.py install
python -m pip install .[test]
python -m pip freeze
- name: Lint with flake8
run: |

Просмотреть файл

@ -56,6 +56,7 @@
<w>rightantisemi</w>
<w>rightouter</w>
<w>rightsemi</w>
<w>runtests</w>
<w>sqrt</w>
<w>startofday</w>
<w>startofmonth</w>

Просмотреть файл

@ -7,11 +7,22 @@ Started as a project in the 2019 Microsoft Hackathon.
# Getting Started
### Installation
Default installation:
```bash
pip install pykusto
```
With dependencies required for running the tests:
```bash
pip install pykusto[test]
```
Without dependencies which are not needed in PySpark:
```bash
pip install pykusto --global-option pyspark
```
### Basic usage
```python
from datetime import timedelta
from pykusto import PyKustoClient, Query
@ -31,7 +42,7 @@ t = client.Samples.StormEvents
# Build query
(
Query(t)
Query(t)
# Access columns using table variable
.project(t.StartTime, t.EndTime, t.EventType, t.Source)
# Specify new column name using Python keyword argument
@ -46,9 +57,11 @@ t = client.Samples.StormEvents
### Retrying failed queries
```python
# Turn on retrying for all queries
# Turn on retrying for all queries
from pykusto import PyKustoClient, RetryConfig, Query
client = PyKustoClient(
'https://help.kusto.windows.net',
"https://help.kusto.windows.net",
retry_config=RetryConfig() # Use default retry config
)

Просмотреть файл

@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="CheckStyle-IDEA-Module">
<option name="configuration">
<map />
</option>
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/build" />

Просмотреть файл

@ -5,6 +5,7 @@
# "import *" does not import names which start with an underscore
from ._src.client import *
from ._src.client_base import *
from ._src.enums import *
from ._src.expressions import *
from ._src.functions import *

Просмотреть файл

@ -1,63 +1,18 @@
from collections import defaultdict
from fnmatch import fnmatch
from threading import Lock
from typing import Union, List, Tuple, Dict, Generator, Optional, Set, Type, Callable, Iterable
from multiprocessing import Lock
from typing import Iterable, Callable, Dict, Union, Optional
from urllib.parse import urlparse
import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data.response import KustoResponseDataSet
from redo import retrier
from .expressions import BaseColumn, _AnyTypeColumn
from .item_fetcher import _ItemFetcher
from .client_base import KustoResponseBase, PyKustoClientBase, RetryConfig, NO_RETRIES, ClientRequestProperties
from .kql_converters import KQL
from .logger import _logger
from .type_utils import _INTERNAL_NAME_TO_TYPE, _typed_column, _DOT_NAME_TO_TYPE
class RetryConfig:
def __init__(
self, attempts: int = 5, sleep_time: float = 60, max_sleep_time: float = 300, sleep_scale: float = 1.5, jitter: float = 1,
retry_exceptions: Tuple[Type[Exception], ...] = (KustoServiceError,)
) -> None:
"""
All time parameters are in seconds
"""
self.attempts = attempts
self.sleep_time = sleep_time
self.max_sleep_time = max_sleep_time
self.sleep_scale = sleep_scale
self.jitter = jitter
self.retry_exceptions = retry_exceptions
def retry(self, action: Callable):
attempt = 1
for sleep_time in retrier(attempts=self.attempts, sleeptime=self.sleep_time, max_sleeptime=self.max_sleep_time, sleepscale=self.sleep_scale, jitter=self.jitter):
try:
return action()
except Exception as e:
for exception_to_check in self.retry_exceptions:
if isinstance(e, exception_to_check):
if attempt == self.attempts:
_logger.warning(f"Reached maximum number of attempts ({self.attempts}), raising exception")
raise
_logger.info(
f"Attempt number {attempt} out of {self.attempts} failed, "
f"previous sleep time was {sleep_time} seconds. Exception: {e.__class__.__name__}('{str(e)}')"
)
break
else:
raise
attempt += 1
NO_RETRIES = RetryConfig(1)
class KustoResponse:
class KustoResponse(KustoResponseBase):
__response: KustoResponseDataSet
def __init__(self, response: KustoResponseDataSet):
@ -66,35 +21,20 @@ class KustoResponse:
def get_rows(self) -> Iterable[Iterable]:
return self.__response.primary_results[0].rows
@staticmethod
def is_row_valid(row: Iterable) -> bool:
for field in row:
if field is None or (isinstance(field, str) and len(field.strip()) == 0):
return False
return True
def get_valid_rows(self) -> Generator[Tuple, None, None]:
for row in self.get_rows():
if self.is_row_valid(row):
yield tuple(row)
def to_dataframe(self) -> pd.DataFrame:
return dataframe_from_result_table(self.__response.primary_results[0])
class PyKustoClient(_ItemFetcher):
class PyKustoClient(PyKustoClientBase):
"""
Handle to a Kusto cluster.
Uses :class:`ItemFetcher` to fetch and cache the full cluster schema, including all databases, tables, columns and
their types.
"""
__client: KustoClient
__cluster_name: str
__first_execution: bool
__first_execution_lock: Lock
__retry_config: RetryConfig
__auth_method: Callable[[str], KustoConnectionStringBuilder]
# Static members
__global_client_cache: Dict[str, KustoClient] = {}
__global_cache_lock: Lock = Lock()
@ -112,292 +52,41 @@ class PyKustoClient(_ItemFetcher):
:param auth_method: A method that returns a KustoConnectionStringBuilder for authentication. The default is 'KustoConnectionStringBuilder.with_az_cli_authentication'.
A popular alternative is 'KustoConnectionStringBuilder.with_aad_device_authentication'
"""
super().__init__(None, fetch_by_default)
self.__first_execution = True
self.__first_execution_lock = Lock()
self.__retry_config = retry_config
self.__auth_method = auth_method
self._internal_init(client_or_cluster, use_global_cache)
self._refresh_if_needed()
def _internal_init(self, client_or_cluster: Union[str, KustoClient], use_global_cache: bool):
client_resolved = False
if isinstance(client_or_cluster, KustoClient):
self.__client = client_or_cluster
client_resolved = True
# noinspection PyProtectedMember
self.__cluster_name = urlparse(client_or_cluster._query_endpoint).netloc
cluster_name = urlparse(client_or_cluster._query_endpoint).netloc
assert not use_global_cache, "Global cache not supported when providing your own client instance"
else:
self.__cluster_name = client_or_cluster
cluster_name = client_or_cluster
super().__init__(cluster_name, fetch_by_default, retry_config)
if not client_resolved:
self.__client = (self._cached_get_client_for_cluster if use_global_cache else self._get_client_for_cluster)()
def __repr__(self) -> str:
return f"PyKustoClient('{self.__cluster_name}')"
def to_query_format(self) -> KQL:
return KQL(f'cluster("{self.__cluster_name}")')
def _new_item(self, name: str) -> 'Database':
# "fetch_by_default" set to false because often a database generated this way is not represented by an actual
# Kusto database
return Database(self, name, fetch_by_default=False)
def get_database(self, name: str) -> 'Database':
return self[name]
def execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
# The first execution usually triggers an authentication flow. We block all subsequent executions to prevent redundant authentications.
# Remove the below block once this is resolved: https://github.com/Azure/azure-kusto-python/issues/208
with self.__first_execution_lock:
if self.__first_execution:
self.__first_execution = False
return self._internal_execute(database, query, properties, retry_config)
return self._internal_execute(database, query, properties, retry_config)
return f"PyKustoClient('{self._cluster_name}')"
def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
resolved_retry_config = self.__retry_config if retry_config is None else retry_config
resolved_retry_config = self._retry_config if retry_config is None else retry_config
if resolved_retry_config is not None:
resolved_retry_config = resolved_retry_config.retry_on(KustoServiceError)
return KustoResponse(resolved_retry_config.retry(lambda: self.__client.execute(database, query, properties)))
def get_databases_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_databases(self) -> Generator['Database', None, None]:
yield from self._get_items()
def get_cluster_name(self) -> str:
return self.__cluster_name
def _get_client_for_cluster(self) -> KustoClient:
return KustoClient(self.__auth_method(self.__cluster_name))
return KustoClient(self.__auth_method(self._cluster_name))
def _cached_get_client_for_cluster(self) -> KustoClient:
"""
Provided for convenience during development, not recommended for general use.
"""
with PyKustoClient.__global_cache_lock:
client = PyKustoClient.__global_client_cache.get(self.__cluster_name)
client = PyKustoClient.__global_client_cache.get(self._cluster_name)
if client is None:
client = self._get_client_for_cluster()
PyKustoClient.__global_client_cache[self.__cluster_name] = client
PyKustoClient.__global_client_cache[self._cluster_name] = client
assert len(PyKustoClient.__global_client_cache) <= 1024, "Global client cache cannot exceed size of 1024"
return client
def _internal_get_items(self) -> Dict[str, 'Database']:
# Retrieves database names, table names, column names and types for all databases. A database name is required
# by the "execute" method, but is ignored for this query
res: KustoResponse = self.execute(
'', KQL('.show databases schema | project DatabaseName, TableName, ColumnName, ColumnType | limit 100000')
)
database_to_table_to_columns = defaultdict(lambda: defaultdict(list))
for database_name, table_name, column_name, column_type in res.get_valid_rows():
database_to_table_to_columns[database_name][table_name].append(
_typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name)
)
return {
# Database instances are provided with all table and column data, preventing them from generating more
# queries. However the "fetch_by_default" behavior is passed on to them for future actions.
database_name: Database(
self, database_name,
{table_name: tuple(columns) for table_name, columns in table_to_columns.items()},
fetch_by_default=self._fetch_by_default
)
for database_name, table_to_columns in database_to_table_to_columns.items()
}
class Database(_ItemFetcher):
"""
Handle to a Kusto database.
Uses :class:`ItemFetcher` to fetch and cache the full database schema, including all tables, columns and their
types.
"""
__client: PyKustoClient
__name: str
def __init__(
self, client: PyKustoClient, name: str, tables: Dict[str, Tuple[BaseColumn, ...]] = None,
fetch_by_default: bool = True
) -> None:
"""
Create a new handle to Kusto database. The value of "fetch_by_default" is used for current instance, and also
passed on to database instances.
:param client: The associated PyKustoClient instance
:param name: Database name
:param tables: A mapping from table names to the columns of each table. If this is None and "fetch_by_default"
is true then they will be fetched in the constructor.
"""
super().__init__(
# Providing the items to ItemFetcher prevents further queries until the "refresh" method is explicitly
# called
None if tables is None else {
table_name: Table(self, table_name, columns, fetch_by_default=fetch_by_default)
for table_name, columns in tables.items()
},
fetch_by_default
)
self.__client = client
self.__name = name
self._refresh_if_needed()
def __repr__(self) -> str:
return f"{self.__client}.Database('{self.__name}')"
def to_query_format(self) -> KQL:
return KQL(f'{self.__client.to_query_format()}.database("{self.__name}")')
def get_name(self) -> str:
return self.__name
def _new_item(self, name: str) -> 'Table':
# "fetch_by_default" set to false because often a table generated this way is not represented by an actual
# Kusto table
return Table(self, name, fetch_by_default=False)
def execute(self, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
return self.__client.execute(self.__name, query, properties, retry_config)
def get_table_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_table(self, *tables: str) -> 'Table':
assert len(tables) > 0
if not Table.static_is_union(*tables):
return self[tables[0]]
columns: Optional[Tuple[BaseColumn, ...]] = None
if self._items_fetched():
resolved_tables: Set[Table] = set()
for table_pattern in tables:
if '*' in table_pattern:
resolved_tables.update(table for table in self._get_items() if fnmatch(table.get_name(), table_pattern))
else:
resolved_tables.add(self[table_pattern])
if len(resolved_tables) == 1:
return next(iter(resolved_tables))
columns = self.__try_to_resolve_union_columns(*resolved_tables)
return Table(self, tables, columns, fetch_by_default=self._fetch_by_default)
@staticmethod
def __try_to_resolve_union_columns(*resolved_tables: 'Table') -> Optional[Tuple[BaseColumn, ...]]:
column_by_name: Dict[str, BaseColumn] = {}
for table in resolved_tables:
for column in table.get_columns():
existing_column = column_by_name.setdefault(column.get_name(), column)
if type(column) is not type(existing_column):
return None # Fallback to Kusto query for column name conflict resolution
return tuple(column_by_name.values())
def _internal_get_items(self) -> Dict[str, 'Table']:
# Retrieves table names, column names and types for this database only (the database name is added in the
# "execute" method)
res: KustoResponse = self.execute(
KQL('.show database schema | project TableName, ColumnName, ColumnType | limit 10000')
)
table_to_columns = defaultdict(list)
for table_name, column_name, column_type in res.get_valid_rows():
table_to_columns[table_name].append(_typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name))
# Table instances are provided with all column data, preventing them from generating more queries. However the
# "fetch_by_default" behavior is
# passed on to them for future actions.
return {
table_name: Table(self, table_name, tuple(columns), fetch_by_default=self._fetch_by_default)
for table_name, columns in table_to_columns.items()
}
class Table(_ItemFetcher):
"""
Handle to a Kusto table.
Uses :class:`ItemFetcher` to fetch and cache the table schema of columns and their types.
"""
__database: Database
__tables: Tuple[str, ...]
def __init__(
self, database: Database, tables: Union[str, List[str], Tuple[str, ...]],
columns: Tuple[BaseColumn, ...] = None, fetch_by_default: bool = True
) -> None:
"""
Create a new handle to a Kusto table.
:param database: The associated Database instance
:param tables: Either a single table name, or a list of tables. If more than one table is given OR the table
name contains a wildcard, the Kusto 'union' statement will be used.
:param columns: Table columns. If this is None and "ItemFetcher" is true then they will be fetched in the
constructor.
"""
super().__init__(
None if columns is None else {c.get_name(): c for c in columns},
fetch_by_default
)
self.__database = database
self.__tables = (tables,) if isinstance(tables, str) else tuple(tables)
assert len(self.__tables) > 0
self._refresh_if_needed()
def __repr__(self) -> str:
table_string = ', '.join(f"'{table}'" for table in self.__tables)
return f'{self.__database}.Table({table_string})'
def _new_item(self, name: str) -> BaseColumn:
return _AnyTypeColumn(name)
def __getattr__(self, name: str) -> BaseColumn:
"""
Convenience function for obtaining a column using dot notation.
In contrast with the overridden method from the :class:`ItemFetcher` class, a new column is generated if needed,
since new columns can be created on the fly in the course of the query (e.g. using 'extend'), and there is no
fear of undesired erroneous queries sent to Kusto.
:param name: Name of column
:return: The column with the given name
"""
return self[name]
@staticmethod
def static_is_union(*table_names: str) -> bool:
return len(table_names) > 1 or '*' in table_names[0]
def is_union(self) -> bool:
return self.static_is_union(*self.__tables)
def get_name(self) -> str:
assert not self.is_union()
return self.__tables[0]
def to_query_format(self, fully_qualified: bool = False) -> KQL:
if fully_qualified:
table_names = tuple(f'{self.__database.to_query_format()}.table("{table}")' for table in self.__tables)
else:
table_names = self.__tables
if self.is_union():
return KQL('union ' + ', '.join(table_names))
return KQL(table_names[0])
def execute(self, query: KQL, retry_config: RetryConfig = None) -> KustoResponse:
return self.__database.execute(query, retry_config=retry_config)
def get_columns_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_columns(self) -> Generator[BaseColumn, None, None]:
yield from self._get_items()
def _internal_get_items(self) -> Dict[str, BaseColumn]:
if not self.is_union():
# Retrieves column names and types for this table only
res: KustoResponse = self.execute(
KQL(f'.show table {self.get_name()} | project AttributeName, AttributeType | limit 10000')
)
return {
column_name: _typed_column.registry[_INTERNAL_NAME_TO_TYPE[column_type]](column_name)
for column_name, column_type in res.get_valid_rows()
}
# Get Kusto to figure out the schema of the union, especially useful for column name conflict resolution
res: KustoResponse = self.execute(
KQL(f'{self.to_query_format()} | getschema | project ColumnName, DataType | limit 10000')
)
return {
column_name: _typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name)
for column_name, column_type in res.get_valid_rows()
}

421
pykusto/_src/client_base.py Normal file
Просмотреть файл

@ -0,0 +1,421 @@
import json
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from fnmatch import fnmatch
from threading import Lock
from typing import Union, List, Tuple, Dict, Generator, Optional, Set, Type, Callable, Iterable
import pandas as pd
from redo import retrier
from .expressions import BaseColumn, _AnyTypeColumn
from .item_fetcher import _ItemFetcher
from .kql_converters import KQL
from .logger import _logger
from .type_utils import _INTERNAL_NAME_TO_TYPE, _typed_column, _DOT_NAME_TO_TYPE, PythonTypes
class RetryConfig:
def __init__(
self, attempts: int = 5, sleep_time: float = 60, max_sleep_time: float = 300, sleep_scale: float = 1.5, jitter: float = 1,
retry_exceptions: Tuple[Type[Exception], ...] = tuple(),
) -> None:
"""
All time parameters are in seconds
"""
self.attempts = attempts
self.sleep_time = sleep_time
self.max_sleep_time = max_sleep_time
self.sleep_scale = sleep_scale
self.jitter = jitter
self.retry_exceptions = retry_exceptions
def retry(self, action: Callable):
attempt = 1
for sleep_time in retrier(attempts=self.attempts, sleeptime=self.sleep_time, max_sleeptime=self.max_sleep_time, sleepscale=self.sleep_scale, jitter=self.jitter):
try:
return action()
except Exception as e:
for exception_to_check in self.retry_exceptions:
if isinstance(e, exception_to_check):
if attempt == self.attempts:
_logger.warning(f"Reached maximum number of attempts ({self.attempts}), raising exception")
raise
_logger.info(
f"Attempt number {attempt} out of {self.attempts} failed, "
f"previous sleep time was {sleep_time} seconds. Exception: {e.__class__.__name__}('{str(e)}')"
)
break
else:
raise
attempt += 1
def retry_on(self, *additional_exceptions: Type[Exception]) -> 'RetryConfig':
return RetryConfig(self.attempts, self.sleep_time, self.max_sleep_time, self.sleep_scale, self.jitter, self.retry_exceptions + additional_exceptions)
NO_RETRIES = RetryConfig(1)
class KustoResponseBase(metaclass=ABCMeta):
@abstractmethod
def get_rows(self) -> Iterable[Iterable]:
raise NotImplementedError()
@staticmethod
def is_row_valid(row: Iterable) -> bool:
for field in row:
if field is None or (isinstance(field, str) and len(field.strip()) == 0):
return False
return True
def get_valid_rows(self) -> Generator[Tuple, None, None]:
for row in self.get_rows():
if self.is_row_valid(row):
yield tuple(row)
@abstractmethod
def to_dataframe(self) -> pd.DataFrame:
raise NotImplementedError()
# Copied from https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/azure/kusto/data/client.py
# We are copying this class because we don't won't to force a dependency on azure-kusto-data unless it's actually needed (e.g. it's not needed for PySpark usage).
# noinspection SpellCheckingInspection
class ClientRequestProperties:
"""This class is a POD used by client making requests to describe specific needs from the service executing the requests.
For more information please look at: https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties
"""
results_defer_partial_query_failures_option_name = "deferpartialqueryfailures"
request_timeout_option_name = "servertimeout"
no_request_timeout_option_name = "norequesttimeout"
def __init__(self) -> None:
self._options = {}
self._parameters = {}
self.client_request_id = None
self.application = None
self.user = None
@staticmethod
def _assert_value_is_valid(value: str):
if not value or not value.strip():
raise ValueError("Value should not be empty")
def set_parameter(self, name: str, value: Optional[PythonTypes]):
"""Sets a parameter's value"""
self._assert_value_is_valid(name)
self._parameters[name] = value
def has_parameter(self, name: str) -> bool:
"""Checks if a parameter is specified."""
return name in self._parameters
def get_parameter(self, name: str, default_value: Optional[PythonTypes]) -> Optional[PythonTypes]:
"""Gets a parameter's value."""
return self._parameters.get(name, default_value)
def set_option(self, name: str, value: Optional[PythonTypes]) -> None:
"""Sets an option's value"""
self._assert_value_is_valid(name)
self._options[name] = value
def has_option(self, name: str) -> bool:
"""Checks if an option is specified."""
return name in self._options
def get_option(self, name: str, default_value: Optional[PythonTypes]) -> Optional[PythonTypes]:
"""Gets an option's value."""
return self._options.get(name, default_value)
def to_json(self) -> str:
"""Safe serialization to a JSON string."""
return json.dumps({"Options": self._options, "Parameters": self._parameters}, default=str)
class PyKustoClientBase(_ItemFetcher, metaclass=ABCMeta):
"""
Handle to a Kusto cluster.
Uses :class:`ItemFetcher` to fetch and cache the full cluster schema, including all databases, tables, columns and
their types.
"""
_cluster_name: str
_retry_config: RetryConfig
__first_execution: bool
__first_execution_lock: Lock
@abstractmethod
def __init__(
self, cluster_name: str, fetch_by_default: bool = True, retry_config: RetryConfig = NO_RETRIES,
) -> None:
"""
Create a new handle to a Kusto cluster. The value of "fetch_by_default" is used for current instance, and also passed on to database instances.
:param cluster_name: A cluster URL.
:param retry_config: An instance of RetryConfig which instructs the client how to perform retries in case of failure. The default is NO_RETRIES.
"""
super().__init__(None, fetch_by_default)
self._cluster_name = cluster_name
self._retry_config = retry_config
self.__first_execution = True
self.__first_execution_lock = Lock()
self._refresh_if_needed()
@abstractmethod
def __repr__(self) -> str:
raise NotImplementedError()
def to_query_format(self) -> KQL:
return KQL(f'cluster("{self._cluster_name}")')
def _new_item(self, name: str) -> 'Database':
# "fetch_by_default" set to false because often a database generated this way is not represented by an actual
# Kusto database
return Database(self, name, fetch_by_default=False)
def get_database(self, name: str) -> 'Database':
return self[name]
def execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponseBase:
# The first execution usually triggers an authentication flow. We block all subsequent executions to prevent redundant authentications.
# Remove the below block once this is resolved: https://github.com/Azure/azure-kusto-python/issues/208
with self.__first_execution_lock:
if self.__first_execution:
self.__first_execution = False
return self._internal_execute(database, query, properties, retry_config)
return self._internal_execute(database, query, properties, retry_config)
@abstractmethod
def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponseBase:
raise NotImplementedError()
def get_databases_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_databases(self) -> Generator['Database', None, None]:
yield from self._get_items()
def get_cluster_name(self) -> str:
return self._cluster_name
def _internal_get_items(self) -> Dict[str, 'Database']:
# Retrieves database names, table names, column names and types for all databases. A database name is required
# by the "execute" method, but is ignored for this query
res: KustoResponseBase = self.execute(
'', KQL('.show databases schema | project DatabaseName, TableName, ColumnName, ColumnType | limit 100000')
)
database_to_table_to_columns = defaultdict(lambda: defaultdict(list))
for database_name, table_name, column_name, column_type in res.get_valid_rows():
database_to_table_to_columns[database_name][table_name].append(
_typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name)
)
return {
# Database instances are provided with all table and column data, preventing them from generating more
# queries. However the "fetch_by_default" behavior is passed on to them for future actions.
database_name: Database(
self, database_name,
{table_name: tuple(columns) for table_name, columns in table_to_columns.items()},
fetch_by_default=self._fetch_by_default
)
for database_name, table_to_columns in database_to_table_to_columns.items()
}
class Database(_ItemFetcher):
"""
Handle to a Kusto database.
Uses :class:`ItemFetcher` to fetch and cache the full database schema, including all tables, columns and their
types.
"""
__client: PyKustoClientBase
__name: str
def __init__(
self, client: PyKustoClientBase, name: str, tables: Dict[str, Tuple[BaseColumn, ...]] = None,
fetch_by_default: bool = True
) -> None:
"""
Create a new handle to Kusto database. The value of "fetch_by_default" is used for current instance, and also
passed on to database instances.
:param client: The associated PyKustoClient instance
:param name: Database name
:param tables: A mapping from table names to the columns of each table. If this is None and "fetch_by_default"
is true then they will be fetched in the constructor.
"""
super().__init__(
# Providing the items to ItemFetcher prevents further queries until the "refresh" method is explicitly
# called
None if tables is None else {
table_name: Table(self, table_name, columns, fetch_by_default=fetch_by_default)
for table_name, columns in tables.items()
},
fetch_by_default
)
self.__client = client
self.__name = name
self._refresh_if_needed()
def __repr__(self) -> str:
return f"{self.__client}.Database('{self.__name}')"
def to_query_format(self) -> KQL:
return KQL(f'{self.__client.to_query_format()}.database("{self.__name}")')
def get_name(self) -> str:
return self.__name
def _new_item(self, name: str) -> 'Table':
# "fetch_by_default" set to false because often a table generated this way is not represented by an actual
# Kusto table
return Table(self, name, fetch_by_default=False)
def execute(self, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponseBase:
return self.__client.execute(self.__name, query, properties, retry_config)
def get_table_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_table(self, *tables: str) -> 'Table':
assert len(tables) > 0
if not Table.static_is_union(*tables):
return self[tables[0]]
columns: Optional[Tuple[BaseColumn, ...]] = None
if self._items_fetched():
resolved_tables: Set[Table] = set()
for table_pattern in tables:
if '*' in table_pattern:
resolved_tables.update(table for table in self._get_items() if fnmatch(table.get_name(), table_pattern))
else:
resolved_tables.add(self[table_pattern])
if len(resolved_tables) == 1:
return next(iter(resolved_tables))
columns = self.__try_to_resolve_union_columns(*resolved_tables)
return Table(self, tables, columns, fetch_by_default=self._fetch_by_default)
@staticmethod
def __try_to_resolve_union_columns(*resolved_tables: 'Table') -> Optional[Tuple[BaseColumn, ...]]:
column_by_name: Dict[str, BaseColumn] = {}
for table in resolved_tables:
for column in table.get_columns():
existing_column = column_by_name.setdefault(column.get_name(), column)
if type(column) is not type(existing_column):
return None # Fallback to Kusto query for column name conflict resolution
return tuple(column_by_name.values())
def _internal_get_items(self) -> Dict[str, 'Table']:
# Retrieves table names, column names and types for this database only (the database name is added in the
# "execute" method)
res: KustoResponseBase = self.execute(
KQL('.show database schema | project TableName, ColumnName, ColumnType | limit 10000')
)
table_to_columns = defaultdict(list)
for table_name, column_name, column_type in res.get_valid_rows():
table_to_columns[table_name].append(_typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name))
# Table instances are provided with all column data, preventing them from generating more queries. However the
# "fetch_by_default" behavior is
# passed on to them for future actions.
return {
table_name: Table(self, table_name, tuple(columns), fetch_by_default=self._fetch_by_default)
for table_name, columns in table_to_columns.items()
}
class Table(_ItemFetcher):
"""
Handle to a Kusto table.
Uses :class:`ItemFetcher` to fetch and cache the table schema of columns and their types.
"""
__database: Database
__tables: Tuple[str, ...]
def __init__(
self, database: Database, tables: Union[str, List[str], Tuple[str, ...]],
columns: Tuple[BaseColumn, ...] = None, fetch_by_default: bool = True
) -> None:
"""
Create a new handle to a Kusto table.
:param database: The associated Database instance
:param tables: Either a single table name, or a list of tables. If more than one table is given OR the table
name contains a wildcard, the Kusto 'union' statement will be used.
:param columns: Table columns. If this is None and "ItemFetcher" is true then they will be fetched in the
constructor.
"""
super().__init__(
None if columns is None else {c.get_name(): c for c in columns},
fetch_by_default
)
self.__database = database
self.__tables = (tables,) if isinstance(tables, str) else tuple(tables)
assert len(self.__tables) > 0
self._refresh_if_needed()
def __repr__(self) -> str:
table_string = ', '.join(f"'{table}'" for table in self.__tables)
return f'{self.__database}.Table({table_string})'
def _new_item(self, name: str) -> BaseColumn:
return _AnyTypeColumn(name)
def __getattr__(self, name: str) -> BaseColumn:
"""
Convenience function for obtaining a column using dot notation.
In contrast with the overridden method from the :class:`ItemFetcher` class, a new column is generated if needed,
since new columns can be created on the fly in the course of the query (e.g. using 'extend'), and there is no
fear of undesired erroneous queries sent to Kusto.
:param name: Name of column
:return: The column with the given name
"""
return self[name]
@staticmethod
def static_is_union(*table_names: str) -> bool:
return len(table_names) > 1 or '*' in table_names[0]
def is_union(self) -> bool:
return self.static_is_union(*self.__tables)
def get_name(self) -> str:
assert not self.is_union()
return self.__tables[0]
def to_query_format(self, fully_qualified: bool = False) -> KQL:
if fully_qualified:
table_names = tuple(f'{self.__database.to_query_format()}.table("{table}")' for table in self.__tables)
else:
table_names = self.__tables
if self.is_union():
return KQL('union ' + ', '.join(table_names))
return KQL(table_names[0])
def execute(self, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponseBase:
return self.__database.execute(query, properties, retry_config)
def get_columns_names(self) -> Generator[str, None, None]:
yield from self._get_item_names()
def get_columns(self) -> Generator[BaseColumn, None, None]:
yield from self._get_items()
def _internal_get_items(self) -> Dict[str, BaseColumn]:
if not self.is_union():
# Retrieves column names and types for this table only
res: KustoResponseBase = self.execute(
KQL(f'.show table {self.get_name()} | project AttributeName, AttributeType | limit 10000')
)
return {
column_name: _typed_column.registry[_INTERNAL_NAME_TO_TYPE[column_type]](column_name)
for column_name, column_type in res.get_valid_rows()
}
# Get Kusto to figure out the schema of the union, especially useful for column name conflict resolution
res: KustoResponseBase = self.execute(
KQL(f'{self.to_query_format()} | getschema | project ColumnName, DataType | limit 10000')
)
return {
column_name: _typed_column.registry[_DOT_NAME_TO_TYPE[column_type]](column_name)
for column_name, column_type in res.get_valid_rows()
}

Просмотреть файл

@ -534,7 +534,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/make-timespanfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
# def max_of(self): return
#
@ -553,7 +553,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/newguidfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def now(offset: TimespanType = None) -> _DatetimeExpression:
@ -628,14 +628,14 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/percentile-tdigestfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def percentrank_tdigest() -> AnyExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/percentrank-tdigestfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def pow(expr1: NumberType, expr2: NumberType) -> _NumberExpression:
@ -1021,7 +1021,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/toguidfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def to_hex(expr1: NumberType, expr2: NumberType = None) -> _StringExpression:
@ -1104,21 +1104,21 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/urldecodefunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def url_encode() -> _StringExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/urlencodefunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
@staticmethod
def week_of_year() -> _NumberExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/weekofyearfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
# def welch_test(self): return
@ -1127,7 +1127,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/zipfunction
"""
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
# ----------------------------------------------------
# Aggregation functions

Просмотреть файл

@ -2,7 +2,7 @@ from abc import ABCMeta, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor, TimeoutError
from itertools import chain
from threading import Lock
from typing import Union, Dict, Any, Iterable, Callable, Generator
from typing import Union, Dict, Any, Iterable, Callable, Generator, Optional
from .logger import _logger
@ -24,7 +24,7 @@ class _ItemFetcher(metaclass=ABCMeta):
__future: Union[None, Future]
__items_lock: Lock
def __init__(self, items: Union[None, Dict[str, Any]], fetch_by_default: bool) -> None:
def __init__(self, items: Optional[Dict[str, Any]], fetch_by_default: bool) -> None:
"""
:param items: Initial items. If not None, items will not be fetched until the "refresh" method is explicitly called.
:param fetch_by_default: When true, items will be fetched even if not explicitly requested, but only if they were not supplied as a parameter. Subclasses are encouraged
@ -55,7 +55,7 @@ class _ItemFetcher(metaclass=ABCMeta):
@abstractmethod
def _new_item(self, name: str) -> Any:
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
def __getattr__(self, name: str) -> Any:
"""
@ -144,7 +144,7 @@ class _ItemFetcher(metaclass=ABCMeta):
@abstractmethod
def _internal_get_items(self) -> Dict[str, Any]:
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
def __fetch_items(self) -> None:
fetched_items = self._internal_get_items()

Просмотреть файл

@ -3,12 +3,12 @@ from typing import Dict, Callable, Union, Tuple
import numpy as np
import pandas as pd
from azure.kusto.data import ClientRequestProperties, KustoClient
from pykusto import PyKustoClient, NO_RETRIES, KustoResponse, KQL, RetryConfig
from pykusto import NO_RETRIES, KQL, RetryConfig, KustoResponseBase, PyKustoClientBase
from pykusto._src.client_base import ClientRequestProperties
class DataframeBasedKustoResponse(KustoResponse):
class DataframeBasedKustoResponse(KustoResponseBase):
"""
In PySpark Kusto results are returned as dataframes. We wrap the dataframe with this object for compatibility with :class:`PyKustoClient`.
"""
@ -25,25 +25,21 @@ class DataframeBasedKustoResponse(KustoResponse):
return self.__dataframe
class PySparkKustoClient(PyKustoClient):
class PySparkKustoClient(PyKustoClientBase):
"""
Handle to a Kusto cluster, to be used inside a PySpark notebook.
"""
def __init__(self, cluster: str, linked_service: str = None, fetch_by_default: bool = True) -> None:
def __init__(self, cluster_name: str, linked_service: str = None, fetch_by_default: bool = True) -> None:
"""
Create a new handle to a Kusto cluster. The value of "fetch_by_default" is used for current instance, and also passed on to database instances.
:param cluster: a cluster URL.
:param cluster_name: a cluster URL.
:param linked_service: If provided, the connection to Kusto will be made via a pre-configured link (used only for Synapse). Otherwise, device authentication will be used
(tested only for Synapse, but should work for any PySpark notebook).
"""
self.__linked_service = linked_service
super().__init__(cluster, fetch_by_default, False, NO_RETRIES, None)
def _internal_init(self, client_or_cluster: Union[str, KustoClient], use_global_cache: bool):
assert isinstance(client_or_cluster, str), "PySparkKustoClient must be initialized with a cluster name"
self.__cluster_name = client_or_cluster
super().__init__(cluster_name, fetch_by_default, NO_RETRIES)
self.__options: Dict[str, Callable[[], str]] = {}
self.__kusto_session, self.__spark_context = self.__get_spark_session_and_context()
@ -51,14 +47,14 @@ class PySparkKustoClient(PyKustoClient):
# Connect via device authentication
self.refresh_device_auth()
self.__format = 'com.microsoft.kusto.spark.datasource'
self.option('kustoCluster', self.__cluster_name)
self.option('kustoCluster', self._cluster_name)
else:
# Connect via pre-configured link
self.__format = 'com.microsoft.kusto.spark.synapse.datasource'
self.option('spark.synapse.linkedService', self.__linked_service)
def __repr__(self) -> str:
items = [self.__cluster_name]
items = [self._cluster_name]
if self.__linked_service is not None:
items.append(self.__linked_service)
item_string = ', '.join(f"'{item}'" for item in items)
@ -70,7 +66,7 @@ class PySparkKustoClient(PyKustoClient):
"""
assert self.__linked_service is None, "Device authentication can be used only when a linked_service was not provided to the client constructor"
# noinspection PyProtectedMember
device_auth = self.__spark_context._jvm.com.microsoft.kusto.spark.authentication.DeviceAuthentication(self.__cluster_name, "common")
device_auth = self.__spark_context._jvm.com.microsoft.kusto.spark.authentication.DeviceAuthentication(self._cluster_name, "common")
print(device_auth.getDeviceCodeMessage()) # Logging is better than printing, but the PySpark notebook does not display logs by default
self.option('accessToken', device_auth.acquireToken)
@ -111,8 +107,10 @@ class PySparkKustoClient(PyKustoClient):
"""
return {key: value_producer() for key, value_producer in self.__options.items()}
def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponse:
def _internal_execute(self, database: str, query: KQL, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> DataframeBasedKustoResponse:
resolved_options = self.get_options()
if properties is not None:
resolved_options['clientRequestPropertiesJson'] = properties.to_json()
resolved_options['kustoDatabase'] = database
resolved_options['kustoQuery'] = query
kusto_read_session = self.__kusto_session.read.format(self.__format)

Просмотреть файл

@ -5,7 +5,7 @@ from os import linesep
from types import FunctionType
from typing import Tuple, List, Union, Optional
from .client import Table, KustoResponse, RetryConfig
from .client_base import Table, KustoResponseBase, RetryConfig, ClientRequestProperties
from .enums import Order, Nulls, JoinKind, Distribution, BagExpansion
from .expressions import BooleanType, ExpressionType, AggregationExpression, _AssignmentBase, _AssignmentFromAggregationToColumn, _AssignmentToSingleColumn, \
_AnyTypeColumn, BaseExpression, _AssignmentFromColumnToColumn, AnyExpression, _to_kql, _expression_to_type, BaseColumn, NumberType, OrderedType
@ -239,7 +239,7 @@ class Query:
@abstractmethod
def _compile(self) -> KQL:
raise NotImplementedError() # pragma: no cover
raise NotImplementedError()
def _compile_all(self, use_full_table_name) -> KQL:
if self._head is None:
@ -280,7 +280,7 @@ class Query:
kql = KQL(kql.replace(" |", linesep + "|"))
return kql
def execute(self, table: Table = None, retry_config: RetryConfig = None) -> KustoResponse:
def execute(self, table: Table = None, properties: ClientRequestProperties = None, retry_config: RetryConfig = None) -> KustoResponseBase:
if self.get_table() is None:
if table is None:
raise RuntimeError("No table supplied")
@ -292,10 +292,10 @@ class Query:
rendered_query = self.render()
_logger.debug("Running query: " + rendered_query)
return table.execute(rendered_query, retry_config)
return table.execute(rendered_query, properties, retry_config)
def to_dataframe(self, table: Table = None, retry_config: RetryConfig = None):
return self.execute(table, retry_config).to_dataframe()
def to_dataframe(self, table: Table = None, properties: ClientRequestProperties = None, retry_config: RetryConfig = None):
return self.execute(table, properties, retry_config).to_dataframe()
@staticmethod
def _extract_assignments(*args: Union[_AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[_AssignmentBase]:

Просмотреть файл

@ -2,6 +2,7 @@ import os
import sys
from setuptools import setup, find_packages
from setuptools.command.install import install
assert sys.version_info[0] == 3
__version__ = None
@ -12,22 +13,47 @@ with open(os.path.join('.', 'pykusto', '__init__.py')) as f:
__version__ = line.split(delim)[1]
assert __version__ is not None, 'Unable to determine version'
install_requires = [
# Release notes: https://github.com/Azure/azure-kusto-python/releases
'azure-kusto-data==2.1.1', # Earlier versions not supported because of: https://github.com/Azure/azure-kusto-python/issues/312
core_requires = [
'redo==2.0.4',
]
non_pyspark_requires = [
# Not required in PySpark, because authentication is handled differently there.
# Release notes: https://github.com/Azure/azure-kusto-python/releases
'azure-kusto-data==2.1.1', # Earlier versions not supported because of: https://github.com/Azure/azure-kusto-python/issues/312
]
# Not required in PySpark because it is already installed there.
# pandas release notes: https://pandas.pydata.org/docs/whatsnew/index.html
# Tests use DataFrame constructor options introduced in 0.25.0
if sys.version_info[1] <= 6:
# pandas support for Python 3.6 was dropped starting from version 1.2.0
install_requires.append('pandas>=0.25.0,<1.2.0')
non_pyspark_requires.append('pandas>=0.25.0,<1.2.0')
# In numpy the support was dropped in 1.20.0, and also the transitive dependency in pandas is not correctly restricted
install_requires.append('numpy<1.20.0')
non_pyspark_requires.append('numpy<1.20.0')
else:
install_requires.append('pandas>=0.25.0,<=1.2.4')
non_pyspark_requires.append('pandas>=0.25.0,<=1.2.4')
# Allows installing with '--pyspark' to avoid unneeded dependencies.
# Usage:
# pip install pykusto --global-option pyspark
# OR
# python setup.py install --pyspark
class CustomInstall(install):
user_options = install.user_options + [('pyspark', None, None)]
def initialize_options(self):
super().initialize_options()
# noinspection PyAttributeOutsideInit
self.pyspark = None
def run(self):
if self.pyspark:
# noinspection PyUnresolvedReferences
self.distribution.install_requires = core_requires
super().run()
setup(
name='pykusto',
@ -41,13 +67,16 @@ setup(
long_description=open("README.md", "r").read(),
long_description_content_type="text/markdown",
keywords="kusto azure-data-explorer client library query",
install_requires=install_requires,
tests_require=[
'pytest',
'pytest-cov',
'flake8',
'typeguard',
],
cmdclass={'install': CustomInstall},
install_requires=core_requires + non_pyspark_requires,
extras_require={
'test': [
'pytest',
'pytest-cov',
'flake8',
'typeguard',
],
},
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",

Просмотреть файл

@ -15,7 +15,9 @@ from azure.kusto.data._models import KustoResultTable, KustoResultRow
from azure.kusto.data.response import KustoResponseDataSet
# noinspection PyProtectedMember
from pykusto._src.client import Table, Database, PyKustoClient
from pykusto._src.client import PyKustoClient
# noinspection PyProtectedMember
from pykusto._src.client_base import Table, Database
# noinspection PyProtectedMember
from pykusto._src.expressions import _NumberColumn, _BooleanColumn, _ArrayColumn, _MappingColumn, _StringColumn, _DatetimeColumn, _TimespanColumn, _DynamicColumn
# noinspection PyProtectedMember

Просмотреть файл

@ -5,7 +5,7 @@ from unittest.mock import patch
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoError
from pykusto import PyKustoClient, column_generator as col, Query, KustoServiceError, RetryConfig, NO_RETRIES
from pykusto import PyKustoClient, column_generator as col, Query, KustoServiceError, RetryConfig, NO_RETRIES, ClientRequestProperties
# noinspection PyProtectedMember
from pykusto._src.logger import _logger
# noinspection PyProtectedMember
@ -169,6 +169,19 @@ class TestClient(TestBase):
self.assertEqual('https://help.kusto.windows.net', client.get_cluster_name())
self.assertEqual([], cm.output)
def test_request_properties(self):
properties = ClientRequestProperties()
properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, False)
properties.set_parameter('xIntValue', 11)
mock_kusto_client = MockKustoClient()
table = PyKustoClient(mock_kusto_client)['test_db']['mock_table']
Query(table).take(5).execute(properties=properties)
self.assertEqual(
[RecordedQuery('test_db', 'mock_table | take 5', properties)],
mock_kusto_client.recorded_queries
)
@staticmethod
def unreliable_mock_kusto_client(number_of_failures: int, exception_type: Type[Exception] = KustoServiceError):
TestClient.attempt = 1

Просмотреть файл

@ -4,7 +4,7 @@ from unittest.mock import patch
from pykusto import PyKustoClient, Query
# noinspection PyProtectedMember
from pykusto._src.client import Database
from pykusto._src.client_base import Database
# noinspection PyProtectedMember
from pykusto._src.expressions import _StringColumn, _NumberColumn, _AnyTypeColumn, _BooleanColumn
# noinspection PyProtectedMember

Просмотреть файл

@ -2,7 +2,7 @@ from unittest.mock import patch
import pandas as pd
from pykusto import Query, PySparkKustoClient
from pykusto import Query, PySparkKustoClient, ClientRequestProperties
# noinspection PyProtectedMember
from pykusto._src.expressions import _StringColumn, _NumberColumn
# noinspection PyProtectedMember
@ -76,6 +76,34 @@ class TestClient(TestBase):
mock_spark_session.read.recorded_options,
)
def test_linked_service_with_request_properties(self):
rows = (['foo', 10], ['bar', 20], ['baz', 30])
columns = ('stringField', 'numField')
expected_df = pd.DataFrame(rows, columns=columns)
mock_spark_session = MockSparkSession(expected_df)
with patch('pykusto._src.pyspark_client.PySparkKustoClient._PySparkKustoClient__get_spark_session_and_context', lambda s: (mock_spark_session, None)):
client = PySparkKustoClient('https://help.kusto.windows.net/', linked_service='MockLinkedKusto', fetch_by_default=False)
properties = ClientRequestProperties()
properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, False)
properties.set_parameter('xIntValue', 11)
table = client['test_db']['mock_table']
actual_df = Query(table).take(5).to_dataframe(properties=properties)
self.assertTrue(expected_df.equals(actual_df))
self.assertEqual('com.microsoft.kusto.spark.synapse.datasource', mock_spark_session.read.recorded_format)
self.assertEqual(
{
'spark.synapse.linkedService': 'MockLinkedKusto',
'clientRequestPropertiesJson': '{"Options": {"deferpartialqueryfailures": false}, "Parameters": {"xIntValue": 11}}',
'kustoDatabase': 'test_db',
'kustoQuery': 'mock_table | take 5',
},
mock_spark_session.read.recorded_options,
)
def test_linked_service_with_extra_options(self):
rows = (['foo', 10], ['bar', 20], ['baz', 30])
columns = ('stringField', 'numField')

Просмотреть файл

@ -1,4 +1,5 @@
# noinspection PyProtectedMember
from pykusto import ClientRequestProperties
from pykusto._src.expressions import _to_kql
# noinspection PyProtectedMember
from pykusto._src.kql_converters import KQL
@ -85,3 +86,19 @@ class TestUtils(TestBase):
TypeError("Test annotation: type already registered: string"),
lambda: test_annotation(_KustoType.STRING)(str_annotated_2)
)
def test_request_properties(self):
properties = ClientRequestProperties()
properties.set_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, False)
self.assertTrue(properties.has_option(ClientRequestProperties.results_defer_partial_query_failures_option_name))
self.assertEqual(properties.get_option(ClientRequestProperties.results_defer_partial_query_failures_option_name, None), False)
properties.set_parameter('xIntValue', 11)
self.assertTrue(properties.has_parameter('xIntValue'))
self.assertEqual(properties.get_parameter('xIntValue', None), 11)
self.assertRaises(
ValueError("Value should not be empty"),
lambda: properties.set_option(' ', True)
)