зеркало из https://github.com/Azure/pykusto.git
Add functions: all_of, any_of; Allow providing multiple predicates to 'where' (#87)
This commit is contained in:
Родитель
b0d12cd6bf
Коммит
89235ad79c
|
@ -96,6 +96,7 @@
|
|||
<w>urlquery</w>
|
||||
<w>varianceif</w>
|
||||
<w>variancep</w>
|
||||
<w>versioning</w>
|
||||
<w>weekofyear</w>
|
||||
<w>welch</w>
|
||||
</words>
|
||||
|
|
|
@ -209,6 +209,7 @@ class BooleanExpression(BaseExpression):
|
|||
def __invert__(self) -> 'BooleanExpression':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/notfunction
|
||||
Note that using the Python 'not' does not have the desired effect, because unfortunately its behavior cannot be overridden.
|
||||
"""
|
||||
return BooleanExpression(KQL(f'not({self.kql})'))
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ from pykusto.expressions import AnyTypeColumn, NumberType, NumberExpression, Tim
|
|||
DatetimeExpression, TimespanExpression, ArrayType, DynamicType, DatetimeType, BaseExpression, BooleanType, \
|
||||
ExpressionType, StringType, StringExpression, BooleanExpression, \
|
||||
NumberAggregationExpression, MappingAggregationExpression, ArrayAggregationExpression, to_kql, DynamicExpression, \
|
||||
ArrayExpression, ColumnToType, BaseColumn, AnyExpression, AnyAggregationExpression, MappingExpression
|
||||
ArrayExpression, ColumnToType, BaseColumn, AnyExpression, AnyAggregationExpression, MappingExpression, _subexpr_to_kql
|
||||
from pykusto.kql_converters import KQL
|
||||
from pykusto.logger import logger
|
||||
from pykusto.type_utils import plain_expression, KustoType
|
||||
|
@ -109,6 +109,28 @@ class Functions:
|
|||
"""
|
||||
return expr.bin_auto()
|
||||
|
||||
@staticmethod
|
||||
def all_of(*predicates: BooleanType) -> BooleanExpression:
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/logicaloperators
|
||||
"""
|
||||
return BooleanExpression(KQL(' and '.join(_subexpr_to_kql(c) for c in predicates)))
|
||||
|
||||
@staticmethod
|
||||
def any_of(*predicates: BooleanType) -> BooleanExpression:
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/logicaloperators
|
||||
"""
|
||||
return BooleanExpression(KQL(' or '.join(_subexpr_to_kql(c) for c in predicates)))
|
||||
|
||||
@staticmethod
|
||||
def not_of(predicate: BooleanType) -> BooleanExpression:
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/logicaloperators
|
||||
Note that using the Python 'not' does not have the desired effect, because unfortunately its behavior cannot be overridden.
|
||||
"""
|
||||
return BooleanExpression(KQL(f'not({to_kql(predicate)})'))
|
||||
|
||||
# def binary_and(self): return
|
||||
#
|
||||
#
|
||||
|
|
|
@ -24,8 +24,6 @@ KUSTO_KEYWORDS = frozenset([
|
|||
'bag_keys',
|
||||
'barchart',
|
||||
'bin',
|
||||
'bin',
|
||||
'bin',
|
||||
'bin_at',
|
||||
'bin_auto',
|
||||
'card',
|
||||
|
@ -44,6 +42,9 @@ KUSTO_KEYWORDS = frozenset([
|
|||
'endofyear',
|
||||
'endswith',
|
||||
'exp',
|
||||
'false',
|
||||
'FALSE',
|
||||
'False',
|
||||
'floor',
|
||||
'format_datetime',
|
||||
'format_timespan',
|
||||
|
@ -80,6 +81,7 @@ KUSTO_KEYWORDS = frozenset([
|
|||
'none',
|
||||
'not',
|
||||
'now',
|
||||
'null',
|
||||
'or',
|
||||
'pack',
|
||||
'pack_array',
|
||||
|
@ -132,6 +134,9 @@ KUSTO_KEYWORDS = frozenset([
|
|||
'tolower',
|
||||
'tostring',
|
||||
'toupper',
|
||||
'true',
|
||||
'TRUE',
|
||||
'True',
|
||||
'unstacked',
|
||||
'variance',
|
||||
'varianceif',
|
||||
|
|
117
pykusto/query.py
117
pykusto/query.py
|
@ -1,9 +1,9 @@
|
|||
from abc import abstractmethod
|
||||
from copy import copy, deepcopy
|
||||
from itertools import chain
|
||||
from os import linesep
|
||||
from types import FunctionType
|
||||
from typing import Tuple, List, Union, Optional
|
||||
from os import linesep
|
||||
|
||||
from pykusto.client import Table, KustoResponse
|
||||
from pykusto.enums import Order, Nulls, JoinKind, Distribution, BagExpansion
|
||||
|
@ -11,6 +11,7 @@ from pykusto.expressions import BooleanType, ExpressionType, AggregationExpressi
|
|||
StringType, AssignmentBase, AssignmentFromAggregationToColumn, AssignmentToSingleColumn, AnyTypeColumn, \
|
||||
BaseExpression, \
|
||||
AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn, NumberType
|
||||
from pykusto.functions import Functions as f
|
||||
from pykusto.kql_converters import KQL
|
||||
from pykusto.logger import logger
|
||||
from pykusto.type_utils import KustoType, typed_column, plain_expression
|
||||
|
@ -45,56 +46,123 @@ class Query:
|
|||
new_object._head = self._head.__deepcopy__(memo)
|
||||
return new_object
|
||||
|
||||
def where(self, predicate: BooleanType) -> 'WhereQuery':
|
||||
return WhereQuery(self, predicate)
|
||||
def where(self, *predicates: BooleanType) -> 'Query':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/whereoperator
|
||||
|
||||
Implicitly apply conjunction if multiple predicates are provided. You can use predicates which are calculated at runtime and result in boolean values, which are
|
||||
pre-processed: 'True' values ignored, and 'False' values cause all other predicates to be ignored. If the result of pre-processing is a single 'True' predicate, no 'where'
|
||||
clause will be generated.
|
||||
|
||||
Warning: to apply a logical 'not', do not use the Python 'not' operator, it will simply produce a 'False' boolean value. Use either the `~` operator or `f.not_of()`.
|
||||
"""
|
||||
filtered_predicates = []
|
||||
for predicate in predicates:
|
||||
if predicate is True:
|
||||
# This predicate has no effect on the outcome
|
||||
continue
|
||||
if predicate is False:
|
||||
# All other predicates have no effect on the outcome
|
||||
filtered_predicates = [False]
|
||||
break
|
||||
filtered_predicates.append(predicate)
|
||||
if len(filtered_predicates) == 0:
|
||||
# Do no generate 'where' clause
|
||||
return self
|
||||
return WhereQuery(self, *filtered_predicates)
|
||||
|
||||
def take(self, num_rows: int) -> 'TakeQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/takeoperator
|
||||
"""
|
||||
return TakeQuery(self, num_rows)
|
||||
|
||||
def limit(self, num_rows: int) -> 'LimitQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/limitoperator
|
||||
"""
|
||||
return LimitQuery(self, num_rows)
|
||||
|
||||
def sample(self, num_rows: int) -> 'SampleQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sampleoperator
|
||||
"""
|
||||
return SampleQuery(self, num_rows)
|
||||
|
||||
def count(self) -> 'CountQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/countoperator
|
||||
"""
|
||||
return CountQuery(self)
|
||||
|
||||
def sort_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> 'SortQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sortoperator
|
||||
"""
|
||||
return SortQuery(self, col, order, nulls)
|
||||
|
||||
def order_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> 'OrderQuery':
|
||||
return OrderQuery(self, col, order, nulls)
|
||||
def order_by(self, col: OrderedType, order: Order = None, nulls: Nulls = None) -> 'SortQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/orderoperator
|
||||
"""
|
||||
return self.sort_by(col, order, nulls)
|
||||
|
||||
def top(self, num_rows: int, col: AnyTypeColumn, order: Order = None, nulls: Nulls = None) -> 'TopQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/topoperator
|
||||
"""
|
||||
return TopQuery(self, num_rows, col, order, nulls)
|
||||
|
||||
def join(self, query: 'Query', kind: JoinKind = None) -> 'JoinQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/joinoperator
|
||||
"""
|
||||
return JoinQuery(self, query, kind)
|
||||
|
||||
def project(self, *args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> 'ProjectQuery':
|
||||
return ProjectQuery(self, self.extract_assignments(*args, **kwargs))
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectoperator
|
||||
"""
|
||||
return ProjectQuery(self, self._extract_assignments(*args, **kwargs))
|
||||
|
||||
def project_rename(self, *args: AssignmentFromColumnToColumn, **kwargs: AnyTypeColumn) -> 'ProjectRenameQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectrenameoperator
|
||||
"""
|
||||
assignments: List[AssignmentFromColumnToColumn] = list(args)
|
||||
for column_name, column in kwargs.items():
|
||||
assignments.append(AssignmentFromColumnToColumn(AnyTypeColumn(column_name), column))
|
||||
return ProjectRenameQuery(self, assignments)
|
||||
|
||||
def project_away(self, *columns: StringType) -> 'ProjectAwayQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/projectawayoperator
|
||||
"""
|
||||
return ProjectAwayQuery(self, columns)
|
||||
|
||||
def distinct(self, *columns: BaseColumn) -> 'DistinctQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/distinctoperator
|
||||
"""
|
||||
return DistinctQuery(self, columns)
|
||||
|
||||
def distinct_all(self) -> 'DistinctQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/distinctoperator
|
||||
"""
|
||||
return DistinctQuery(self, (AnyTypeColumn(KQL("*")),))
|
||||
|
||||
def extend(self, *args: Union[BaseExpression, AssignmentBase], **kwargs: ExpressionType) -> 'ExtendQuery':
|
||||
return ExtendQuery(self, *self.extract_assignments(*args, **kwargs))
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/extendoperator
|
||||
"""
|
||||
return ExtendQuery(self, *self._extract_assignments(*args, **kwargs))
|
||||
|
||||
def summarize(self, *args: Union[AggregationExpression, AssignmentFromAggregationToColumn],
|
||||
**kwargs: AggregationExpression) -> 'SummarizeQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/summarizeoperator
|
||||
"""
|
||||
assignments: List[AssignmentFromAggregationToColumn] = []
|
||||
for arg in args:
|
||||
if isinstance(arg, AggregationExpression):
|
||||
|
@ -110,7 +178,10 @@ class Query:
|
|||
self, *args: Union[BaseExpression, AssignmentBase], bag_expansion: BagExpansion = None,
|
||||
with_item_index: BaseColumn = None, limit: int = None, **kwargs: ExpressionType
|
||||
) -> 'MvExpandQuery':
|
||||
assignments = self.extract_assignments(*args, **kwargs)
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/mvexpandoperator
|
||||
"""
|
||||
assignments = self._extract_assignments(*args, **kwargs)
|
||||
if len(assignments) == 0:
|
||||
raise ValueError("Please specify one or more columns for mv-expand")
|
||||
return MvExpandQuery(self, bag_expansion, with_item_index, limit, *assignments)
|
||||
|
@ -119,11 +190,17 @@ class Query:
|
|||
return CustomQuery(self, custom_query)
|
||||
|
||||
def evaluate(self, plugin_name, *args: ExpressionType, distribution: Distribution = None) -> 'EvaluateQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/evaluateoperator
|
||||
"""
|
||||
return EvaluateQuery(self, plugin_name, *args, distribution=distribution)
|
||||
|
||||
def evaluate_udf(
|
||||
self, udf: FunctionType, extend: bool = True, distribution: Distribution = None, **type_specs: KustoType
|
||||
) -> 'EvaluateQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/pythonplugin
|
||||
"""
|
||||
return EvaluateQuery(
|
||||
self, 'python',
|
||||
AnyExpression(KQL(f'typeof({("*, " if extend else "") + ", ".join(field_name + ":" + kusto_type.primary_name for field_name, kusto_type in type_specs.items())})')),
|
||||
|
@ -132,6 +209,9 @@ class Query:
|
|||
)
|
||||
|
||||
def bag_unpack(self, col: AnyTypeColumn, prefix: str = None) -> 'EvaluateQuery':
|
||||
"""
|
||||
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/bag-unpackplugin
|
||||
"""
|
||||
if prefix is None:
|
||||
return EvaluateQuery(self, 'bag_unpack', col)
|
||||
return EvaluateQuery(self, 'bag_unpack', col, prefix)
|
||||
|
@ -197,7 +277,7 @@ class Query:
|
|||
return self.execute(table).to_dataframe()
|
||||
|
||||
@staticmethod
|
||||
def extract_assignments(*args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[AssignmentBase]:
|
||||
def _extract_assignments(*args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[AssignmentBase]:
|
||||
assignments: List[AssignmentBase] = []
|
||||
for arg in args:
|
||||
if isinstance(arg, BaseExpression):
|
||||
|
@ -317,14 +397,16 @@ class ExtendQuery(Query):
|
|||
|
||||
|
||||
class WhereQuery(Query):
|
||||
_predicate: BooleanType
|
||||
_predicates: Tuple[BooleanType, ...]
|
||||
|
||||
def __init__(self, head: Query, predicate: BooleanType):
|
||||
def __init__(self, head: Query, *predicates: BooleanType):
|
||||
super(WhereQuery, self).__init__(head)
|
||||
self._predicate = predicate
|
||||
self._predicates = predicates
|
||||
|
||||
def _compile(self) -> KQL:
|
||||
return KQL(f'where {self._predicate.kql}')
|
||||
if len(self._predicates) == 1:
|
||||
return KQL(f'where {to_kql(self._predicates[0])}')
|
||||
return KQL(f'where {f.all_of(*self._predicates)}')
|
||||
|
||||
|
||||
class _SingleNumberQuery(Query):
|
||||
|
@ -413,19 +495,14 @@ class SortQuery(_OrderQueryBase):
|
|||
super(SortQuery, self).__init__(head, "sort", col, order, nulls)
|
||||
|
||||
|
||||
class OrderQuery(_OrderQueryBase):
|
||||
def __init__(self, head: Query, col: OrderedType, order: Order, nulls: Nulls):
|
||||
super(OrderQuery, self).__init__(head, "order", col, order, nulls)
|
||||
|
||||
|
||||
class TopQuery(Query):
|
||||
_num_rows: int
|
||||
_order_spec: OrderQuery.OrderSpec
|
||||
_order_spec: _OrderQueryBase.OrderSpec
|
||||
|
||||
def __init__(self, head: Query, num_rows: int, col: AnyTypeColumn, order: Order, nulls: Nulls):
|
||||
super(TopQuery, self).__init__(head)
|
||||
self._num_rows = num_rows
|
||||
self._order_spec = OrderQuery.OrderSpec(col, order, nulls)
|
||||
self._order_spec = _OrderQueryBase.OrderSpec(col, order, nulls)
|
||||
|
||||
def _compile(self) -> KQL:
|
||||
# noinspection PyProtectedMember
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from concurrent.futures import Future
|
||||
from threading import Thread
|
||||
from threading import Thread, Lock
|
||||
|
||||
from pykusto.client import PyKustoClient, Database
|
||||
from pykusto.expressions import StringColumn, NumberColumn, AnyTypeColumn, BooleanColumn
|
||||
|
@ -60,10 +60,16 @@ class TestClientFetch(TestBase):
|
|||
mock_response_future.returned_queries = []
|
||||
mock_response_future.called = False
|
||||
mock_response_future.executed = False
|
||||
future_called_lock = Lock()
|
||||
|
||||
def upon_execute(query):
|
||||
if not mock_response_future.called:
|
||||
mock_response_future.called = True
|
||||
with future_called_lock:
|
||||
if mock_response_future.called:
|
||||
first_run = False
|
||||
else:
|
||||
mock_response_future.called = True
|
||||
first_run = True
|
||||
if first_run:
|
||||
mock_response_future.result()
|
||||
mock_response_future.executed = True
|
||||
mock_response_future.returned_queries.append(query)
|
||||
|
|
|
@ -923,3 +923,15 @@ class TestFunction(TestBase):
|
|||
' | extend ingestionTime = ingestion_time()',
|
||||
Query().extend(ingestionTime=f.ingestion_time()).render()
|
||||
)
|
||||
|
||||
def test_all_of(self):
|
||||
self.assertEqual(
|
||||
' | where boolField and (numField > numField2) and (stringField contains "hello")',
|
||||
Query().where(f.all_of(t.boolField, t.numField > t.numField2, t.stringField.contains('hello'))).render()
|
||||
)
|
||||
|
||||
def test_any_of(self):
|
||||
self.assertEqual(
|
||||
' | where boolField or (numField > numField2) or (stringField contains "hello")',
|
||||
Query().where(f.any_of(t.boolField, t.numField > t.numField2, t.stringField.contains('hello'))).render()
|
||||
)
|
||||
|
|
|
@ -113,6 +113,42 @@ class TestQuery(TestBase):
|
|||
Query(t).where(t.numField > 4).render(),
|
||||
)
|
||||
|
||||
def test_where_multiple_predicates(self):
|
||||
self.assertEqual(
|
||||
'test_table | where boolField and (numField > numField2) and (stringField contains "hello")',
|
||||
Query(t).where(t.boolField, t.numField > t.numField2, t.stringField.contains('hello')).render(),
|
||||
)
|
||||
|
||||
def test_where_no_predicates(self):
|
||||
self.assertEqual(
|
||||
'test_table | project numField',
|
||||
Query(t).where().project(t.numField).render(),
|
||||
)
|
||||
|
||||
def test_where_true_predicate(self):
|
||||
self.assertEqual(
|
||||
'test_table | where boolField | project numField',
|
||||
Query(t).where(t.boolField, True).project(t.numField).render(),
|
||||
)
|
||||
|
||||
def test_where_only_true_predicate(self):
|
||||
self.assertEqual(
|
||||
'test_table | project numField',
|
||||
Query(t).where(True).project(t.numField).render(),
|
||||
)
|
||||
|
||||
def test_where_false_predicate(self):
|
||||
self.assertEqual(
|
||||
'test_table | where false | project numField',
|
||||
Query(t).where(t.boolField, False).project(t.numField).render(),
|
||||
)
|
||||
|
||||
def test_where_not(self):
|
||||
self.assertEqual(
|
||||
"test_table | where not(boolField)",
|
||||
Query(t).where(f.not_of(t.boolField)).render(),
|
||||
)
|
||||
|
||||
def test_take(self):
|
||||
self.assertEqual(
|
||||
"test_table | take 3",
|
||||
|
@ -127,13 +163,13 @@ class TestQuery(TestBase):
|
|||
|
||||
def test_order(self):
|
||||
self.assertEqual(
|
||||
"test_table | order by numField desc nulls first",
|
||||
"test_table | sort by numField desc nulls first",
|
||||
Query(t).order_by(t.numField, order=Order.DESC, nulls=Nulls.FIRST).render(),
|
||||
)
|
||||
|
||||
def test_order_expression_in_arg(self):
|
||||
self.assertEqual(
|
||||
"test_table | order by strlen(stringField) desc nulls first",
|
||||
"test_table | sort by strlen(stringField) desc nulls first",
|
||||
Query(t).order_by(f.strlen(t.stringField), order=Order.DESC, nulls=Nulls.FIRST).render(),
|
||||
)
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче