Add anyif, percentiles_array, sample-distinct, top-hitters, countof, ingestion_time (#78)

* Added any_if, fixed some type hints, improved style

* Added percentiles_array

* Add tests

* Add support for sample-distinct + test

* Add support for top-hitters + test

* Implemented countof
closes #79

* #80 Add ingestion_time method

* Add link and fix now() return type

Co-authored-by: Yonatan Most <>
This commit is contained in:
Yonatan Most 2020-05-31 11:44:08 +03:00 коммит произвёл GitHub
Родитель 97ab6162de
Коммит 325b81b456
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
6 изменённых файлов: 197 добавлений и 85 удалений

Просмотреть файл

@ -2,6 +2,7 @@
<dictionary name="pykusto">
<words>
<w>acos</w>
<w>anyif</w>
<w>asctime</w>
<w>asin</w>
<w>atan</w>

42
pykusto/enums.py Normal file
Просмотреть файл

@ -0,0 +1,42 @@
from enum import Enum
class Order(Enum):
    """Sort direction; each member's value is the lowercase keyword string."""

    ASC = "asc"
    DESC = "desc"
class Nulls(Enum):
    """Placement of nulls (first or last); values are the lowercase keyword strings."""

    FIRST = "first"
    LAST = "last"
class JoinKind(Enum):
    """Join flavors; each member's value is the lowercase keyword string."""

    INNERUNIQUE = "innerunique"
    INNER = "inner"
    LEFTOUTER = "leftouter"
    RIGHTOUTER = "rightouter"
    FULLOUTER = "fullouter"
    LEFTANTI = "leftanti"
    ANTI = "anti"
    LEFTANTISEMI = "leftantisemi"
    RIGHTANTI = "rightanti"
    RIGHTANTISEMI = "rightantisemi"
    LEFTSEMI = "leftsemi"
    RIGHTSEMI = "rightsemi"
class Distribution(Enum):
    """Distribution options: single, per node, or per shard."""

    SINGLE = 'single'
    PER_NODE = 'per_node'
    PER_SHARD = 'per_shard'
class BagExpansion(Enum):
    """Bag expansion modes: 'bag' or 'array'."""

    BAG = "bag"
    ARRAY = "array"
class Kind(Enum):
    """Text matching mode: plain ('normal') or regular expression ('regex')."""

    NORMAL = 'normal'
    REGEX = 'regex'

Просмотреть файл

@ -2,9 +2,10 @@ import json
from itertools import chain
from typing import Union
from pykusto.enums import Kind
from pykusto.expressions import AnyTypeColumn, NumberType, NumberExpression, TimespanType, \
DatetimeExpression, TimespanExpression, ArrayType, DynamicType, DatetimeType, BaseExpression, BooleanType, \
ExpressionType, AggregationExpression, StringType, StringExpression, BooleanExpression, \
ExpressionType, StringType, StringExpression, BooleanExpression, \
NumberAggregationExpression, MappingAggregationExpression, ArrayAggregationExpression, to_kql, DynamicExpression, \
ArrayExpression, ColumnToType, BaseColumn, AnyExpression, AnyAggregationExpression, MappingExpression
from pykusto.kql_converters import KQL
@ -130,8 +131,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/casefunction
"""
res = f"case({to_kql(predicate)}, {to_kql(val)}, {', '.join([to_kql(arg) for arg in args])})"
return AnyExpression(KQL(res))
return AnyExpression(KQL(f"case({to_kql(predicate)}, {to_kql(val)}, {', '.join(to_kql(arg) for arg in args)})"))
@staticmethod
def ceiling(expr: NumberType) -> NumberExpression:
@ -156,9 +156,13 @@ class Functions:
# def cot(self): return
# def countof(self): return
#
#
@staticmethod
def count_of(text: StringType, search: StringType, kind: Kind = Kind.NORMAL) -> NumberExpression:
    """
    Build a KQL `countof(text, search, kind)` call.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/countoffunction

    :param text: Expression rendered as the first argument.
    :param search: Expression rendered as the second argument.
    :param kind: Matching mode; its string value is rendered as the third argument.
    :return: NumberExpression wrapping the generated KQL.
    """
    return NumberExpression(KQL(f'countof({to_kql(text)}, {to_kql(search)}, {to_kql(kind.value)})'))
# def current_cluster_endpoint(self): return
#
#
@ -368,11 +372,15 @@ class Functions:
# def indexof_regex(self): return
#
#
# def ingestion_time(self): return
#
#
# def isascii(self): return
@staticmethod
def ingestion_time() -> DatetimeExpression:
    """
    Build a KQL `ingestion_time()` call.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/ingestiontimefunction

    :return: DatetimeExpression wrapping the literal text `ingestion_time()`.
    """
    return DatetimeExpression(KQL('ingestion_time()'))
@staticmethod
def is_empty(expr: ExpressionType) -> BooleanExpression:
"""
@ -506,13 +514,13 @@ class Functions:
raise NotImplemented # pragma: no cover
@staticmethod
def now(offset: TimespanType = None) -> StringExpression:
def now(offset: TimespanType = None) -> DatetimeExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/nowfunction
"""
if offset:
return StringExpression(KQL(f'now({to_kql(offset)})'))
return StringExpression(KQL('now()'))
return DatetimeExpression(KQL(f'now({to_kql(offset)})'))
return DatetimeExpression(KQL('now()'))
@staticmethod
def pack(**kwargs: ExpressionType) -> MappingExpression:
@ -883,7 +891,7 @@ class Functions:
"""
res = f'strcat_delim({to_kql(delimiter)}, {to_kql(expr1)}, {to_kql(expr2)}'
if len(expressions) > 0:
res = res + ', ' + ', '.join([to_kql(expr) for expr in expressions])
res = res + ', ' + ', '.join(to_kql(expr) for expr in expressions)
return StringExpression(KQL(res + ')'))
@staticmethod
@ -1091,24 +1099,28 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/any-aggfunction
"""
res = f"any({', '.join([arg.kql for arg in args])})"
return AnyAggregationExpression(KQL(res))
return AnyAggregationExpression(KQL(f"any({', '.join(arg.kql for arg in args)})"))
@staticmethod
def any_if(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
    """
    Build a KQL `anyif(expr, predicate)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/anyif-aggfunction

    :param expr: Expression rendered as the first argument.
    :param predicate: Boolean expression rendered as the second argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f"anyif({to_kql(expr)}, {to_kql(predicate)})"))
@staticmethod
def arg_max(*args: ExpressionType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/arg-max-aggfunction
"""
res = f"arg_max({', '.join([arg.kql for arg in args])})"
return AnyAggregationExpression(KQL(res))
return AnyAggregationExpression(KQL(f"arg_max({', '.join(arg.kql for arg in args)})"))
@staticmethod
def arg_min(*args: ExpressionType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/arg-min-aggfunction
"""
res = f"arg_min({', '.join([arg.kql for arg in args])})"
return AnyAggregationExpression(KQL(res))
return AnyAggregationExpression(KQL(f"arg_min({', '.join(arg.kql for arg in args)})"))
@staticmethod
def avg(expr: ExpressionType) -> NumberAggregationExpression:
@ -1132,8 +1144,7 @@ class Functions:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/count-aggfunction
"""
res = "count()" if col is None else f"count({col.kql})"
return NumberAggregationExpression(KQL(res))
return NumberAggregationExpression(KQL("count()" if col is None else f"count({col.kql})"))
@staticmethod
def count_if(predicate: BooleanType) -> NumberAggregationExpression:
@ -1162,7 +1173,7 @@ class Functions:
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/make-bag-aggfunction
"""
if max_size:
return MappingAggregationExpression(KQL(f'make_bag({expr}, {max_size})'))
return MappingAggregationExpression(KQL(f'make_bag({to_kql(expr)}, {to_kql(max_size)})'))
return MappingAggregationExpression(KQL(f'make_bag({to_kql(expr)})'))
@staticmethod
@ -1171,7 +1182,7 @@ class Functions:
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/makelist-aggfunction
"""
if max_size:
return ArrayAggregationExpression(KQL(f'make_list({expr}, {max_size})'))
return ArrayAggregationExpression(KQL(f'make_list({to_kql(expr)}, {to_kql(max_size)})'))
return ArrayAggregationExpression(KQL(f'make_list({to_kql(expr)})'))
@staticmethod
@ -1180,83 +1191,88 @@ class Functions:
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/makeset-aggfunction
"""
if max_size:
return ArrayAggregationExpression(KQL(f'make_set({expr}, {max_size})'))
return ArrayAggregationExpression(KQL(f'make_set({to_kql(expr)}, {to_kql(max_size)})'))
return ArrayAggregationExpression(KQL(f'make_set({to_kql(expr)})'))
@staticmethod
def max(expr: ExpressionType) -> AggregationExpression:
def max(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `max(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/max-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'max({to_kql(expr)})'))
@staticmethod
def min(expr: ExpressionType) -> AggregationExpression:
def min(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `min(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/min-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'min({to_kql(expr)})'))
@staticmethod
def max_if(expr: ExpressionType, predicate: BooleanType) -> AggregationExpression:
def max_if(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
    """
    Build a KQL `maxif(expr, predicate)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/maxif-aggfunction

    :param expr: Expression rendered as the first argument.
    :param predicate: Boolean expression rendered as the second argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'maxif({to_kql(expr)}, {to_kql(predicate)})'))
