This commit is contained in:
Amos Rimon 2020-07-01 14:30:54 +03:00
Родитель ca6a0b67ae
Коммит eeb19c53a1
12 изменённых файлов: 776 добавлений и 774 удалений

Просмотреть файл

@ -14,8 +14,8 @@ from azure.kusto.data.response import KustoResponseDataSet
# noinspection PyProtectedMember
from azure.kusto.data.security import _get_azure_cli_auth_token
from pykusto.expressions import BaseColumn, AnyTypeColumn
from pykusto.item_fetcher import ItemFetcher
from pykusto.expressions import BaseColumn, _AnyTypeColumn
from pykusto.item_fetcher import _ItemFetcher
from pykusto.kql_converters import KQL
from pykusto.logger import logger
from pykusto.type_utils import INTERNAL_NAME_TO_TYPE, typed_column, DOT_NAME_TO_TYPE
@ -46,7 +46,7 @@ class KustoResponse:
return dataframe_from_result_table(self.__response.primary_results[0])
class PyKustoClient(ItemFetcher):
class PyKustoClient(_ItemFetcher):
"""
Handle to a Kusto cluster.
Uses :class:`ItemFetcher` to fetch and cache the full cluster schema, including all databases, tables, columns and
@ -85,12 +85,12 @@ class PyKustoClient(ItemFetcher):
def to_query_format(self) -> KQL:
    """
    Build the KQL expression that refers to this cluster.

    :return: A ``cluster("<cluster name>")`` expression wrapped as :class:`KQL`
    """
    cluster_reference = f'cluster("{self.__cluster_name}")'
    return KQL(cluster_reference)
def _new_item(self, name: str) -> 'Database':
def _new_item(self, name: str) -> '_Database':
# "fetch_by_default" set to false because often a database generated this way is not represented by an actual
# Kusto database
return Database(self, name, fetch_by_default=False)
return _Database(self, name, fetch_by_default=False)
def get_database(self, name: str) -> 'Database':
def get_database(self, name: str) -> '_Database':
return self[name]
def execute(self, database: str, query: KQL, properties: ClientRequestProperties = None) -> KustoResponse:
@ -108,7 +108,7 @@ class PyKustoClient(ItemFetcher):
def get_databases_names(self) -> Generator[str, None, None]:
    """
    Iterate over the database names provided by ``_get_item_names``.

    :return: A generator yielding one database name at a time
    """
    for database_name in self._get_item_names():
        yield database_name
def get_databases(self) -> Generator['Database', None, None]:
def get_databases(self) -> Generator['_Database', None, None]:
yield from self._get_items()
def get_cluster_name(self) -> str:
@ -135,7 +135,7 @@ class PyKustoClient(ItemFetcher):
"""
return PyKustoClient._get_client_for_cluster(cluster)
def _internal_get_items(self) -> Dict[str, 'Database']:
def _internal_get_items(self) -> Dict[str, '_Database']:
# Retrieves database names, table names, column names and types for all databases. A database name is required
# by the "execute" method, but is ignored for this query
res: KustoResponse = self.execute(
@ -149,7 +149,7 @@ class PyKustoClient(ItemFetcher):
return {
# Database instances are provided with all table and column data, preventing them from generating more
# queries. However the "fetch_by_default" behavior is passed on to them for future actions.
database_name: Database(
database_name: _Database(
self, database_name,
{table_name: tuple(columns) for table_name, columns in table_to_columns.items()},
fetch_by_default=self._fetch_by_default
@ -158,7 +158,7 @@ class PyKustoClient(ItemFetcher):
}
class Database(ItemFetcher):
class _Database(_ItemFetcher):
"""
Handle to a Kusto database.
Uses :class:`ItemFetcher` to fetch and cache the full database schema, including all tables, columns and their
@ -184,7 +184,7 @@ class Database(ItemFetcher):
# Providing the items to ItemFetcher prevents further queries until the "refresh" method is explicitly
# called
None if tables is None else {
table_name: Table(self, table_name, columns, fetch_by_default=fetch_by_default)
table_name: _Table(self, table_name, columns, fetch_by_default=fetch_by_default)
for table_name, columns in tables.items()
},
fetch_by_default
@ -202,10 +202,10 @@ class Database(ItemFetcher):
def get_name(self) -> str:
    """
    Return the name stored on this database handle.

    :return: The value of the private ``__name`` attribute
    """
    database_name: str = self.__name
    return database_name
def _new_item(self, name: str) -> 'Table':
def _new_item(self, name: str) -> '_Table':
# "fetch_by_default" set to false because often a table generated this way is not represented by an actual
# Kusto table
return Table(self, name, fetch_by_default=False)
return _Table(self, name, fetch_by_default=False)
def execute(self, query: KQL, properties: ClientRequestProperties = None) -> KustoResponse:
    """
    Run a query against this database by delegating to the owning client.

    :param query: The query to run
    :param properties: Optional request properties, forwarded to the client unchanged
    :return: The result of the client's ``execute`` call for this database name and query
    """
    client = self.__client
    return client.execute(self.__name, query, properties)
@ -213,13 +213,13 @@ class Database(ItemFetcher):
def get_table_names(self) -> Generator[str, None, None]:
    """
    Iterate over the table names provided by ``_get_item_names``.

    :return: A generator yielding one table name at a time
    """
    for table_name in self._get_item_names():
        yield table_name
def get_table(self, *tables: str) -> 'Table':
def get_table(self, *tables: str) -> '_Table':
assert len(tables) > 0
if not Table.static_is_union(*tables):
if not _Table.static_is_union(*tables):
return self[tables[0]]
columns: Optional[Tuple[BaseColumn, ...]] = None
if self._items_fetched():
resolved_tables: Set[Table] = set()
resolved_tables: Set[_Table] = set()
for table_pattern in tables:
if '*' in table_pattern:
resolved_tables.update(table for table in self._get_items() if fnmatch(table.get_name(), table_pattern))
@ -228,10 +228,10 @@ class Database(ItemFetcher):
if len(resolved_tables) == 1:
return next(iter(resolved_tables))
columns = self.__try_to_resolve_union_columns(*resolved_tables)
return Table(self, tables, columns, fetch_by_default=self._fetch_by_default)
return _Table(self, tables, columns, fetch_by_default=self._fetch_by_default)
@staticmethod
def __try_to_resolve_union_columns(*resolved_tables: 'Table') -> Optional[Tuple[BaseColumn, ...]]:
def __try_to_resolve_union_columns(*resolved_tables: '_Table') -> Optional[Tuple[BaseColumn, ...]]:
column_by_name: Dict[str, BaseColumn] = {}
for table in resolved_tables:
for column in table.get_columns():
@ -240,7 +240,7 @@ class Database(ItemFetcher):
return None # Fallback to Kusto query for column name conflict resolution
return tuple(column_by_name.values())
def _internal_get_items(self) -> Dict[str, 'Table']:
def _internal_get_items(self) -> Dict[str, '_Table']:
# Retrieves table names, column names and types for this database only (the database name is added in the
# "execute" method)
res: KustoResponse = self.execute(
@ -253,21 +253,21 @@ class Database(ItemFetcher):
# "fetch_by_default" behavior is
# passed on to them for future actions.
return {
table_name: Table(self, table_name, tuple(columns), fetch_by_default=self._fetch_by_default)
table_name: _Table(self, table_name, tuple(columns), fetch_by_default=self._fetch_by_default)
for table_name, columns in table_to_columns.items()
}
class Table(ItemFetcher):
class _Table(_ItemFetcher):
"""
Handle to a Kusto table.
Uses :class:`ItemFetcher` to fetch and cache the table schema of columns and their types.
"""
__database: Database
__database: _Database
__tables: Tuple[str, ...]
def __init__(
self, database: Database, tables: Union[str, List[str], Tuple[str, ...]],
self, database: _Database, tables: Union[str, List[str], Tuple[str, ...]],
columns: Tuple[BaseColumn, ...] = None, fetch_by_default: bool = True
) -> None:
"""
@ -292,7 +292,7 @@ class Table(ItemFetcher):
return f'{self.__database}.Table({", ".join(self.__tables)})'
def _new_item(self, name: str) -> BaseColumn:
return AnyTypeColumn(name)
return _AnyTypeColumn(name)
def __getattr__(self, name: str) -> BaseColumn:
"""

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -9,7 +9,7 @@ from typing import Union, Dict, Any, Iterable, Callable, Generator
POOL = ThreadPoolExecutor(max_workers=1)
class ItemFetcher(metaclass=ABCMeta):
class _ItemFetcher(metaclass=ABCMeta):
"""
Abstract class that caches a collection of items, fetching them in certain scenarios.
"""

Просмотреть файл

@ -4,24 +4,24 @@ from itertools import chain
from numbers import Number
from typing import NewType, Union, Mapping, List, Tuple
from pykusto.type_utils import kql_converter, KustoType, NUMBER_TYPES
from pykusto.type_utils import kql_converter, _KustoType, NUMBER_TYPES
KQL = NewType('KQL', str)
@kql_converter(KustoType.DATETIME)
@kql_converter(_KustoType.DATETIME)
def datetime_to_kql(dt: datetime) -> KQL:
    """
    Render a Python datetime as a KQL ``datetime(...)`` literal with microsecond precision.

    :param dt: The datetime to render
    :return: The literal, e.g. ``datetime(2020-07-01 14:30:54.000000)``
    """
    literal = dt.strftime('datetime(%Y-%m-%d %H:%M:%S.%f)')
    return KQL(literal)
@kql_converter(KustoType.TIMESPAN)
@kql_converter(_KustoType.TIMESPAN)
def timedelta_to_kql(td: timedelta) -> KQL:
    """
    Render a Python timedelta as a KQL ``time(d.h:m:s.ffffff)`` literal.

    :param td: The duration to render
    :return: The literal, e.g. ``time(1.2:3:4.005000)`` for 1 day, 2 hours, 3 minutes and 4.005 seconds
    """
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    # The fractional part must be zero-padded to six digits: without padding, microseconds=5000 would be
    # rendered as ".5000" (half a second) instead of ".005000" (five milliseconds).
    # NOTE(review): Python normalizes negative durations to negative days plus a non-negative remainder
    # (timedelta(seconds=-1) has days=-1, seconds=86399) - confirm how Kusto interprets such a literal.
    return KQL(f'time({td.days}.{hours}:{minutes}:{seconds}.{td.microseconds:06d})')
@kql_converter(KustoType.ARRAY, KustoType.MAPPING)
@kql_converter(_KustoType.ARRAY, _KustoType.MAPPING)
def dynamic_to_kql(d: Union[Mapping, List, Tuple]) -> KQL:
try:
return KQL(f"dynamic({json.dumps(d)})")
@ -42,12 +42,12 @@ def build_dynamic(d: Union[Mapping, List, Tuple]) -> KQL:
return to_kql(d)
@kql_converter(KustoType.BOOL)
@kql_converter(_KustoType.BOOL)
def bool_to_kql(b: bool) -> KQL:
    """
    Render a Python boolean as the KQL literal ``true`` or ``false``.

    :param b: The value to render; its truthiness selects the literal
    :return: ``true`` for a truthy value, ``false`` otherwise
    """
    return KQL('true' if b else 'false')
@kql_converter(KustoType.STRING)
@kql_converter(_KustoType.STRING)
def str_to_kql(s: str) -> KQL:
    """
    Render a Python string as a double-quoted KQL string literal.

    :param s: The string to wrap
    :return: The string surrounded by double quotes
    """
    # NOTE(review): the content is inserted verbatim - double quotes or backslashes inside `s` are not
    # escaped here, so callers are presumably expected to pass already-safe text; confirm.
    return KQL('"{}"'.format(s))

Просмотреть файл

@ -5,27 +5,27 @@ from os import linesep
from types import FunctionType
from typing import Tuple, List, Union, Optional
from pykusto.client import Table, KustoResponse
from pykusto.client import _Table, KustoResponse
from pykusto.enums import Order, Nulls, JoinKind, Distribution, BagExpansion
from pykusto.expressions import BooleanType, ExpressionType, AggregationExpression, OrderedType, \
StringType, AssignmentBase, AssignmentFromAggregationToColumn, AssignmentToSingleColumn, AnyTypeColumn, \
StringType, _AssignmentBase, _AssignmentFromAggregationToColumn, _AssignmentToSingleColumn, _AnyTypeColumn, \
BaseExpression, \
AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn, NumberType
_AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn, NumberType
from pykusto.functions import Functions as f
from pykusto.kql_converters import KQL
from pykusto.logger import logger
from pykusto.type_utils import KustoType, typed_column, plain_expression
from pykusto.type_utils import _KustoType, typed_column, plain_expression
from pykusto.udf import stringify_python_func
class Query:
_head: Optional['Query']
_table: Optional[Table]
_table: Optional[_Table]
_table_name: Optional[str]
def __init__(self, head=None) -> None:
self._head = head if isinstance(head, Query) else None
self._table = head if isinstance(head, Table) else None
self._table = head if isinstance(head, _Table) else None
self._table_name = head if isinstance(head, str) else None
def __add__(self, other: 'Query') -> 'Query':
@ -69,152 +69,152 @@ class Query:
if len(filtered_predicates) == 0:
# Do no generate 'where' clause
return self
return WhereQuery(self, *filtered_predicates)
return _WhereQuery(self, *filtered_predicates)
def take(self, num_rows: int) -> 'TakeQuery':
def take(self, num_rows: int) -> '_TakeQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/takeoperator
"""
return TakeQuery(self, num_rows)
return _TakeQuery(self, num_rows)
def limit(self, num_rows: int) -> 'LimitQuery':
def limit(self, num_rows: int) -> '_LimitQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/limitoperator
"""
return LimitQuery(self, num_rows)
return _LimitQuery(self, num_rows)
def sample(self, num_rows: int) -> 'SampleQuery':
def sample(self, num_rows: int) -> '_SampleQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sampleoperator
"""
return SampleQuery(self, num_rows)
return _SampleQuery(self, num_rows)
def count(self) -> 'CountQuery':
def count(self) -> '_CountQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/countoperator
"""
return CountQuery(self)
return _CountQuery(self)
def sort_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> 'SortQuery':
def sort_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> '_SortQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sortoperator
"""
return SortQuery(self, col, order, nulls)
return _SortQuery(self, col, order, nulls)
def order_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> 'SortQuery':
def order_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> '_SortQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/orderoperator
"""
return self.sort_by(col, order, nulls)
def top(self, num_rows: int, col: AnyTypeColumn, order: Order = None, nulls: Nulls = None) -> 'TopQuery':
def top(self, num_rows: int, col: _AnyTypeColumn, order: Order = None, nulls: Nulls = None) -> '_TopQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/topoperator
"""
return TopQuery(self, num_rows, col, order, nulls)
return _TopQuery(self, num_rows, col, order, nulls)
def join(self, query: 'Query', kind: JoinKind = None) -> 'JoinQuery':
def join(self, query: 'Query', kind: JoinKind = None) -> '_JoinQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/joinoperator
"""
return JoinQuery(self, query, kind)
return _JoinQuery(self, query, kind)
def project(self, *args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> 'ProjectQuery':
def project(self, *args: Union[_AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> '_ProjectQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectoperator
"""
return ProjectQuery(self, self._extract_assignments(*args, **kwargs))
return _ProjectQuery(self, self._extract_assignments(*args, **kwargs))
def project_rename(self, *args: AssignmentFromColumnToColumn, **kwargs: AnyTypeColumn) -> 'ProjectRenameQuery':
def project_rename(self, *args: _AssignmentFromColumnToColumn, **kwargs: _AnyTypeColumn) -> '_ProjectRenameQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectrenameoperator
"""
assignments: List[AssignmentFromColumnToColumn] = list(args)
assignments: List[_AssignmentFromColumnToColumn] = list(args)
for column_name, column in kwargs.items():
assignments.append(AssignmentFromColumnToColumn(AnyTypeColumn(column_name), column))
return ProjectRenameQuery(self, assignments)
assignments.append(_AssignmentFromColumnToColumn(_AnyTypeColumn(column_name), column))
return _ProjectRenameQuery(self, assignments)
def project_away(self, *columns: StringType) -> 'ProjectAwayQuery':
def project_away(self, *columns: StringType) -> '_ProjectAwayQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectawayoperator
"""
return ProjectAwayQuery(self, columns)
return _ProjectAwayQuery(self, columns)
def distinct(self, *columns: BaseColumn) -> 'DistinctQuery':
def distinct(self, *columns: BaseColumn) -> '_DistinctQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/distinctoperator
"""
return DistinctQuery(self, columns)
return _DistinctQuery(self, columns)
def distinct_all(self) -> 'DistinctQuery':
def distinct_all(self) -> '_DistinctQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/distinctoperator
"""
return DistinctQuery(self, (AnyTypeColumn(KQL("*")),))
return _DistinctQuery(self, (_AnyTypeColumn(KQL("*")),))
def extend(self, *args: Union[BaseExpression, AssignmentBase], **kwargs: ExpressionType) -> 'ExtendQuery':
def extend(self, *args: Union[BaseExpression, _AssignmentBase], **kwargs: ExpressionType) -> '_ExtendQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/extendoperator
"""
return ExtendQuery(self, *self._extract_assignments(*args, **kwargs))
return _ExtendQuery(self, *self._extract_assignments(*args, **kwargs))
def summarize(self, *args: Union[AggregationExpression, AssignmentFromAggregationToColumn],
**kwargs: AggregationExpression) -> 'SummarizeQuery':
def summarize(self, *args: Union[AggregationExpression, _AssignmentFromAggregationToColumn],
**kwargs: AggregationExpression) -> '_SummarizeQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/summarizeoperator
"""
assignments: List[AssignmentFromAggregationToColumn] = []
assignments: List[_AssignmentFromAggregationToColumn] = []
for arg in args:
if isinstance(arg, AggregationExpression):
assignments.append(arg.assign_to())
else:
assert isinstance(arg, AssignmentFromAggregationToColumn), "Invalid assignment"
assert isinstance(arg, _AssignmentFromAggregationToColumn), "Invalid assignment"
assignments.append(arg)
for column_name, agg in kwargs.items():
assignments.append(AssignmentFromAggregationToColumn(AnyTypeColumn(column_name), agg))
return SummarizeQuery(self, assignments)
assignments.append(_AssignmentFromAggregationToColumn(_AnyTypeColumn(column_name), agg))
return _SummarizeQuery(self, assignments)
def mv_expand(
self, *args: Union[BaseExpression, AssignmentBase], bag_expansion: BagExpansion = None,
self, *args: Union[BaseExpression, _AssignmentBase], bag_expansion: BagExpansion = None,
with_item_index: BaseColumn = None, limit: int = None, **kwargs: ExpressionType
) -> 'MvExpandQuery':
) -> '_MvExpandQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/mvexpandoperator
"""
assignments = self._extract_assignments(*args, **kwargs)
if len(assignments) == 0:
raise ValueError("Please specify one or more columns for mv-expand")
return MvExpandQuery(self, bag_expansion, with_item_index, limit, *assignments)
return _MvExpandQuery(self, bag_expansion, with_item_index, limit, *assignments)
def custom(self, custom_query: str) -> 'CustomQuery':
return CustomQuery(self, custom_query)
def custom(self, custom_query: str) -> '_CustomQuery':
return _CustomQuery(self, custom_query)
def evaluate(self, plugin_name, *args: ExpressionType, distribution: Distribution = None) -> 'EvaluateQuery':
def evaluate(self, plugin_name, *args: ExpressionType, distribution: Distribution = None) -> '_EvaluateQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/evaluateoperator
"""
return EvaluateQuery(self, plugin_name, *args, distribution=distribution)
return _EvaluateQuery(self, plugin_name, *args, distribution=distribution)
def evaluate_udf(
self, udf: FunctionType, extend: bool = True, distribution: Distribution = None, **type_specs: KustoType
) -> 'EvaluateQuery':
self, udf: FunctionType, extend: bool = True, distribution: Distribution = None, **type_specs: _KustoType
) -> '_EvaluateQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/pythonplugin
"""
return EvaluateQuery(
return _EvaluateQuery(
self, 'python',
AnyExpression(KQL(f'typeof({("*, " if extend else "") + ", ".join(field_name + ":" + kusto_type.primary_name for field_name, kusto_type in type_specs.items())})')),
stringify_python_func(udf),
distribution=distribution
)
def bag_unpack(self, col: AnyTypeColumn, prefix: str = None) -> 'EvaluateQuery':
def bag_unpack(self, col: _AnyTypeColumn, prefix: str = None) -> '_EvaluateQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/bag-unpackplugin
"""
if prefix is None:
return EvaluateQuery(self, 'bag_unpack', col)
return EvaluateQuery(self, 'bag_unpack', col, prefix)
return _EvaluateQuery(self, 'bag_unpack', col)
return _EvaluateQuery(self, 'bag_unpack', col, prefix)
@abstractmethod
def _compile(self) -> KQL:
@ -236,7 +236,7 @@ class Query:
else:
return KQL(f"{self._head._compile_all(use_full_table_name)} | {self._compile()}")
def get_table(self) -> Table:
def get_table(self) -> _Table:
if self._head is None:
return self._table
else:
@ -259,7 +259,7 @@ class Query:
kql = KQL(kql.replace(" |", linesep + "|"))
return kql
def execute(self, table: Table = None) -> KustoResponse:
def execute(self, table: _Table = None) -> KustoResponse:
if self.get_table() is None:
if table is None:
raise RuntimeError("No table supplied")
@ -273,19 +273,19 @@ class Query:
logger.debug("Running query: " + rendered_query)
return table.execute(rendered_query)
def to_dataframe(self, table: Table = None):
def to_dataframe(self, table: _Table = None):
return self.execute(table).to_dataframe()
@staticmethod
def _extract_assignments(*args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[AssignmentBase]:
assignments: List[AssignmentBase] = []
def _extract_assignments(*args: Union[_AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[_AssignmentBase]:
assignments: List[_AssignmentBase] = []
for arg in args:
if isinstance(arg, BaseExpression):
assignments.append(arg.assign_to())
else:
assignments.append(arg)
for column_name, expression in kwargs.items():
column_type = expression_to_type(expression, typed_column, AnyTypeColumn)
column_type = expression_to_type(expression, typed_column, _AnyTypeColumn)
if isinstance(expression, BaseExpression):
assignments.append(expression.assign_to(column_type(column_name)))
else:
@ -294,11 +294,11 @@ class Query:
return assignments
class ProjectQuery(Query):
_columns: List[AnyTypeColumn]
_assignments: List[AssignmentBase]
class _ProjectQuery(Query):
_columns: List[_AnyTypeColumn]
_assignments: List[_AssignmentBase]
def __init__(self, head: 'Query', assignments: List[AssignmentBase]) -> None:
def __init__(self, head: 'Query', assignments: List[_AssignmentBase]) -> None:
super().__init__(head)
self._assignments = assignments
@ -306,10 +306,10 @@ class ProjectQuery(Query):
return KQL(f"project {', '.join(a.to_kql() for a in self._assignments)}")
class ProjectRenameQuery(Query):
_assignments: List[AssignmentBase]
class _ProjectRenameQuery(Query):
_assignments: List[_AssignmentBase]
def __init__(self, head: 'Query', assignments: List[AssignmentFromColumnToColumn]) -> None:
def __init__(self, head: 'Query', assignments: List[_AssignmentFromColumnToColumn]) -> None:
super().__init__(head)
self._assignments = assignments
@ -317,7 +317,7 @@ class ProjectRenameQuery(Query):
return KQL(f"project-rename {', '.join(a.to_kql() for a in self._assignments)}")
class ProjectAwayQuery(Query):
class _ProjectAwayQuery(Query):
_columns: Tuple[StringType, ...]
def __init__(self, head: 'Query', columns: Tuple[StringType]) -> None:
@ -328,32 +328,32 @@ class ProjectAwayQuery(Query):
return KQL(f"project-away {', '.join(str(c) for c in self._columns)}")
class DistinctQuery(Query):
class _DistinctQuery(Query):
_columns: Tuple[BaseColumn, ...]
def __init__(self, head: 'Query', columns: Tuple[BaseColumn]) -> None:
super().__init__(head)
self._columns = columns
def sample(self, number_of_values: NumberType) -> 'SampleDistinctQuery':
def sample(self, number_of_values: NumberType) -> '_SampleDistinctQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sampledistinctoperator
"""
assert len(self._columns) == 1, "sample-distinct supports only one column"
return SampleDistinctQuery(self._head, self._columns[0], number_of_values)
return _SampleDistinctQuery(self._head, self._columns[0], number_of_values)
def top_hitters(self, number_of_values: NumberType) -> 'TopHittersQuery':
def top_hitters(self, number_of_values: NumberType) -> '_TopHittersQuery':
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/tophittersoperator
"""
assert len(self._columns) == 1, "top-hitters supports only one column"
return TopHittersQuery(self._head, self._columns[0], number_of_values)
return _TopHittersQuery(self._head, self._columns[0], number_of_values)
def _compile(self) -> KQL:
return KQL(f"distinct {', '.join(c.kql for c in self._columns)}")
class SampleDistinctQuery(Query):
class _SampleDistinctQuery(Query):
_number_of_values: NumberType
_column: BaseColumn
@ -366,7 +366,7 @@ class SampleDistinctQuery(Query):
return KQL(f"sample-distinct {to_kql(self._number_of_values)} of {self._column.kql}")
class TopHittersQuery(Query):
class _TopHittersQuery(Query):
_number_of_values: NumberType
_column: BaseColumn
_by_expression: Optional[NumberType]
@ -377,18 +377,18 @@ class TopHittersQuery(Query):
self._number_of_values = number_of_values
self._by_expression = by_expression
def by(self, by_expression: NumberType) -> 'TopHittersQuery':
def by(self, by_expression: NumberType) -> '_TopHittersQuery':
assert self._by_expression is None, "duplicate 'by' clause"
return TopHittersQuery(self._head, self._column, self._number_of_values, by_expression)
return _TopHittersQuery(self._head, self._column, self._number_of_values, by_expression)
def _compile(self) -> KQL:
return KQL(f"top-hitters {to_kql(self._number_of_values)} of {self._column.kql}{'' if self._by_expression is None else f' by {to_kql(self._by_expression)}'}")
class ExtendQuery(Query):
_assignments: Tuple[AssignmentBase, ...]
class _ExtendQuery(Query):
_assignments: Tuple[_AssignmentBase, ...]
def __init__(self, head: 'Query', *assignments: AssignmentBase) -> None:
def __init__(self, head: 'Query', *assignments: _AssignmentBase) -> None:
super().__init__(head)
self._assignments = assignments
@ -396,11 +396,11 @@ class ExtendQuery(Query):
return KQL(f"extend {', '.join(a.to_kql() for a in self._assignments)}")
class WhereQuery(Query):
class _WhereQuery(Query):
_predicates: Tuple[BooleanType, ...]
def __init__(self, head: Query, *predicates: BooleanType):
super(WhereQuery, self).__init__(head)
super(_WhereQuery, self).__init__(head)
self._predicates = predicates
def _compile(self) -> KQL:
@ -422,32 +422,32 @@ class _SingleNumberQuery(Query):
return KQL(f'{self._query_name} {self._num_rows}')
class TakeQuery(_SingleNumberQuery):
class _TakeQuery(_SingleNumberQuery):
_num_rows: int
def __init__(self, head: Query, num_rows: int):
super(TakeQuery, self).__init__(head, 'take', num_rows)
super(_TakeQuery, self).__init__(head, 'take', num_rows)
class LimitQuery(_SingleNumberQuery):
class _LimitQuery(_SingleNumberQuery):
_num_rows: int
def __init__(self, head: Query, num_rows: int):
super(LimitQuery, self).__init__(head, 'limit', num_rows)
super(_LimitQuery, self).__init__(head, 'limit', num_rows)
class SampleQuery(_SingleNumberQuery):
class _SampleQuery(_SingleNumberQuery):
_num_rows: int
def __init__(self, head: Query, num_rows: int):
super(SampleQuery, self).__init__(head, 'sample', num_rows)
super(_SampleQuery, self).__init__(head, 'sample', num_rows)
class CountQuery(Query):
class _CountQuery(Query):
_num_rows: int
def __init__(self, head: Query):
super(CountQuery, self).__init__(head)
super(_CountQuery, self).__init__(head)
def _compile(self) -> KQL:
return KQL('count')
@ -490,47 +490,47 @@ class _OrderQueryBase(Query):
return KQL(f'{self._query_name} by {", ".join([self._compile_order_spec(order_spec) for order_spec in self._order_specs])}')
class SortQuery(_OrderQueryBase):
class _SortQuery(_OrderQueryBase):
def __init__(self, head: Query, col: OrderedType, order: Order, nulls: Nulls):
super(SortQuery, self).__init__(head, "sort", col, order, nulls)
super(_SortQuery, self).__init__(head, "sort", col, order, nulls)
class TopQuery(Query):
class _TopQuery(Query):
_num_rows: int
_order_spec: _OrderQueryBase.OrderSpec
def __init__(self, head: Query, num_rows: int, col: AnyTypeColumn, order: Order, nulls: Nulls):
super(TopQuery, self).__init__(head)
def __init__(self, head: Query, num_rows: int, col: _AnyTypeColumn, order: Order, nulls: Nulls):
super(_TopQuery, self).__init__(head)
self._num_rows = num_rows
self._order_spec = _OrderQueryBase.OrderSpec(col, order, nulls)
def _compile(self) -> KQL:
# noinspection PyProtectedMember
return KQL(f'top {self._num_rows} by {SortQuery._compile_order_spec(self._order_spec)}')
return KQL(f'top {self._num_rows} by {_SortQuery._compile_order_spec(self._order_spec)}')
class JoinException(Exception):
pass
class JoinQuery(Query):
class _JoinQuery(Query):
_joined_query: Query
_kind: JoinKind
_on_attributes: Tuple[Tuple[AnyTypeColumn, ...], ...]
_on_attributes: Tuple[Tuple[_AnyTypeColumn, ...], ...]
def __init__(self, head: Query, joined_query: Query, kind: JoinKind,
on_attributes: Tuple[Tuple[AnyTypeColumn, ...], ...] = tuple()):
super(JoinQuery, self).__init__(head)
on_attributes: Tuple[Tuple[_AnyTypeColumn, ...], ...] = tuple()):
super(_JoinQuery, self).__init__(head)
self._joined_query = joined_query
self._kind = kind
self._on_attributes = on_attributes
def on(self, col1: AnyTypeColumn, col2: AnyTypeColumn = None) -> 'JoinQuery':
def on(self, col1: _AnyTypeColumn, col2: _AnyTypeColumn = None) -> '_JoinQuery':
self._on_attributes = self._on_attributes + (((col1,),) if col2 is None else ((col1, col2),))
return self
@staticmethod
def _compile_on_attribute(attribute: Tuple[AnyTypeColumn]):
def _compile_on_attribute(attribute: Tuple[_AnyTypeColumn]):
assert len(attribute) in (1, 2)
if len(attribute) == 1:
return attribute[0].kql
@ -548,28 +548,28 @@ class JoinQuery(Query):
f'{", ".join([self._compile_on_attribute(attr) for attr in self._on_attributes])}')
class SummarizeQuery(Query):
_assignments: List[AssignmentFromAggregationToColumn]
_by_columns: List[Union[AnyTypeColumn, BaseExpression]]
_by_assignments: List[AssignmentToSingleColumn]
class _SummarizeQuery(Query):
_assignments: List[_AssignmentFromAggregationToColumn]
_by_columns: List[Union[_AnyTypeColumn, BaseExpression]]
_by_assignments: List[_AssignmentToSingleColumn]
def __init__(self, head: Query,
assignments: List[AssignmentFromAggregationToColumn]):
super(SummarizeQuery, self).__init__(head)
assignments: List[_AssignmentFromAggregationToColumn]):
super(_SummarizeQuery, self).__init__(head)
self._assignments = assignments
self._by_columns = []
self._by_assignments = []
def by(self, *args: Union[AssignmentToSingleColumn, AnyTypeColumn, BaseExpression],
def by(self, *args: Union[_AssignmentToSingleColumn, _AnyTypeColumn, BaseExpression],
**kwargs: BaseExpression):
for arg in args:
if isinstance(arg, AnyTypeColumn) or isinstance(arg, BaseExpression):
if isinstance(arg, _AnyTypeColumn) or isinstance(arg, BaseExpression):
self._by_columns.append(arg)
else:
assert isinstance(arg, AssignmentToSingleColumn), "Invalid assignment"
assert isinstance(arg, _AssignmentToSingleColumn), "Invalid assignment"
self._by_assignments.append(arg)
for column_name, group_exp in kwargs.items():
self._by_assignments.append(AssignmentToSingleColumn(AnyTypeColumn(column_name), group_exp))
self._by_assignments.append(_AssignmentToSingleColumn(_AnyTypeColumn(column_name), group_exp))
return self
def _compile(self) -> KQL:
@ -579,14 +579,14 @@ class SummarizeQuery(Query):
return KQL(result)
class MvExpandQuery(Query):
_assignments: Tuple[AssignmentBase]
class _MvExpandQuery(Query):
_assignments: Tuple[_AssignmentBase]
_bag_expansion: BagExpansion
_with_item_index: BaseColumn
_limit: int
def __init__(self, head: Query, bag_expansion: BagExpansion, with_item_index: BaseColumn, limit: int, *assignments: AssignmentBase):
super(MvExpandQuery, self).__init__(head)
def __init__(self, head: Query, bag_expansion: BagExpansion, with_item_index: BaseColumn, limit: int, *assignments: _AssignmentBase):
super(_MvExpandQuery, self).__init__(head)
self._assignments = assignments
self._bag_expansion = bag_expansion
self._with_item_index = with_item_index
@ -604,18 +604,18 @@ class MvExpandQuery(Query):
return KQL(res)
class CustomQuery(Query):
class _CustomQuery(Query):
_custom_query: str
def __init__(self, head: Query, custom_query: str):
super(CustomQuery, self).__init__(head)
super(_CustomQuery, self).__init__(head)
self._custom_query = custom_query
def _compile(self) -> KQL:
return KQL(self._custom_query)
class EvaluateQuery(Query):
class _EvaluateQuery(Query):
_plugin_name: str
_args: Tuple[ExpressionType]
_distribution: Distribution

Просмотреть файл

@ -5,7 +5,7 @@ from typing import Union, Mapping, Type, Dict, Callable, Tuple, List, Set, Froze
PythonTypes = Union[str, int, float, bool, datetime, Mapping, List, Tuple, timedelta]
class KustoType(Enum):
class _KustoType(Enum):
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/scalar-data-types/
"""
@ -54,21 +54,21 @@ class KustoType(Enum):
return False
INTERNAL_NAME_TO_TYPE: Dict[str, KustoType] = {t.internal_name: t for t in KustoType}
DOT_NAME_TO_TYPE: Dict[str, KustoType] = {t.dot_net_name: t for t in KustoType}
NUMBER_TYPES: FrozenSet[KustoType] = frozenset([
KustoType.INT, KustoType.LONG, KustoType.REAL, KustoType.DECIMAL, KustoType.FLOAT, KustoType.INT16, KustoType.UINT16, KustoType.UINT32, KustoType.UINT64, KustoType.UINT8
INTERNAL_NAME_TO_TYPE: Dict[str, _KustoType] = {t.internal_name: t for t in _KustoType}
DOT_NAME_TO_TYPE: Dict[str, _KustoType] = {t.dot_net_name: t for t in _KustoType}
NUMBER_TYPES: FrozenSet[_KustoType] = frozenset([
_KustoType.INT, _KustoType.LONG, _KustoType.REAL, _KustoType.DECIMAL, _KustoType.FLOAT, _KustoType.INT16, _KustoType.UINT16, _KustoType.UINT32, _KustoType.UINT64, _KustoType.UINT8
])
class TypeRegistrar:
class _TypeRegistrar:
"""
A factory for annotations that are used to create a mapping between Kusto types and python types / functions.
Each annotation must be called with a Kusto type as a parameter. The `for_obj` and `for_type` methods
can then be used to retrieve the python type or function corresponding to a given Kusto type.
"""
name: str
registry: Dict[KustoType, Union[Type, Callable]]
registry: Dict[_KustoType, Union[Type, Callable]]
def __init__(self, name: str) -> None:
"""
@ -80,7 +80,7 @@ class TypeRegistrar:
def __repr__(self) -> str:
return self.name
def __call__(self, *types: KustoType) -> Callable[[Union[Type, Callable]], Union[Type, Callable]]:
def __call__(self, *types: _KustoType) -> Callable[[Union[Type, Callable]], Union[Type, Callable]]:
def inner(wrapped: Union[Type, Callable]) -> Union[Type, Callable]:
for t in types:
previous = self.registry.setdefault(t, wrapped)
@ -115,45 +115,45 @@ class TypeRegistrar:
return registered_callable
raise ValueError(f"{self}: no registered callable for type {t.__name__}")
# Reverse lookup over the registry: collect every Kusto type whose registered entry
# satisfies isinstance(target_callable, entry). Returns an empty set when nothing matches.
# (Old and new signature/annotation lines appear back to back; only the enum name differs.)
def inverse(self, target_callable: Union[Type, Callable]) -> Set[KustoType]:
result: Set[KustoType] = set()
def inverse(self, target_callable: Union[Type, Callable]) -> Set[_KustoType]:
result: Set[_KustoType] = set()
for kusto_type, associated_callable in self.registry.items():
# NOTE(review): isinstance() requires a class (or tuple of classes) as its second argument,
# so this presumably relies on the registry values being classes rather than plain
# functions -- confirm against how the registrar is populated.
if isinstance(target_callable, associated_callable):
result.add(kusto_type)
return result
# Resolve the set of Kusto base types for `obj`. The first enum member whose is_type_of(obj)
# is true wins and is returned as a one-element set; otherwise the result of self.inverse(obj)
# is returned.
# NOTE: the docstring below predates the rename and still says `KustoType` (the new lines use
# `_KustoType`); it also says "type" in the singular although a set is returned.
def get_base_types(self, obj: Union[Type, Callable]) -> Set[KustoType]:
def get_base_types(self, obj: Union[Type, Callable]) -> Set[_KustoType]:
"""
For a given object, return the associated basic type, which is a member of :class:`KustoType`
:param obj: The given object for which the type is resolved
:return: A type which is a member of `KustoType`
"""
for kusto_type in KustoType:
for kusto_type in _KustoType:
if kusto_type.is_type_of(obj):
# The object is already a member of Kusto types
return {kusto_type}
# The object is one of the expression types decorated with a TypeRegistrar, therefore the original types are
# recovered from the registry through self.inverse()
base_types: Set[KustoType] = self.inverse(obj)
base_types: Set[_KustoType] = self.inverse(obj)
# An empty set means `obj` matched nothing in this registrar. Being an assert, this check is
# skipped when Python runs with -O.
assert len(base_types) > 0, f"get_base_types called for unsupported type: {type(obj).__name__}"
return base_types
# Sanity check: every Kusto type that declares at least one Python type (non-empty
# `python_types`) must have an entry in this registry.
def assert_all_types_covered(self) -> None:
missing = set(t for t in KustoType if len(t.python_types) > 0) - set(self.registry.keys())
missing = set(t for t in _KustoType if len(t.python_types) > 0) - set(self.registry.keys())
# On failure, the assertion message is the list of names of the uncovered types.
assert len(missing) == 0, [t.name for t in missing]
# Module-level registrar instances, one per annotation family. The string argument is the
# registrar's `name` parameter.
# (First four lines: pre-rename TypeRegistrar; last four: post-rename _TypeRegistrar.)
kql_converter = TypeRegistrar("KQL Converter")
typed_column = TypeRegistrar("Column")
plain_expression = TypeRegistrar("Plain expression")
aggregation_expression = TypeRegistrar("Aggregation expression")
kql_converter = _TypeRegistrar("KQL Converter")
typed_column = _TypeRegistrar("Column")
plain_expression = _TypeRegistrar("Plain expression")
aggregation_expression = _TypeRegistrar("Aggregation expression")
def get_base_types(obj: Union[Type, Callable]) -> Set[KustoType]:
def get_base_types(obj: Union[Type, Callable]) -> Set[_KustoType]:
"""
A registrar-agnostic version of TypeRegistrar.get_base_types
"""
for kusto_type in KustoType:
for kusto_type in _KustoType:
if kusto_type.is_type_of(obj):
# The object is already a member of Kusto types
return {kusto_type}

Просмотреть файл

@ -12,20 +12,20 @@ from azure.kusto.data import KustoClient, ClientRequestProperties
from azure.kusto.data._models import KustoResultTable, KustoResultRow
from azure.kusto.data.response import KustoResponseDataSet
from pykusto.client import Table
from pykusto.expressions import NumberColumn, BooleanColumn, ArrayColumn, MappingColumn, StringColumn, DatetimeColumn, TimespanColumn, DynamicColumn
from pykusto.client import _Table
from pykusto.expressions import _NumberColumn, _BooleanColumn, _ArrayColumn, _MappingColumn, _StringColumn, _DatetimeColumn, _TimespanColumn, _DynamicColumn
from pykusto.logger import logger
from pykusto.type_utils import KustoType
from pykusto.type_utils import _KustoType
# Naming this variable "test_table" triggers the following bug: https://github.com/pytest-dev/pytest/issues/7378
# Shared table fixture named "mock_table" with a fixed tuple of typed columns. The first
# constructor argument is None -- presumably the owning database/client, which these tests
# do not need (hence the type-checker suppression below); confirm against the Table signature.
# (Old lines use Table / XxxColumn; new lines use _Table / _XxxColumn.)
# noinspection PyTypeChecker
mock_table = Table(
mock_table = _Table(
None, "mock_table",
(
NumberColumn('numField'), NumberColumn('numField2'), NumberColumn('numField3'), NumberColumn('numField4'), NumberColumn('numField5'), NumberColumn('numField6'),
BooleanColumn('boolField'), ArrayColumn('arrayField'), ArrayColumn('arrayField2'), ArrayColumn('arrayField3'), MappingColumn('mapField'), StringColumn('stringField'),
StringColumn('stringField2'), DatetimeColumn('dateField'), DatetimeColumn('dateField2'), DatetimeColumn('dateField3'), TimespanColumn('timespanField'),
DynamicColumn('dynamicField')
_NumberColumn('numField'), _NumberColumn('numField2'), _NumberColumn('numField3'), _NumberColumn('numField4'), _NumberColumn('numField5'), _NumberColumn('numField6'),
_BooleanColumn('boolField'), _ArrayColumn('arrayField'), _ArrayColumn('arrayField2'), _ArrayColumn('arrayField3'), _MappingColumn('mapField'), _StringColumn('stringField'),
_StringColumn('stringField2'), _DatetimeColumn('dateField'), _DatetimeColumn('dateField2'), _DatetimeColumn('dateField3'), _TimespanColumn('timespanField'),
_DynamicColumn('dynamicField')
)
)
@ -94,18 +94,18 @@ def mock_response(rows: Tuple[Any, ...], columns: Tuple[str, ...] = tuple()) ->
)
# Build a mocked response with one row per column, (name, type.internal_name), under the
# headers 'ColumnName' / 'ColumnType'. With the default empty tuple there are no rows.
def mock_columns_response(columns: List[Tuple[str, KustoType]] = tuple()) -> KustoResponseDataSet:
def mock_columns_response(columns: List[Tuple[str, _KustoType]] = tuple()) -> KustoResponseDataSet:
return mock_response(tuple((c_name, c_type.internal_name) for c_name, c_type in columns), ('ColumnName', 'ColumnType'))
# Build a mocked response by flattening (table, [(column, type), ...]) pairs into one row per
# column: (table name, column name, type.dot_net_name), under the headers
# 'TableName' / 'ColumnName' / 'ColumnType'. Unlike mock_columns_response, the type is
# rendered with `dot_net_name` rather than `internal_name`.
def mock_tables_response(tables: List[Tuple[str, List[Tuple[str, KustoType]]]] = tuple()) -> KustoResponseDataSet:
def mock_tables_response(tables: List[Tuple[str, List[Tuple[str, _KustoType]]]] = tuple()) -> KustoResponseDataSet:
return mock_response(
tuple((t_name, c_name, c_type.dot_net_name) for t_name, columns in tables for c_name, c_type in columns),
('TableName', 'ColumnName', 'ColumnType')
)
def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tuple[str, KustoType]]]]]] = tuple()) -> KustoResponseDataSet:
def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tuple[str, _KustoType]]]]]] = tuple()) -> KustoResponseDataSet:
return mock_response(
tuple(
(d_name, t_name, c_name, c_type.dot_net_name)
@ -117,7 +117,7 @@ def mock_databases_response(databases: List[Tuple[str, List[Tuple[str, List[Tupl
)
# Build a mocked response with one row per column, (name, type.dot_net_name), under the
# headers 'ColumnName' / 'DataType'.
def mock_getschema_response(columns: List[Tuple[str, KustoType]] = tuple()) -> KustoResponseDataSet:
def mock_getschema_response(columns: List[Tuple[str, _KustoType]] = tuple()) -> KustoResponseDataSet:
return mock_response(tuple((c_name, c_type.dot_net_name) for c_name, c_type in columns), ('ColumnName', 'DataType'))

Просмотреть файл

@ -1,17 +1,17 @@
from concurrent.futures import Future
from threading import Thread, Lock
from pykusto.client import PyKustoClient, Database
from pykusto.expressions import StringColumn, NumberColumn, AnyTypeColumn, BooleanColumn
from pykusto.client import PyKustoClient, _Database
from pykusto.expressions import _StringColumn, _NumberColumn, _AnyTypeColumn, _BooleanColumn
from pykusto.query import Query
from pykusto.type_utils import KustoType
from pykusto.type_utils import _KustoType
from test.test_base import TestBase, MockKustoClient, mock_columns_response, RecordedQuery, mock_tables_response, mock_getschema_response, mock_databases_response
class TestClientFetch(TestBase):
def test_column_fetch(self):
mock_kusto_client = MockKustoClient(
columns_response=mock_columns_response([('foo', KustoType.STRING), ('bar', KustoType.INT)]),
columns_response=mock_columns_response([('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
record_metadata=True,
)
table = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']['mock_table']
@ -22,12 +22,12 @@ class TestClientFetch(TestBase):
mock_kusto_client.recorded_queries,
)
# Dot notation
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
# Bracket notation
self.assertEqual(type(table['foo']), StringColumn)
self.assertEqual(type(table['bar']), NumberColumn)
self.assertEqual(type(table['baz']), AnyTypeColumn)
self.assertEqual(type(table['foo']), _StringColumn)
self.assertEqual(type(table['bar']), _NumberColumn)
self.assertEqual(type(table['baz']), _AnyTypeColumn)
def test_column_fetch_slow(self):
mock_response_future = Future()
@ -42,9 +42,9 @@ class TestClientFetch(TestBase):
mock_kusto_client = MockKustoClient(upon_execute=upon_execute, record_metadata=True)
table = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']['mock_table']
table.refresh()
self.assertEqual(type(table['foo']), AnyTypeColumn)
self.assertEqual(type(table['bar']), AnyTypeColumn)
self.assertEqual(type(table['baz']), AnyTypeColumn)
self.assertEqual(type(table['foo']), _AnyTypeColumn)
self.assertEqual(type(table['bar']), _AnyTypeColumn)
self.assertEqual(type(table['baz']), _AnyTypeColumn)
# Make sure above lines were called while the fetch query was still waiting
assert not mock_response_future.executed
finally:
@ -104,7 +104,7 @@ class TestClientFetch(TestBase):
def test_table_fetch(self):
mock_kusto_client = MockKustoClient(
tables_response=mock_tables_response([('mock_table', [('foo', KustoType.STRING), ('bar', KustoType.INT)])]),
tables_response=mock_tables_response([('mock_table', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)])]),
record_metadata=True,
)
db = PyKustoClient(mock_kusto_client, fetch_by_default=False)['test_db']
@ -115,11 +115,11 @@ class TestClientFetch(TestBase):
)
table = db.mock_table
# Table columns
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table['baz']), AnyTypeColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table['baz']), _AnyTypeColumn)
# Bracket notation
self.assertEqual(type(db['other_table']['foo']), AnyTypeColumn)
self.assertEqual(type(db['other_table']['foo']), _AnyTypeColumn)
# Dot notation error
self.assertRaises(
AttributeError("PyKustoClient(test_cluster.kusto.windows.net).Database(test_db) has no attribute 'test_table_1'"),
@ -129,8 +129,8 @@ class TestClientFetch(TestBase):
def test_two_tables_fetch(self):
mock_kusto_client = MockKustoClient(
tables_response=mock_tables_response([
('test_table_1', [('foo', KustoType.STRING), ('bar', KustoType.INT)]),
('test_table_2', [('baz', KustoType.BOOL)])
('test_table_1', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
('test_table_2', [('baz', _KustoType.BOOL)])
]),
record_metadata=True,
)
@ -141,29 +141,29 @@ class TestClientFetch(TestBase):
mock_kusto_client.recorded_queries,
)
# Table columns
self.assertEqual(type(db.test_table_1.foo), StringColumn)
self.assertEqual(type(db.test_table_1.bar), NumberColumn)
self.assertEqual(type(db.test_table_2['baz']), BooleanColumn)
self.assertEqual(type(db['other_table']['foo']), AnyTypeColumn)
self.assertEqual(type(db.test_table_1.foo), _StringColumn)
self.assertEqual(type(db.test_table_1.bar), _NumberColumn)
self.assertEqual(type(db.test_table_2['baz']), _BooleanColumn)
self.assertEqual(type(db['other_table']['foo']), _AnyTypeColumn)
# Union
table = db.get_table('test_table_1', 'test_table_2')
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table.baz), BooleanColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table.baz), _BooleanColumn)
# Wildcard
table = db.get_table('test_table_*')
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table.baz), BooleanColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table.baz), _BooleanColumn)
def test_union_column_name_conflict(self):
mock_kusto_client = MockKustoClient(
tables_response=mock_tables_response([
('test_table_1', [('foo', KustoType.STRING), ('bar', KustoType.INT)]),
('test_table_2', [('foo', KustoType.BOOL)])
('test_table_1', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
('test_table_2', [('foo', _KustoType.BOOL)])
]),
getschema_response=mock_getschema_response([
('foo_string', KustoType.STRING), ('bar', KustoType.INT), ('foo_bool', KustoType.BOOL)
('foo_string', _KustoType.STRING), ('bar', _KustoType.INT), ('foo_bool', _KustoType.BOOL)
]),
record_metadata=True,
)
@ -180,15 +180,15 @@ class TestClientFetch(TestBase):
],
mock_kusto_client.recorded_queries,
)
self.assertEqual(type(table.foo_string), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table.foo_bool), BooleanColumn)
self.assertEqual(type(table.foo_string), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table.foo_bool), _BooleanColumn)
def test_union_wildcard_one_table(self):
mock_kusto_client = MockKustoClient(
tables_response=mock_tables_response([
('test_table_1', [('foo', KustoType.STRING), ('bar', KustoType.INT)]),
('other_table_2', [('baz', KustoType.BOOL)])
('test_table_1', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)]),
('other_table_2', [('baz', _KustoType.BOOL)])
]),
record_metadata=True,
)
@ -199,13 +199,13 @@ class TestClientFetch(TestBase):
mock_kusto_client.recorded_queries,
)
table = db.get_table('test_table_*')
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table['baz']), AnyTypeColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table['baz']), _AnyTypeColumn)
def test_database_fetch(self):
mock_kusto_client = MockKustoClient(
databases_response=mock_databases_response([('test_db', [('mock_table', [('foo', KustoType.STRING), ('bar', KustoType.INT)])])]),
databases_response=mock_databases_response([('test_db', [('mock_table', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)])])]),
record_metadata=True,
)
client = PyKustoClient(mock_kusto_client)
@ -216,13 +216,13 @@ class TestClientFetch(TestBase):
)
# Table columns
table = client.test_db.mock_table
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), NumberColumn)
self.assertEqual(type(table['baz']), AnyTypeColumn)
self.assertEqual(type(client.test_db['other_table']['foo']), AnyTypeColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _NumberColumn)
self.assertEqual(type(table['baz']), _AnyTypeColumn)
self.assertEqual(type(client.test_db['other_table']['foo']), _AnyTypeColumn)
# Various utility methods
db = client.get_database('test_db')
self.assertEqual(type(db), Database)
self.assertEqual(type(db), _Database)
self.assertEqual('test_db', db.get_name())
self.assertEqual(('test_db',), tuple(client.get_databases_names()))
self.assertEqual(('mock_table', 'other_table'), tuple(client.test_db.get_table_names()))
@ -232,16 +232,16 @@ class TestClientFetch(TestBase):
def test_autocomplete_with_dot(self):
mock_kusto_client = MockKustoClient(
databases_response=mock_databases_response([('test_db', [('mock_table', [('foo', KustoType.STRING), ('bar.baz', KustoType.INT)])])]),
databases_response=mock_databases_response([('test_db', [('mock_table', [('foo', _KustoType.STRING), ('bar.baz', _KustoType.INT)])])]),
record_metadata=True,
)
client = PyKustoClient(mock_kusto_client)
client.wait_for_items()
# Table columns
table = client.test_db.mock_table
self.assertEqual(type(table.foo), StringColumn)
self.assertEqual(type(table.bar), AnyTypeColumn)
self.assertEqual(type(table['bar.baz']), NumberColumn)
self.assertEqual(type(table.foo), _StringColumn)
self.assertEqual(type(table.bar), _AnyTypeColumn)
self.assertEqual(type(table['bar.baz']), _NumberColumn)
autocomplete_list = set(dir(client.test_db.mock_table))
self.assertIn('foo', autocomplete_list)
self.assertNotIn('bar.baz', autocomplete_list)
@ -249,8 +249,8 @@ class TestClientFetch(TestBase):
def test_empty_database(self):
mock_kusto_client = MockKustoClient(
databases_response=mock_databases_response([
('test_db', [('mock_table', [('foo', KustoType.STRING), ('bar', KustoType.INT)])]),
('', [('test_table1', [('foo1', KustoType.STRING), ('bar1', KustoType.INT)])])
('test_db', [('mock_table', [('foo', _KustoType.STRING), ('bar', _KustoType.INT)])]),
('', [('test_table1', [('foo1', _KustoType.STRING), ('bar1', _KustoType.INT)])])
]),
record_metadata=True,
)
@ -260,7 +260,7 @@ class TestClientFetch(TestBase):
[RecordedQuery('', '.show databases schema | project DatabaseName, TableName, ColumnName, ColumnType | limit 100000')],
mock_kusto_client.recorded_queries,
)
self.assertEqual(type(client.test_db.mock_table.foo), StringColumn)
self.assertEqual(type(client.test_db.mock_table.foo), _StringColumn)
def test_client_not_fetched(self):
client = PyKustoClient(MockKustoClient(), fetch_by_default=False)

