* Add assignment to mvexpand
* Make assignments type aware
Co-authored-by: Yonatan Most <>
This commit is contained in:
micky-amir 2020-04-22 17:41:07 +03:00 коммит произвёл GitHub
Родитель 3e72db2e13
Коммит a82f43c20d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 73 добавлений и 82 удалений

Просмотреть файл

@ -3,7 +3,7 @@ from typing import Any, List, Tuple, Mapping, Optional, Type
from typing import Union
from pykusto.type_utils import plain_expression, aggregation_expression, PythonTypes, kql_converter, KustoType, \
typed_column, KQL
typed_column, KQL, TypeRegistrar, get_base_types
ExpressionType = Union[PythonTypes, 'BaseExpression']
StringType = Union[str, 'StringExpression']
@ -586,8 +586,7 @@ class BaseColumn(BaseExpression):
_name: str
# We would prefer to use 'abc' to make the class abstract, but this can be done only if there is at least one
# abstract method, which we don't have here. We can't define "get_kusto_type" as abstract because at least one
# concrete subclass (NumberColumn) does not override it. Overriding __new___ is the next best solution.
# abstract method, which we don't have here. Overriding __new___ is the next best solution.
def __new__(cls, *args, **kwargs) -> 'BaseColumn':
assert cls is not BaseColumn, "BaseColumn is abstract"
return object.__new__(cls)
@ -605,9 +604,6 @@ class BaseColumn(BaseExpression):
def assign_to_single_column(self, column: 'AnyTypeColumn') -> 'AssignmentFromColumnToColumn':
return AssignmentFromColumnToColumn(column, self)
def get_kusto_type(self) -> KustoType:
raise NotImplementedError("BaseColumn has no type")
def __repr__(self) -> str:
return f'{self.__class__.__name__}({self._name})'
@ -622,48 +618,40 @@ class NumberColumn(BaseColumn, NumberExpression):
@typed_column(KustoType.BOOL)
class BooleanColumn(BaseColumn, BooleanExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.BOOL
pass
@typed_column(KustoType.ARRAY)
class ArrayColumn(BaseColumn, ArrayExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.ARRAY
pass
@typed_column(KustoType.MAPPING)
class MappingColumn(BaseColumn, MappingExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.MAPPING
pass
class DynamicColumn(ArrayColumn, MappingColumn):
def get_kusto_type(self) -> KustoType:
raise ValueError("Column type unknown")
pass
@typed_column(KustoType.STRING)
class StringColumn(BaseColumn, StringExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.STRING
pass
@typed_column(KustoType.DATETIME)
class DatetimeColumn(BaseColumn, DatetimeExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.DATETIME
pass
@typed_column(KustoType.TIMESPAN)
class TimespanColumn(BaseColumn, TimespanExpression):
def get_kusto_type(self) -> KustoType:
return KustoType.TIMESPAN
pass
class AnyTypeColumn(NumberColumn, BooleanColumn, DynamicColumn, StringColumn, DatetimeColumn, TimespanColumn):
def get_kusto_type(self) -> KustoType:
raise ValueError("Column type unknown")
pass
class ColumnGenerator:
@ -697,3 +685,8 @@ def to_kql(obj: ExpressionType) -> KQL:
if isinstance(obj, BaseExpression):
return obj.kql
return kql_converter.for_obj(obj)
def expression_to_type(expression: ExpressionType, type_registrar: TypeRegistrar, fallback_type: Any) -> Any:
types = set(type_registrar.registry[base_type] for base_type in get_base_types(expression))
return next(iter(types)) if len(types) == 1 else fallback_type

Просмотреть файл

@ -9,9 +9,9 @@ from pykusto.client import Table, KustoResponse
from pykusto.expressions import BooleanType, ExpressionType, AggregationExpression, OrderedType, \
StringType, AssignmentBase, AssignmentFromAggregationToColumn, AssignmentToSingleColumn, AnyTypeColumn, \
BaseExpression, \
AssignmentFromColumnToColumn, AnyExpression, to_kql, ColumnToType
AssignmentFromColumnToColumn, AnyExpression, to_kql, expression_to_type, BaseColumn
from pykusto.logger import logger
from pykusto.type_utils import KustoType, KQL
from pykusto.type_utils import KustoType, KQL, typed_column, plain_expression
from pykusto.udf import stringify_python_func
@ -104,22 +104,8 @@ class Query:
def join(self, query: 'Query', kind: JoinKind = None) -> 'JoinQuery':
return JoinQuery(self, query, kind)
def project(
self, *args: Union[AnyTypeColumn, AssignmentBase, BaseExpression], **kwargs: ExpressionType
) -> 'ProjectQuery':
columns: List[AnyTypeColumn] = []
assignments: List[AssignmentBase] = []
for arg in args:
if isinstance(arg, AnyTypeColumn):
columns.append(arg)
elif isinstance(arg, AssignmentBase):
assignments.append(arg)
else:
assert isinstance(arg, BaseExpression), "Invalid assignment"
assignments.append(arg.assign_to())
for column_name, expression in kwargs.items():
assignments.append(AssignmentToSingleColumn(AnyTypeColumn(column_name), expression))
return ProjectQuery(self, columns, assignments)
def project(self, *args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> 'ProjectQuery':
return ProjectQuery(self, self.extract_assignments(*args, **kwargs))
def project_rename(self, *args: AssignmentFromColumnToColumn, **kwargs: AnyTypeColumn) -> 'ProjectRenameQuery':
assignments: List[AssignmentFromColumnToColumn] = list(args)
@ -137,18 +123,7 @@ class Query:
return DistinctQuery(self, (AnyExpression(KQL("*")),))
def extend(self, *args: Union[BaseExpression, AssignmentBase], **kwargs: ExpressionType) -> 'ExtendQuery':
assignments: List[AssignmentBase] = []
for arg in args:
if isinstance(arg, BaseExpression):
assignments.append(arg.assign_to())
else:
assignments.append(arg)
for column_name, expression in kwargs.items():
if isinstance(expression, BaseExpression):
assignments.append(expression.assign_to(AnyTypeColumn(column_name)))
else:
assignments.append(AnyExpression(to_kql(expression)).assign_to(AnyTypeColumn(column_name)))
return ExtendQuery(self, *assignments)
return ExtendQuery(self, *self.extract_assignments(*args, **kwargs))
def summarize(self, *args: Union[AggregationExpression, AssignmentFromAggregationToColumn],
**kwargs: AggregationExpression) -> 'SummarizeQuery':
@ -164,12 +139,13 @@ class Query:
return SummarizeQuery(self, assignments)
def mv_expand(
self, *columns: Union[AnyTypeColumn, ColumnToType], bag_expansion: BagExpansion = None,
with_item_index: AnyTypeColumn = None, limit: int = None
self, *args: Union[BaseExpression, AssignmentBase], bag_expansion: BagExpansion = None,
with_item_index: BaseColumn = None, limit: int = None, **kwargs: ExpressionType
) -> 'MvExpandQuery':
if len(columns) == 0:
assignments = self.extract_assignments(*args, **kwargs)
if len(assignments) == 0:
raise ValueError("Please specify one or more columns for mv-expand")
return MvExpandQuery(self, columns, bag_expansion, with_item_index, limit)
return MvExpandQuery(self, bag_expansion, with_item_index, limit, *assignments)
def custom(self, custom_query: str) -> 'CustomQuery':
return CustomQuery(self, custom_query)
@ -241,21 +217,34 @@ class Query:
def to_dataframe(self, table: Table = None):
return self.execute(table).to_dataframe()
@staticmethod
def extract_assignments(*args: Union[AssignmentBase, BaseExpression], **kwargs: ExpressionType) -> List[AssignmentBase]:
assignments: List[AssignmentBase] = []
for arg in args:
if isinstance(arg, BaseExpression):
assignments.append(arg.assign_to())
else:
assignments.append(arg)
for column_name, expression in kwargs.items():
column_type = expression_to_type(expression, typed_column, AnyTypeColumn)
if isinstance(expression, BaseExpression):
assignments.append(expression.assign_to(column_type(column_name)))
else:
expression_type = expression_to_type(expression, plain_expression, AnyExpression)
assignments.append(expression_type(to_kql(expression)).assign_to(column_type(column_name)))
return assignments
class ProjectQuery(Query):
_columns: List[AnyTypeColumn]
_assignments: List[AssignmentBase]
def __init__(self, head: 'Query', columns: List[AnyTypeColumn], assignments: List[AssignmentBase]) -> None:
def __init__(self, head: 'Query', assignments: List[AssignmentBase]) -> None:
super().__init__(head)
self._columns = columns
self._assignments = assignments
def _compile(self) -> KQL:
return KQL('project {}'.format(', '.join(chain(
(c.kql for c in self._columns),
(a.to_kql() for a in self._assignments)
))))
return KQL('project {}'.format(', '.join(a.to_kql() for a in self._assignments)))
class ProjectRenameQuery(Query):
@ -495,17 +484,14 @@ class SummarizeQuery(Query):
class MvExpandQuery(Query):
_columns: Tuple[Union[AnyTypeColumn, ColumnToType]]
_assignments: Tuple[AssignmentBase]
_bag_expansion: BagExpansion
_with_item_index: AnyTypeColumn
_with_item_index: BaseColumn
_limit: int
def __init__(
self, head: Query, columns: Tuple[Union[AnyTypeColumn, ColumnToType]], bag_expansion: BagExpansion,
with_item_index: AnyTypeColumn, limit: int
):
def __init__(self, head: Query, bag_expansion: BagExpansion, with_item_index: BaseColumn, limit: int, *assignments: AssignmentBase):
super(MvExpandQuery, self).__init__(head)
self._columns = columns
self._assignments = assignments
self._bag_expansion = bag_expansion
self._with_item_index = with_item_index
self._limit = limit
@ -516,7 +502,7 @@ class MvExpandQuery(Query):
res += "bagexpansion={} ".format(self._bag_expansion.value)
if self._with_item_index is not None:
res += "with_itemindex={} ".format(self._with_item_index.kql)
res += ", ".join([c.kql for c in self._columns])
res += ", ".join(a.to_kql() for a in self._assignments)
if self._limit:
res += " limit {}".format(self._limit)
return KQL(res)

Просмотреть файл

@ -2,7 +2,6 @@ from datetime import timedelta, datetime
from pykusto.expressions import column_generator as col, AnyTypeColumn
from pykusto.query import Query
from pykusto.type_utils import KustoType
from test.test_base import TestBase, test_table as t
@ -326,14 +325,3 @@ class TestExpressions(TestBase):
self.assertIsInstance(field2, AnyTypeColumn)
self.assertEqual('foo', field1.get_name())
self.assertEqual('foo.bar', field2.get_name())
def test_kusto_type(self):
self.assertEqual(KustoType.BOOL, t.boolField.get_kusto_type())
self.assertEqual(KustoType.ARRAY, t.arrayField.get_kusto_type())
self.assertEqual(KustoType.MAPPING, t.mapField.get_kusto_type())
self.assertEqual(KustoType.STRING, t.stringField.get_kusto_type())
self.assertEqual(KustoType.DATETIME, t.dateField.get_kusto_type())
self.assertEqual(KustoType.TIMESPAN, t.timespanField.get_kusto_type())
self.assertRaises(ValueError("Column type unknown"), t.dynamicField.get_kusto_type)
self.assertRaises(ValueError("Column type unknown"), col.anyTypeField.get_kusto_type)
self.assertRaises(NotImplementedError("BaseColumn has no type"), t.numField.get_kusto_type)

Просмотреть файл

@ -207,6 +207,30 @@ class TestQuery(TestBase):
Query(t).mv_expand(t.arrayField, t.arrayField2, t.arrayField3).render(),
)
def test_mv_expand_assign(self):
self.assertEqual(
"test_table | mv-expand expanded_field = arrayField",
Query(t).mv_expand(expanded_field=t.arrayField).render(),
)
def test_mv_expand_assign_to(self):
self.assertEqual(
"test_table | mv-expand expanded_field = arrayField",
Query(t).mv_expand(t.arrayField.assign_to(col.expanded_field)).render(),
)
def test_mv_expand_assign_to_with_assign_other_params(self):
self.assertEqual(
"test_table | mv-expand bagexpansion=bag with_itemindex=foo expanded_field = arrayField, expanded_field2 = arrayField2 limit 4",
Query(t).mv_expand(t.arrayField.assign_to(col.expanded_field), expanded_field2=t.arrayField2, bag_expansion=BagExpansion.BAG, with_item_index=col.foo, limit=4).render(),
)
def test_mv_expand_assign_multiple(self):
self.assertEqual(
"test_table | mv-expand expanded_field = arrayField, expanded_field2 = arrayField2",
Query(t).mv_expand(expanded_field=t.arrayField, expanded_field2=t.arrayField2).render(),
)
def test_mv_expand_to_type(self):
self.assertEqual(
"test_table | mv-expand arrayField to typeof(string), arrayField2 to typeof(int), arrayField3",