@staticmethod
def min_if(expr: ExpressionType, predicate: BooleanType) -> AggregationExpression:
def min_if(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
    """
    Build a KQL `minif(expr, predicate)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/minif-aggfunction

    :param expr: Expression rendered as the first argument.
    :param predicate: Boolean expression rendered as the second argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'minif({to_kql(expr)}, {to_kql(predicate)})'))
@staticmethod
def percentile(expr: ExpressionType, per: NumberType) -> AggregationExpression:
def percentile(expr: ExpressionType, per: NumberType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/percentiles-aggfunction
"""
res = f'percentiles({expr}, {to_kql(per)})'
return AnyAggregationExpression(KQL(res))
return AnyAggregationExpression(KQL(f'percentiles({to_kql(expr)}, {to_kql(per)})'))
@staticmethod
def percentiles(expr: ExpressionType, *pers: NumberType) -> AggregationExpression:
def percentiles(expr: ExpressionType, *pers: NumberType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/percentiles-aggfunction
"""
res = f"percentiles({expr.kql}, {', '.join([str(to_kql(per)) for per in pers])})"
return AnyAggregationExpression(KQL(res))
return AnyAggregationExpression(KQL(f"percentiles({to_kql(expr)}, {', '.join(str(to_kql(per)) for per in pers)})"))
@staticmethod
def stdev(expr: ExpressionType) -> AggregationExpression:
def percentiles_array(expr: ExpressionType, *pers: NumberType) -> ArrayAggregationExpression:
    """
    Build a KQL `percentiles_array(expr, p1, p2, ...)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/percentiles-aggfunction

    :param expr: Expression rendered as the first argument.
    :param pers: Percentile values, rendered in order and separated by ', '.
    :return: ArrayAggregationExpression wrapping the generated KQL.
    """
    return ArrayAggregationExpression(KQL(f"percentiles_array({to_kql(expr)}, {', '.join(str(to_kql(per)) for per in pers)})"))
@staticmethod
def stdev(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `stdev(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/stdev-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'stdev({to_kql(expr)})'))
@staticmethod
def stdevif(expr: ExpressionType, predicate: BooleanType) -> AggregationExpression:
def stdevif(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
    """
    Build a KQL `stdevif(expr, predicate)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/stdevif-aggfunction

    :param expr: Expression rendered as the first argument.
    :param predicate: Boolean expression rendered as the second argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'stdevif({to_kql(expr)}, {to_kql(predicate)})'))
@staticmethod
def stdevp(expr: ExpressionType) -> AggregationExpression:
def stdevp(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `stdevp(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/stdevp-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'stdevp({to_kql(expr)})'))
@staticmethod
def sum(expr: ExpressionType) -> AggregationExpression:
def sum(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `sum(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sum-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'sum({to_kql(expr)})'))
@staticmethod
def sum_if(expr: ExpressionType, predicate: BooleanType) -> AggregationExpression:
def sum_if(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sumif-aggfunction
"""
@ -1270,21 +1286,21 @@ class Functions:
# return
@staticmethod
def variance(expr: ExpressionType) -> AggregationExpression:
def variance(expr: ExpressionType) -> AnyAggregationExpression:
    """
    Build a KQL `variance(expr)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/variance-aggfunction

    :param expr: Expression rendered as the single argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'variance({to_kql(expr)})'))
@staticmethod
def variance_if(expr: ExpressionType, predicate: BooleanType) -> AggregationExpression:
def variance_if(expr: ExpressionType, predicate: BooleanType) -> AnyAggregationExpression:
    """
    Build a KQL `varianceif(expr, predicate)` aggregation.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/varianceif-aggfunction

    :param expr: Expression rendered as the first argument.
    :param predicate: Boolean expression rendered as the second argument.
    :return: AnyAggregationExpression wrapping the generated KQL.
    """
    return AnyAggregationExpression(KQL(f'varianceif({to_kql(expr)}, {to_kql(predicate)})'))
@staticmethod
def variancep(expr: ExpressionType) -> AggregationExpression:
def variancep(expr: ExpressionType) -> AnyAggregationExpression:
"""
https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/variancep-aggfunction
"""

Просмотреть файл

@ -1,57 +1,21 @@
from abc import abstractmethod
from copy import copy, deepcopy
from enum import Enum
from itertools import chain
from types import FunctionType
from typing import Tuple, List, Union, Optional
from pykusto.client import Table, KustoResponse
from pykusto.enums import Order, Nulls, JoinKind, Distribution, BagExpansion
from pykusto.expressions import BooleanType, ExpressionType, AggregationExpression, OrderedType, \
StringType, AssignmentBase, AssignmentFromAggregationToColumn, AssignmentToSingleColumn, AnyTypeColumn, \
BaseExpression, \
AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn
AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn, NumberType
from pykusto.kql_converters import KQL
from pykusto.logger import logger
from pykusto.type_utils import KustoType, typed_column, plain_expression
from pykusto.udf import stringify_python_func
class Order(Enum):
ASC = "asc"
DESC = "desc"
class Nulls(Enum):
FIRST = "first"
LAST = "last"
class JoinKind(Enum):
INNERUNIQUE = "innerunique"
INNER = "inner"
LEFTOUTER = "leftouter"
RIGHTOUTER = "rightouter"
FULLOUTER = "fullouter"
LEFTANTI = "leftanti"
ANTI = "anti"
LEFTANTISEMI = "leftantisemi"
RIGHTANTI = "rightanti"
RIGHTANTISEMI = "rightantisemi"
LEFTSEMI = "leftsemi"
RIGHTSEMI = "rightsemi"
class Distribution(Enum):
SINGLE = 'single'
PER_NODE = 'per_node'
PER_SHARD = 'per_shard'
class BagExpansion(Enum):
BAG = "bag"
ARRAY = "array"
class Query:
_head: Optional['Query']
_table: Optional[Table]
@ -117,11 +81,11 @@ class Query:
def project_away(self, *columns: StringType) -> 'ProjectAwayQuery':
    """Return a ProjectAwayQuery that wraps this query and the given columns."""
    return ProjectAwayQuery(self, columns)
def distinct(self, *columns: BaseExpression) -> 'DistinctQuery':
def distinct(self, *columns: BaseColumn) -> 'DistinctQuery':
    """Return a DistinctQuery that wraps this query and the given columns."""
    return DistinctQuery(self, columns)
def distinct_all(self) -> 'DistinctQuery':
return DistinctQuery(self, (AnyExpression(KQL("*")),))
return DistinctQuery(self, (AnyTypeColumn(KQL("*")),))
def extend(self, *args: Union[BaseExpression, AssignmentBase], **kwargs: ExpressionType) -> 'ExtendQuery':
return ExtendQuery(self, *self.extract_assignments(*args, **kwargs))
@ -267,16 +231,62 @@ class ProjectAwayQuery(Query):
class DistinctQuery(Query):
_columns: Tuple[BaseExpression, ...]
_columns: Tuple[BaseColumn, ...]
def __init__(self, head: 'Query', columns: Tuple[BaseExpression]) -> None:
def __init__(self, head: 'Query', columns: Tuple[BaseColumn, ...]) -> None:
    """
    Store the columns to take distinct values of.

    :param head: The query this stage is chained onto.
    :param columns: Any number of columns (variable-length tuple, matching the `_columns` attribute).
    """
    super().__init__(head)
    self._columns = columns
def sample(self, number_of_values: NumberType) -> 'SampleDistinctQuery':
    """
    Turn this single-column distinct stage into a sample-distinct stage.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/sampledistinctoperator

    :param number_of_values: Number of values to sample.
    :return: A SampleDistinctQuery over the single distinct column.
    :raises AssertionError: If this distinct stage does not hold exactly one column.
    """
    assert len(self._columns) == 1, "sample-distinct supports only one column"
    # Chain onto self._head rather than self, so the new stage replaces this distinct stage.
    return SampleDistinctQuery(self._head, self._columns[0], number_of_values)
def top_hitters(self, number_of_values: NumberType) -> 'TopHittersQuery':
    """
    Turn this single-column distinct stage into a top-hitters stage.

    https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/tophittersoperator

    :param number_of_values: Number of values to return.
    :return: A TopHittersQuery over the single distinct column.
    :raises AssertionError: If this distinct stage does not hold exactly one column.
    """
    assert len(self._columns) == 1, "top-hitters supports only one column"
    # Chain onto self._head rather than self, so the new stage replaces this distinct stage.
    return TopHittersQuery(self._head, self._columns[0], number_of_values)
def _compile(self) -> KQL:
    """Render this stage as `distinct col1, col2, ...` using each column's KQL text."""
    return KQL(f"distinct {', '.join(c.kql for c in self._columns)}")
class SampleDistinctQuery(Query):
    """Query stage rendered as `sample-distinct <number> of <column>`."""

    _number_of_values: NumberType  # how many values to sample
    _column: BaseColumn  # the column sampled from

    def __init__(self, head: 'Query', column: BaseColumn, number_of_values: NumberType) -> None:
        """
        :param head: The query this stage is chained onto.
        :param column: The column sampled from.
        :param number_of_values: Number of values to sample.
        """
        super().__init__(head)
        self._column = column
        self._number_of_values = number_of_values

    def _compile(self) -> KQL:
        """Render this stage's KQL text."""
        return KQL(f"sample-distinct {to_kql(self._number_of_values)} of {self._column.kql}")
class TopHittersQuery(Query):
    """Query stage rendered as `top-hitters <number> of <column>[ by <expression>]`."""

    _number_of_values: NumberType  # how many values to return
    _column: BaseColumn  # the column whose values are ranked
    _by_expression: Optional[NumberType]  # optional 'by' clause; None means the clause is omitted

    def __init__(self, head: 'Query', column: BaseColumn, number_of_values: NumberType, by_expression: Optional[NumberType] = None) -> None:
        """
        :param head: The query this stage is chained onto.
        :param column: The column whose values are ranked.
        :param number_of_values: Number of values to return.
        :param by_expression: Optional expression for the 'by' clause.
        """
        super().__init__(head)
        self._column = column
        self._number_of_values = number_of_values
        self._by_expression = by_expression

    def by(self, by_expression: NumberType) -> 'TopHittersQuery':
        """
        Return a new TopHittersQuery with the given 'by' clause; this instance is not modified.

        :raises AssertionError: If a 'by' clause was already set.
        """
        assert self._by_expression is None, "duplicate 'by' clause"
        return TopHittersQuery(self._head, self._column, self._number_of_values, by_expression)

    def _compile(self) -> KQL:
        """Render this stage's KQL text, appending the 'by' clause only when one was set."""
        clause = f"top-hitters {to_kql(self._number_of_values)} of {self._column.kql}"
        if self._by_expression is not None:
            clause += f" by {to_kql(self._by_expression)}"
        return KQL(clause)
class ExtendQuery(Query):
_assignments: Tuple[AssignmentBase, ...]

Просмотреть файл

@ -454,6 +454,12 @@ class TestFunction(TestBase):
Query().extend(foo=f.split("1_2", "_")[3]).render()
)
def test_count_of(self):
    """count_of with the default kind renders countof(..., "normal")."""
    self.assertEqual(
        ' | where (countof(stringField, "abc", "normal")) == 2',
        Query().where(f.count_of(t.stringField, "abc") == 2).render()
    )
def test_tobool(self):
self.assertEqual(
" | where tobool(stringField)",
@ -524,6 +530,12 @@ class TestFunction(TestBase):
Query().summarize(f.any(t.stringField, t.numField, t.boolField)).render()
)
def test_any_if(self):
    """any_if renders as anyif(expr, predicate) inside summarize."""
    self.assertEqual(
        " | summarize anyif(stringField, boolField)",
        Query().summarize(f.any_if(t.stringField, t.boolField)).render()
    )
def test_aggregation_assign_to(self):
self.assertEqual(
" | summarize foo = any(stringField)",
@ -711,6 +723,12 @@ class TestFunction(TestBase):
Query().summarize(f.percentiles(t.numField, 5, 50, 95)).render()
)
def test_percentiles_array(self):
    """percentiles_array renders the column followed by every percentile value."""
    self.assertEqual(
        " | summarize percentiles_array(numField, 5, 50, 95)",
        Query().summarize(f.percentiles_array(t.numField, 5, 50, 95)).render()
    )
def test_stdev(self):
self.assertEqual(
" | summarize stdev(numField)",
@ -905,3 +923,9 @@ class TestFunction(TestBase):
' | extend foo = array_split(arrayField, numField)',
Query().extend(foo=f.array_split(t.arrayField, t.numField)).render()
)
def test_ingestion_time(self):
    """ingestion_time renders as a no-argument ingestion_time() call."""
    self.assertEqual(
        ' | extend ingestionTime = ingestion_time()',
        Query().extend(ingestionTime=f.ingestion_time()).render()
    )

Просмотреть файл

@ -1,9 +1,10 @@
import pandas as pd
from pykusto.client import PyKustoClient
from pykusto.enums import Order, Nulls, JoinKind, Distribution, BagExpansion
from pykusto.expressions import column_generator as col
from pykusto.functions import Functions as f
from pykusto.query import Query, Order, Nulls, JoinKind, JoinException, BagExpansion, Distribution
from pykusto.query import Query, JoinException
from pykusto.type_utils import KustoType
from test.test_base import TestBase, mock_databases_response, MockKustoClient, mock_response
from test.test_base import test_table as t, mock_columns_response
@ -321,6 +322,24 @@ class TestQuery(TestBase):
Query(t).distinct(t.stringField, t.numField * 2).render(),
)
def test_distinct_sample(self):
    """distinct(...).sample(n) renders a single sample-distinct stage."""
    self.assertEqual(
        "test_table | sample-distinct 5 of stringField",
        Query(t).distinct(t.stringField).sample(5).render(),
    )
def test_top_hitters(self):
    """distinct(...).top_hitters(n) renders a single top-hitters stage without a 'by' clause."""
    self.assertEqual(
        "test_table | top-hitters 5 of stringField",
        Query(t).distinct(t.stringField).top_hitters(5).render(),
    )
def test_top_hitters_by(self):
    """Chaining .by(...) after top_hitters appends the 'by' clause."""
    self.assertEqual(
        "test_table | top-hitters 5 of stringField by numField",
        Query(t).distinct(t.stringField).top_hitters(5).by(t.numField).render(),
    )
def test_distinct_all(self):
self.assertEqual(
"test_table | distinct *",