Просмотреть файл

@ -1,6 +1,6 @@
from datetime import timedelta, datetime
from pykusto.expressions import column_generator as col, AnyTypeColumn
from pykusto.expressions import column_generator as col, _AnyTypeColumn
from pykusto.functions import Functions as f
from pykusto.query import Query
from test.test_base import TestBase, mock_table as t
@ -53,7 +53,7 @@ class TestExpressions(TestBase):
def test_repr(self):
self.assertEqual(
'StringColumn(stringField)',
'_StringColumn(stringField)',
repr(t.stringField)
)
self.assertEqual(
@ -412,8 +412,8 @@ class TestExpressions(TestBase):
def test_column_generator(self):
field1 = col.foo
field2 = col['foo.bar']
self.assertIsInstance(field1, AnyTypeColumn)
self.assertIsInstance(field2, AnyTypeColumn)
self.assertIsInstance(field1, _AnyTypeColumn)
self.assertIsInstance(field2, _AnyTypeColumn)
self.assertEqual('foo', field1.get_name())
self.assertEqual('foo.bar', field2.get_name())

Просмотреть файл

@ -7,7 +7,7 @@ from pykusto.enums import Order, Nulls, JoinKind, Distribution, BagExpansion
from pykusto.expressions import column_generator as col
from pykusto.functions import Functions as f
from pykusto.query import Query, JoinException
from pykusto.type_utils import KustoType
from pykusto.type_utils import _KustoType
from test.test_base import TestBase, mock_databases_response, MockKustoClient, mock_response
from test.test_base import mock_table as t, mock_columns_response
from test.udf import func, STRINGIFIED
@ -47,7 +47,7 @@ class TestQuery(TestBase):
)
def test_add_queries_with_table(self):
table = PyKustoClient(MockKustoClient(columns_response=mock_columns_response([('numField', KustoType.INT)])))['test_db']['mock_table']
table = PyKustoClient(MockKustoClient(columns_response=mock_columns_response([('numField', _KustoType.INT)])))['test_db']['mock_table']
query_a = Query(table).where(table.numField > 4)
query_b = Query(t).take(5).take(2).sort_by(t.stringField, Order.ASC, Nulls.LAST)
query = query_a + query_b
@ -196,7 +196,7 @@ class TestQuery(TestBase):
)
def test_join_with_table(self):
table = PyKustoClient(MockKustoClient(columns_response=mock_columns_response([('tableStringField', KustoType.STRING), ('numField', KustoType.INT)])))['test_db'][
table = PyKustoClient(MockKustoClient(columns_response=mock_columns_response([('tableStringField', _KustoType.STRING), ('numField', _KustoType.INT)])))['test_db'][
'mock_table']
self.assertEqual(
@ -209,7 +209,7 @@ class TestQuery(TestBase):
def test_join_with_table_and_query(self):
table = PyKustoClient(MockKustoClient(columns_response=mock_columns_response([
('tableStringField', KustoType.STRING), ('numField', KustoType.INT)
('tableStringField', _KustoType.STRING), ('numField', _KustoType.INT)
])))['test_db']['mock_table']
self.assertEqual(
@ -316,7 +316,7 @@ class TestQuery(TestBase):
# Columns wrapped in f.to_type(...) must render as "<column> to typeof(<type>)" inside
# mv-expand, while a bare column renders as just its name.
def test_mv_expand_to_type(self):
self.assertEqual(
"mock_table | mv-expand arrayField to typeof(string), arrayField2 to typeof(int), arrayField3",
Query(t).mv_expand(f.to_type(t.arrayField, KustoType.STRING), f.to_type(t.arrayField2, KustoType.INT), t.arrayField3).render(),
Query(t).mv_expand(f.to_type(t.arrayField, _KustoType.STRING), f.to_type(t.arrayField2, _KustoType.INT), t.arrayField3).render(),
)
def test_mv_expand_args(self):
@ -442,13 +442,13 @@ class TestQuery(TestBase):
# Without an explicit `extend` argument, the rendered output schema starts with "*" followed
# by the keyword-declared column: typeof(*, StateZone:string).
def test_udf(self):
self.assertEqual(
f"mock_table | evaluate python(typeof(*, StateZone:string), {STRINGIFIED})",
Query(t).evaluate_udf(func, StateZone=KustoType.STRING).render(),
Query(t).evaluate_udf(func, StateZone=_KustoType.STRING).render(),
)
# With extend=False the leading "*" is omitted and only the declared column remains:
# typeof(StateZone:string).
def test_udf_no_extend(self):
self.assertEqual(
f"mock_table | evaluate python(typeof(StateZone:string), {STRINGIFIED})",
Query(t).evaluate_udf(func, extend=False, StateZone=KustoType.STRING).render(),
Query(t).evaluate_udf(func, extend=False, StateZone=_KustoType.STRING).render(),
)
def test_bag_unpack(self):
@ -467,7 +467,7 @@ class TestQuery(TestBase):
rows = (['foo', 10], ['bar', 20], ['baz', 30])
columns = ('stringField', 'numField')
client = PyKustoClient(MockKustoClient(
databases_response=mock_databases_response([('test_db', [('mock_table', [('stringField', KustoType.STRING), ('numField', KustoType.INT)])])]),
databases_response=mock_databases_response([('test_db', [('mock_table', [('stringField', _KustoType.STRING), ('numField', _KustoType.INT)])])]),
main_response=mock_response(rows, columns),
))
client.wait_for_items()

Просмотреть файл

@ -1,6 +1,6 @@
from pykusto.expressions import to_kql
from pykusto.kql_converters import KQL
from pykusto.type_utils import TypeRegistrar, KustoType
from pykusto.type_utils import _TypeRegistrar, _KustoType
from test.test_base import TestBase
@ -19,9 +19,9 @@ class TestUtils(TestBase):
)
def test_type_registrar_for_type(self):
test_annotation = TypeRegistrar("Test annotation")
test_annotation = _TypeRegistrar("Test annotation")
@test_annotation(KustoType.STRING)
@test_annotation(_KustoType.STRING)
def str_annotated(s: str) -> str:
return "response to " + s
@ -32,9 +32,9 @@ class TestUtils(TestBase):
)
def test_type_registrar_for_obj(self):
test_annotation = TypeRegistrar("Test annotation")
test_annotation = _TypeRegistrar("Test annotation")
@test_annotation(KustoType.STRING)
@test_annotation(_KustoType.STRING)
def str_annotated(s: str) -> str:
return "response to " + s
@ -44,9 +44,9 @@ class TestUtils(TestBase):
)
def test_type_registrar_for_type_not_found(self):
test_annotation = TypeRegistrar("Test annotation")
test_annotation = _TypeRegistrar("Test annotation")
@test_annotation(KustoType.STRING)
@test_annotation(_KustoType.STRING)
def str_annotated(s: str) -> str:
return "response to " + s
@ -57,9 +57,9 @@ class TestUtils(TestBase):
)
def test_type_registrar_for_obj_not_found(self):
test_annotation = TypeRegistrar("Test annotation")
test_annotation = _TypeRegistrar("Test annotation")
@test_annotation(KustoType.STRING)
@test_annotation(_KustoType.STRING)
def str_annotated(s: str) -> str:
return "response to " + s
@ -69,9 +69,9 @@ class TestUtils(TestBase):
)
def test_type_registrar_collision(self):
test_annotation = TypeRegistrar("Test annotation")
test_annotation = _TypeRegistrar("Test annotation")
@test_annotation(KustoType.STRING)
@test_annotation(_KustoType.STRING)
def str_annotated_1(s: str) -> KQL:
return KQL("response to " + s)
@ -80,5 +80,5 @@ class TestUtils(TestBase):
self.assertRaises(
TypeError("Test annotation: type already registered: string"),
lambda: test_annotation(KustoType.STRING)(str_annotated_2)
lambda: test_annotation(_KustoType.STRING)(str_annotated_2)
)