lib updates
This commit is contained in:
Parent a412e74aed
Commit 9e44043464
@@ -24,6 +24,8 @@ class DateOp(Literal):
         return object.__new__(cls)

     def __init__(self, term):
+        if is_data(term):
+            term = term['date']  # FOR WHEN WE MIGHT DO Literal({"date":term})
         self.date = term
         Literal.__init__(self, float(Date(self.date)))

@@ -16,7 +16,7 @@ from jx_base.expressions.false_op import FALSE
 from mo_imports import expect
 from mo_json import BOOLEAN

-NotOp, = expect("NotOp")
+NotOp = expect("NotOp")


 class ExistsOp(Expression):
@@ -14,7 +14,7 @@ from jx_base.expressions.literal import Literal
 from mo_imports import export, expect
 from mo_json import BOOLEAN

-TRUE, = expect("TRUE")
+TRUE = expect("TRUE")


 class FalseOp(Literal):
@@ -10,17 +10,18 @@

 from __future__ import absolute_import, division, unicode_literals

 from collections import OrderedDict
+from jx_base.expressions.null_op import NULL

+from jx_base.expressions.false_op import FALSE

 from jx_base.expressions._utils import simplified

 from jx_base.expressions.expression import Expression
 from jx_base.expressions.or_op import OrOp
 from jx_base.language import is_op
 from mo_dots import startswith_field
 from mo_future import sort_using_key
 from mo_json import BOOLEAN
 from mo_logs import Log
+from mo_math import UNION


 class OuterJoinOp(Expression):
@@ -35,18 +36,16 @@ class OuterJoinOp(Expression):
         self.nests = nests
         last = "."
         for n in reversed(nests):
-            path = n.path
+            path = n.path.var
             if not startswith_field(path, last):
                 Log.error("Expecting nests to be reverse nested order")
             last = path

     def __data__(self):
-        return {
-            "outerjoin": {
-                "from": self.frum.__data__(),
-                "nests": [n.__data__() for n in self.nests],
-            }
-        }
+        return {"outerjoin": {
+            "from": self.frum.__data__(),
+            "nests": [n.__data__() for n in self.nests],
+        }}

     def __eq__(self, other):
         return (

@@ -56,12 +55,9 @@ class OuterJoinOp(Expression):
         )

     def vars(self):
-        return (
-            self.frum.vars()
-            | self.nests.vars()
-            | self.where.vars()
-            | self.sort.vars()
-            | self.limit.vars()
+        return UNION(
+            [self.frum.vars(), self.where.vars(), self.sort.vars(), self.limit.vars()]
+            + [n.vars() for n in self.nests.vars()]
         )

     def map(self, mapping):

@@ -79,4 +75,14 @@ class OuterJoinOp(Expression):

     @simplified
     def partial_eval(self):
-        return OuterJoinOp(frum=self.frum.partial_eval(), nests=self.nests.partial_eval())
+        nests = []
+        for n in self.nests:
+            n = n.partial_eval()
+            if n.where is FALSE:
+                return NULL
+            nests.append(n)
+
+        return OuterJoinOp(
+            frum=self.frum.partial_eval(), nests=nests
+        )
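The new partial_eval above short-circuits: if any nest's where clause simplifies to FALSE, no row can satisfy the join, so the whole expression collapses to NULL. A minimal standalone sketch of that short-circuit pattern (illustrative names only, not the library's API):

    def simplify_outer_join(frum, nests):
        # any provably-false nested clause empties the whole join
        simplified = []
        for n in nests:
            if n.get("where") is False:  # stand-in for the FALSE literal
                return None              # stand-in for NULL: no results possible
            simplified.append(n)
        return {"from": frum, "nests": simplified}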
@@ -33,7 +33,7 @@ from mo_json.typed_encoder import untype_path
 from mo_logs import Log
 from mo_math import AND, UNION, is_number

-Column, = expect("Column")
+Column = expect("Column")


 BAD_SELECT = "Expecting `value` or `aggregate` in select clause not {{select}}"
@@ -17,7 +17,7 @@ from math import isnan

 from mo_dots import Data, data_types, listwrap, NullType, startswith_field
 from mo_dots.lists import list_types, is_many
-from mo_future import boolean_type, long, none_type, text, transpose
+from mo_future import boolean_type, long, none_type, text, transpose, function_type
 from mo_logs import Log
 from mo_times import Date

@@ -158,6 +158,9 @@ def value_compare(left, right, ordering=1):
     :return: The return value is negative if x < y, zero if x == y and strictly positive if x > y.
     """
+    if left is right:
+        return 0
+
     try:
         ltype = left.__class__
         rtype = right.__class__

@@ -210,6 +213,8 @@ def value_compare(left, right, ordering=1):
             if c != 0:
                 return c
         return 0
+    elif ltype is function_type:
+        return 0
     elif left > right:
         return ordering
     elif left < right:
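value_compare is a standard three-way comparator, and the two new branches above (1) short-circuit when both arguments are the same object and (2) treat any two functions as equal rather than attempting an ordering. An illustrative sketch of the contract, assuming value_compare is importable from this module:

    # value_compare(1, 2)      -> negative
    # value_compare(2, 2)      -> 0
    # value_compare(3, 2)      -> positive
    # value_compare(f, g)      -> 0 for any two functions (new function_type branch)
    # value_compare(x, x)      -> 0 immediately (new identity check)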
@@ -29,7 +29,7 @@ from mo_dots import (
     to_data,
 )
 from mo_future import binary_type, items, long, none_type, reduce, text
-from mo_json import INTEGER, NUMBER, STRING, python_type_to_json_type
+from mo_json import INTEGER, NUMBER, STRING, python_type_to_json_type, OBJECT
 from mo_times.dates import Date

 DEBUG = False

@@ -73,52 +73,62 @@ def _get_schema_from_list(
             full_name = parent
             column = columns[full_name]
             if not column:
+                es_type = d.__class__
+
                 column = Column(
                     name=concat_field(table_name, full_name),
                     es_column=full_name,
                     es_index=".",
-                    es_type=d.__class__.__name__,
-                    jx_type=None,  # WILL BE SET BELOW
+                    es_type=es_type,
+                    jx_type=native_type_to_json_type[es_type],
                     last_updated=Date.now(),
                     nested_path=nested_path,
                     multi=1,
                 )
                 columns.add(column)
-            column.es_type = _merge_python_type(column.es_type, d.__class__)
-            column.jx_type = native_type_to_json_type[column.es_type]
+            else:
+                column.es_type = _merge_python_type(column.es_type, d.__class__)
+                column.jx_type = native_type_to_json_type[column.es_type]
         else:
             for name, value in d.items():
                 full_name = concat_field(parent, name)
                 column = columns[full_name]
+
+                if is_container(value):  # GET TYPE OF MULTIVALUE
+                    v = list(value)
+                    if len(v) == 0:
+                        es_type = none_type.__name__
+                    elif len(v) == 1:
+                        es_type = v[0].__class__.__name__
+                    else:
+                        es_type = reduce(
+                            _merge_python_type, (vi.__class__.__name__ for vi in value)
+                        )
+                else:
+                    es_type = value.__class__.__name__
+
                 if not column:
+                    jx_type = native_type_to_json_type[es_type]
                     column = Column(
                         name=concat_field(table_name, full_name),
                         es_column=full_name,
                         es_index=".",
-                        es_type=value.__class__.__name__,
-                        jx_type=None,  # WILL BE SET BELOW
+                        es_type=es_type,
+                        jx_type=jx_type,
                         last_updated=Date.now(),
                         nested_path=nested_path,
+                        cardinality=1 if jx_type == OBJECT else None,
                         multi=1
                     )
                     columns.add(column)
-                if is_container(value):  # GET TYPE OF MULTIVALUE
-                    v = list(value)
-                    if len(v) == 0:
-                        this_type_name = none_type.__name__
-                    elif len(v) == 1:
-                        this_type_name = v[0].__class__.__name__
-                    else:
-                        this_type_name = reduce(
-                            _merge_python_type, (vi.__class__.__name__ for vi in value)
-                        )
-                else:
-                    this_type_name = value.__class__.__name__
-                column.es_type = _merge_python_type(column.es_type, this_type_name)
-                try:
-                    column.jx_type = native_type_to_json_type[column.es_type]
-                except Exception as e:
-                    raise e
+                column.es_type = _merge_python_type(column.es_type, es_type)
+                try:
+                    column.jx_type = native_type_to_json_type[column.es_type]
+                except Exception as e:
+                    raise e

-                if this_type_name in {"object", "dict", "Mapping", "Data"}:
+                if es_type in {"object", "dict", "Mapping", "Data"}:
                     _get_schema_from_list(
                         [value],
                         table_name,

@@ -127,7 +137,7 @@ def _get_schema_from_list(
                         columns,
                         native_type_to_json_type,
                     )
-                elif this_type_name in {"list", "FlatList"}:
+                elif es_type in {"list", "FlatList"}:
                     np = listwrap(nested_path)
                     newpath = unwraplist([join_field(split_field(np[0]) + [name])] + np)
                     _get_schema_from_list(
@@ -21,4 +21,6 @@ class Table(object):
     def __data__(self):
         return self.name

+    def partial_eval(self):
+        return self

@@ -43,14 +43,13 @@ def is_aggsop(es, query):
     return False


-def get_decoders_by_path(query):
+def get_decoders_by_path(query, schema):
     """
     RETURN MAP FROM QUERY PATH TO LIST OF DECODER ARRAYS

     :param query:
     :return:
     """
-    schema = query.frum.schema
     output = {}

     if query.edges:

@@ -152,9 +151,8 @@ def extract_aggs(select, query_path, schema):
 def aggop_to_es_queries(select, query_path, schema, query):
     base_agg = extract_aggs(select, query_path, schema)
     base_agg = NestedAggs(query_path).add(base_agg)
-    split_decoders = get_decoders_by_path(query)

-    new_select, all_paths, split_select, var_to_columns = pre_process(query)
+    new_select, all_paths, split_select, split_decoders, var_to_columns = pre_process(query)

     # WE LET EACH DIMENSION ADD ITS OWN CODE FOR HANDLING INNER JOINS
     union_outer = query_to_outer_joins(query, all_paths, split_select, var_to_columns)

@@ -162,7 +160,7 @@ def aggop_to_es_queries(select, query_path, schema, query):
     start = 0
     decoders = [None] * (len(query.edges) + len(query.groupby))
     output = NestedAggs(".")
-    for i, inner in enumerate(union_outer.terms):
+    for i, outer in enumerate(union_outer.terms):
         acc = base_agg
         for p, path in enumerate(all_paths):
             decoder = split_decoders.get(path, Null)

@@ -172,7 +170,7 @@ def aggop_to_es_queries(select, query_path, schema, query):
                 acc = d.append_query(path, acc)
                 start += d.num_columns

-            where = first(nest.where for nest in inner.nests if nest.path == path)
+            where = first(nest.where for nest in outer.nests if nest.path == path).partial_eval()
             if where is FALSE:
                 continue
             elif not where or where is TRUE:
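Per its docstring, get_decoders_by_path returns a map from query path to the decoders operating at that nesting depth; the schema is now passed in by the caller rather than pulled from query.frum. A sketch of the expected shape (hypothetical paths and decoder names, for illustration only):

    # output = {
    #     ".":    [decoder_for_edge_0],
    #     "a.b":  [decoder_for_edge_1, decoder_for_edge_2],
    # }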
@@ -1,4 +1,8 @@
-from jx_elasticsearch.es52.expressions.utils import ES52, split_expression_by_path, split_expression_by_depth
+from jx_elasticsearch.es52.expressions.utils import (
+    ES52,
+    split_expression_by_path,
+    split_expression_by_depth,
+)
 from jx_elasticsearch.es52.expressions.and_op import AndOp, es_and
 from jx_elasticsearch.es52.expressions.basic_eq_op import BasicEqOp
 from jx_elasticsearch.es52.expressions.basic_starts_with_op import BasicStartsWithOp
@@ -9,12 +9,13 @@
 #
 from __future__ import absolute_import, division, unicode_literals

-from jx_base.expressions import AndOp as AndOp_
-from jx_elasticsearch.es52.expressions.utils import ES52
-from jx_elasticsearch.es52.expressions.true_op import MATCH_ALL
-from mo_dots import dict_to_data
 from mo_imports import export
+
+from jx_base.expressions import AndOp as AndOp_
+from jx_elasticsearch.es52.expressions.true_op import MATCH_ALL
+from jx_elasticsearch.es52.expressions.utils import ES52
+from mo_dots import dict_to_data


 class AndOp(AndOp_):
     def to_es(self, schema):

@@ -28,5 +29,5 @@ def es_and(terms):
     return dict_to_data({"bool": {"filter": terms}})


-export("jx_elasticsearch.es52.expressions.utils", AndOp)
 export("jx_elasticsearch.es52.expressions.or_op", es_and)
+export("jx_elasticsearch.es52.expressions.utils", AndOp)
@@ -14,7 +14,9 @@ from jx_base.expressions import (
     Variable as Variable_,
     is_literal,
     NestedOp,
-    IDENTITY, AndOp)
+    IDENTITY,
+    AndOp,
+)
 from jx_base.language import is_op
 from jx_elasticsearch.es52.painless import Painless
 from mo_dots import is_many

@@ -27,9 +29,12 @@ class BasicEqOp(BasicEqOp_):
             return self.lang[NestedOp(
                 path=self.lhs.frum.partial_eval(),
                 select=IDENTITY,
-                where=AndOp([self.lhs.where, BasicEqOp(self.lhs.select, self.rhs)]).partial_eval(),
+                where=AndOp([
+                    self.lhs.where,
+                    BasicEqOp(self.lhs.select, self.rhs),
+                ]).partial_eval(),
                 sort=self.lhs.sort.partial_eval(),
-                limit=self.limit.partial_eval()
+                limit=self.limit.partial_eval(),
             )]
         return self.lang[BasicEqOp([self.lhs.partial_eval(), self.rhs.partial_eval()])]
@@ -19,7 +19,9 @@ from jx_elasticsearch.es52.expressions.false_op import MATCH_NONE
 from jx_elasticsearch.es52.expressions.true_op import MATCH_ALL
 from jx_elasticsearch.es52.painless import false_script
 from mo_future import first
-from jx_elasticsearch.es52.painless import BasicStartsWithOp as PainlessBasicStartsWithOp
+from jx_elasticsearch.es52.painless import (
+    BasicStartsWithOp as PainlessBasicStartsWithOp,
+)


 class BasicStartsWithOp(BasicStartsWithOp_):
@@ -16,7 +16,7 @@ from jx_elasticsearch.es52.expressions.exists_op import es_exists
 from jx_elasticsearch.es52.painless import Painless
 from mo_imports import expect

-FindOp, = expect("FindOp")
+FindOp = expect("FindOp")


 class BooleanOp(BooleanOp_):
@@ -35,7 +35,7 @@ from mo_json import BOOLEAN, python_type_to_json_type, NUMBER_TYPES, same_json_t
 from mo_logs import Log
 from pyLibrary.convert import string2boolean

-NestedOp, = expect("NestedOp")
+NestedOp = expect("NestedOp")


 class EqOp(EqOp_):

@@ -59,8 +59,7 @@ class EqOp(EqOp_):
                 return EqOp([lhs, rhs])
             if is_op(lhs, NestedOp):
                 return self.lang[NestedOp(
-                    path=lhs.frum,
-                    where=AndOp([lhs.where, EqOp([lhs.select, rhs])])
+                    path=lhs.frum, where=AndOp([lhs.where, EqOp([lhs.select, rhs])])
                 )]

         return EqOp([lhs, rhs])

@@ -92,12 +91,9 @@ class EqOp(EqOp_):
                     return FALSE.to_es(schema)
                 else:
                     return (
-                        OrOp(
-                            [
-                                EqOp([self.lhs, values])
-                                for t, values in types.items()
-                            ]
-                        )
+                        OrOp([
+                            EqOp([self.lhs, values]) for t, values in types.items()
+                        ])
                         .partial_eval()
                         .to_es(schema)
                     )

@@ -106,20 +102,16 @@ class EqOp(EqOp_):
             if c.jx_type == BOOLEAN:
                 rhs = pull_functions[c.jx_type](rhs)
             rhs_type = python_type_to_json_type[rhs.__class__]
-            if rhs_type == c.jx_type or (rhs_type in NUMBER_TYPES and c.jx_type in NUMBER_TYPES):
+            if rhs_type == c.jx_type or (
+                rhs_type in NUMBER_TYPES and c.jx_type in NUMBER_TYPES
+            ):
                 return {"term": {c.es_column: rhs}}
             return FALSE.to_es(schema)
         else:
-            return (
-                ES52[
-                    CaseOp(
-                        [
-                            WhenOp(self.lhs.missing(), **{"then": self.rhs.missing()}),
-                            WhenOp(self.rhs.missing(), **{"then": FALSE}),
-                            BasicEqOp([self.lhs, self.rhs]),
-                        ]
-                    )
-                    .partial_eval()
-                ]
-                .to_es(schema)
-            )
+            return ES52[
+                CaseOp([
+                    WhenOp(self.lhs.missing(), **{"then": self.rhs.missing()}),
+                    WhenOp(self.rhs.missing(), **{"then": FALSE}),
+                    BasicEqOp([self.lhs, self.rhs]),
+                ]).partial_eval()
+            ].to_es(schema)
@@ -17,7 +17,8 @@ from jx_base.expressions import (
     Variable as Variable_,
     is_literal,
     simplified,
-    BooleanOp)
+    BooleanOp,
+)
 from jx_base.language import is_op
 from jx_elasticsearch.es52.expressions.utils import ES52
 from jx_elasticsearch.es52.expressions.not_op import NotOp

@@ -37,11 +38,9 @@ class FindOp(FindOp_):
     ):
         columns = [c for c in schema.leaves(self.value.var) if c.jx_type == STRING]
         if len(columns) == 1:
-            return {
-                "regexp": {
-                    columns[0].es_column: ".*" + re.escape(self.find.value) + ".*"
-                }
-            }
+            return {"regexp": {
+                columns[0].es_column: ".*" + re.escape(self.find.value) + ".*"
+            }}
         # CONVERT TO SCRIPT, SIMPLIFY, AND THEN BACK TO FILTER
         self.simplified = False
         return ES52[Painless[self].partial_eval()].to_es(schema)
@@ -41,17 +41,17 @@ class InOp(InOp_):
             if is_literal(self.superset) and not is_many(self.superset.value):
                 return {"term": {var: value2boolean(self.superset.value)}}
             else:
-                return {"terms": {var: list(map(value2boolean, self.superset.value))}}
+                return {"terms": {var: list(map(
+                    value2boolean, self.superset.value
+                ))}}
         else:
             if is_literal(self.superset) and not is_many(self.superset.value):
                 return {"term": {var: self.superset.value}}
             else:
                 return {"terms": {var: self.superset.value}}
     elif is_op(self.superset, TupleOp):
-        return (
-            OrOp([EqOp([self.value, s]) for s in self.superset.terms])
-            .partial_eval()
-            .to_es(schema)
-        )
+        return OrOp([
+            EqOp([self.value, s]) for s in self.superset.terms
+        ]).partial_eval().to_es(schema)
     # THE HARD WAY
     return Painless[self].to_es_script(schema).to_es(schema)
@@ -15,7 +15,6 @@ from jx_elasticsearch.es52.expressions.utils import ES52


 class InnerJoinOp(InnerJoinOp_):
-
     def to_es(self, schema):
         acc = None
         for nest in self.nests:

@@ -23,6 +22,6 @@ class InnerJoinOp(InnerJoinOp_):
             if not acc:
                 acc = es
             else:
-                es['query'] = es_and([es.query, acc])
+                es["query"] = es_and([es.query, acc])
                 acc = es
         return acc
@@ -30,6 +30,6 @@ class MissingOp(MissingOp_):
     else:
         return PainlessMissingOp.to_es_script(self, schema).to_es(schema)


 def es_missing(term):
     return {"bool": {"must_not": {"exists": {"field": term}}}}

@@ -34,38 +34,28 @@ class NeOp(NeOp_):

         if lhs.many:
             if rhs.many:
-                return es_not(
-                    ScriptOp(
-                        (
-                            "("
-                            + lhs.expr
-                            + ").size()==("
-                            + rhs.expr
-                            + ").size() && "
-                            + "("
-                            + rhs.expr
-                            + ").containsAll("
-                            + lhs.expr
-                            + ")"
-                        )
-                    ).to_es(schema)
-                )
+                return es_not(ScriptOp((
+                    "("
+                    + lhs.expr
+                    + ").size()==("
+                    + rhs.expr
+                    + ").size() && "
+                    + "("
+                    + rhs.expr
+                    + ").containsAll("
+                    + lhs.expr
+                    + ")"
+                )).to_es(schema))
             else:
-                return es_not(
-                    ScriptOp(
-                        "(" + lhs.expr + ").contains(" + rhs.expr + ")"
-                    ).to_es(schema)
-                )
+                return es_not(ScriptOp(
+                    "(" + lhs.expr + ").contains(" + rhs.expr + ")"
+                ).to_es(schema))
         else:
             if rhs.many:
-                return es_not(
-                    ScriptOp(
-                        "(" + rhs.expr + ").contains(" + lhs.expr + ")"
-                    ).to_es(schema)
-                )
+                return es_not(ScriptOp(
+                    "(" + rhs.expr + ").contains(" + lhs.expr + ")"
+                ).to_es(schema))
            else:
-                return es_not(
-                    ScriptOp(
-                        "(" + lhs.expr + ") != (" + rhs.expr + ")"
-                    ).to_es(schema)
-                )
+                return es_not(ScriptOp(
+                    "(" + lhs.expr + ") != (" + rhs.expr + ")"
+                ).to_es(schema))
@@ -17,22 +17,21 @@ from mo_imports import export
 class NestedOp(_NestedOp):
     def to_es(self, schema):
         if self.path.var == ".":
-            return ES52[self.select].to_es() | {"query": ES52[self.where].to_es(schema), "from": 0}
+            return ES52[self.select].to_es() | {
+                "query": ES52[self.where].to_es(schema),
+                "from": 0,
+            }
         elif self.select is not NULL and bool(self.select):
-            return {
-                "nested": {
-                    "path": self.path.var,
-                    "query": ES52[self.where].to_es(schema),
-                    "inner_hits": (ES52[self.select].to_es() | {"size": 100000})
-                }
-            }
+            return {"nested": {
+                "path": self.path.var,
+                "query": ES52[self.where].to_es(schema),
+                "inner_hits": (ES52[self.select].to_es() | {"size": 100000}),
+            }}
         else:
-            return {
-                "nested": {
-                    "path": self.path.var,
-                    "query": ES52[self.where].to_es(schema)
-                }
-            }
+            return {"nested": {
+                "path": self.path.var,
+                "query": ES52[self.where].to_es(schema),
+            }}


 export("jx_elasticsearch.es52.expressions.utils", NestedOp)
@@ -15,14 +15,15 @@ from jx_base.expressions import (
     Variable as Variable_,
 )
 from jx_base.language import is_op
-from jx_elasticsearch.es52.expressions.utils import ES52
 from jx_elasticsearch.es52.expressions.false_op import MATCH_NONE
-from jx_elasticsearch.es52.expressions.or_op import es_or
+from jx_elasticsearch.es52.expressions.utils import ES52
 from mo_dots import dict_to_data
 from mo_future import first
-from mo_imports import export
+from mo_imports import expect
 from mo_json import STRUCT

+es_or = expect("es_or")
+

 class NotOp(NotOp_):
     def to_es(self, schema):

@@ -43,7 +44,3 @@ class NotOp(NotOp_):

 def es_not(term):
     return dict_to_data({"bool": {"must_not": term}})
-
-
-export("jx_elasticsearch.es52.expressions.or_op", es_not)
-export("jx_elasticsearch.es52.expressions.or_op", NotOp)
@@ -10,11 +10,12 @@
 from __future__ import absolute_import, division, unicode_literals

 from jx_base.expressions import OrOp as OrOp_
+from jx_elasticsearch.es52.expressions.not_op import es_not, NotOp
 from jx_elasticsearch.es52.expressions.utils import ES52
 from mo_dots import dict_to_data
 from mo_imports import expect, export

-NotOp, es_not, es_and = expect("NotOp", "es_not", "es_and")
+es_and = expect("es_and")


 class OrOp(OrOp_):

@@ -27,21 +28,18 @@ class OrOp(OrOp_):
             # {"bool":{"must_not":[a, b, c]}} ALSO RUNS IN PARALLEL

             # OR(x) == NOT(AND(NOT(xi) for xi in x))
-            output = es_not(
-                es_and(
-                    [NotOp(t).partial_eval().to_es(schema) for t in self.terms]
-                )
-            )
+            output = es_not(es_and([
+                NotOp(t).partial_eval().to_es(schema) for t in self.terms
+            ]))
             return output
         else:
             # VERSION 6.2+
-            return es_or(
-                [ES52[t].partial_eval().to_es(schema) for t in self.terms]
-            )
+            return es_or([ES52[t].partial_eval().to_es(schema) for t in self.terms])


 def es_or(terms):
     return dict_to_data({"bool": {"should": terms}})


 export("jx_elasticsearch.es52.expressions.not_op", es_or)
+export("jx_elasticsearch.es52.expressions.utils", OrOp)
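The first branch above (for Elasticsearch versions predating a parallel "should" filter) rewrites OR via De Morgan's law: OR(x) == NOT(AND(NOT(xi) for xi in x)). A self-contained check of the identity with plain booleans:

    def or_via_demorgan(terms):
        # OR(x) == NOT(AND(NOT(t) for t in x)) -- De Morgan's law
        return not all(not t for t in terms)

    assert or_via_demorgan([False, True]) == any([False, True])
    assert or_via_demorgan([False, False]) == any([False, False])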
@@ -65,7 +65,8 @@ class PrefixOp(PrefixOp_):
             acc.append({"prefix": {col.es_column: self.prefix.value}})
         else:
             Log.error(
-                'do not know how to {"prefix":{{column|quote}}} of type {{type}}',
+                'do not know how to {"prefix":{{column|quote}}} of type'
+                " {{type}}",
                 column=col.name,
                 type=col.jx_type,
             )
@@ -16,6 +16,7 @@ from mo_future import first
 from pyLibrary.convert import string2regexp
+from jx_elasticsearch.es52.painless import SuffixOp as PainlessSuffixOp


 class SuffixOp(SuffixOp_):
     def to_es(self, schema):
         if not self.suffix:
@@ -29,15 +29,17 @@ from jx_elasticsearch.es52.painless import Painless
 from jx_elasticsearch.es52.painless.es_script import es_script
 from mo_dots import Null, to_data, join_field, split_field, coalesce, startswith_field
 from mo_future import first
-from mo_imports import expect
+from mo_imports import expect, delay_import
 from mo_json import EXISTS
 from mo_json.typed_encoder import EXISTS_TYPE, NESTED_TYPE
 from mo_logs import Log
 from mo_math import MAX

-MATCH_NONE, MATCH_ALL, AndOp, OrOp, NestedOp, = expect(
+MATCH_NONE, MATCH_ALL, AndOp, OrOp, NestedOp = expect(
     "MATCH_NONE", "MATCH_ALL", "AndOp", "OrOp", "NestedOp",
 )
+get_decoders_by_path = delay_import("jx_elasticsearch.es52.agg_op.get_decoders_by_path")


 def _inequality_to_esfilter(self, schema):

@@ -135,17 +137,11 @@ def split_nested_inner_variables(where, focal_path, var_to_columns):
                 if startswith_field(focal_path, deepest):
                     more_exprs.append(e.map({v: Variable(c.es_column)}))
                 else:
-                    more_exprs.append(
-                        e.map(
-                            {
-                                v: NestedOp(
-                                    path=Variable(deepest),
-                                    select=Variable(c.es_column),
-                                    where=Variable(c.es_column).exists(),
-                                )
-                            }
-                        )
-                    )
+                    more_exprs.append(e.map({v: NestedOp(
+                        path=Variable(deepest),
+                        select=Variable(c.es_column),
+                        where=Variable(c.es_column).exists(),
+                    )}))
     wheres = more_exprs
     var_to_columns = {
         c.es_column: [c] for cs in var_to_columns.values() for c in cs

@@ -206,9 +202,9 @@ def setop_to_inner_joins(query, all_paths, split_select, var_to_columns):
                 deeper_conditions = FALSE
             else:
                 # ENSURE THIS IS NOT "OPTIMIZED" TO FALSE
-                deeper_conditions = NotOp(
-                    NestedOp(path=Variable(nest_path), where=TRUE)
-                )
+                deeper_conditions = NotOp(NestedOp(
+                    path=Variable(nest_path), where=TRUE
+                ))
                 deeper_conditions.simplified = True

     inner_join = inner_join.partial_eval()

@@ -234,20 +230,18 @@ def pre_process(query):
     new_select, split_select = get_selects(query)
     where_vars = query.where.vars()
     var_to_columns = {v.var: schema.values(v.var) for v in where_vars}
+    split_decoders = get_decoders_by_path(query, schema)

     # FROM DEEPEST TO SHALLOWEST
-    all_paths = list(
-        reversed(
-            sorted(
-                set(c.nested_path[0] for v in where_vars for c in var_to_columns[v.var])
-                | {"."}
-                | set(schema.query_path)
-                | set(split_select.keys())
-            )
-        )
-    )
+    all_paths = list(reversed(sorted(
+        set(c.nested_path[0] for v in where_vars for c in var_to_columns[v.var])
+        | {"."}
+        | set(schema.query_path)
+        | set(split_select.keys())
+        | split_decoders.keys()
+    )))

-    return new_select, all_paths, split_select, var_to_columns
+    return new_select, all_paths, split_select, split_decoders, var_to_columns


 def setop_to_es_queries(query, all_paths, split_select, var_to_columns):

@@ -293,11 +287,9 @@ def query_to_outer_joins(query, all_paths, split_select, var_to_columns):
                 exclude.append(NotOp(t))
             return output

-    all_nests = list(
-        set(
-            c.nested_path[0] for v in expr.vars() for c in frum.schema.values(v.var)
-        )
-    )
+    all_nests = list(set(
+        c.nested_path[0] for v in expr.vars() for c in frum.schema.values(v.var)
+    ))

     if len(all_nests) > 1:
         Log.error("do not know how to handle")

@@ -323,7 +315,7 @@ def query_to_outer_joins(query, all_paths, split_select, var_to_columns):
         outer.nests.append(NestedOp(Variable(p), select=select, where=AndOp(nest)))
         output.terms.append(outer)

-    return output
+    return output.partial_eval()


 def split_expression_by_path(expr, schema, lang=Language):

@@ -427,9 +419,9 @@ def split_expression_by_path(expr, schema, lang=Language):

     acc = {}
     for e in exprs:
-        nestings = list(
-            set(c.nested_path[0] for v in e.vars() for c in var_to_columns[v])
-        )
+        nestings = list(set(
+            c.nested_path[0] for v in e.vars() for c in var_to_columns[v]
+        ))
         if not nestings:
             a = acc.get(".")
             if not a:
@@ -31,11 +31,9 @@ class Variable(Variable_):
                 else es_exists(c.es_column)
             )
         else:
-            return es_and(
-                [
-                    {"term": {c.es_column: True}}
-                    if c.es_type == BOOLEAN
-                    else es_exists(c.es_column)
-                    for c in cols
-                ]
-            )
+            return es_and([
+                {"term": {c.es_column: True}}
+                if c.es_type == BOOLEAN
+                else es_exists(c.es_column)
+                for c in cols
+            ])
@@ -18,11 +18,9 @@ from jx_elasticsearch.es52.expressions.or_op import OrOp

 class WhenOp(WhenOp_):
     def to_es(self, schema):
-        output = OrOp(
-            [
-                AndOp([self.when, BooleanOp(self.then)]),
-                AndOp([NotOp(self.when), BooleanOp(self.els_)]),
-            ]
-        ).partial_eval()
+        output = OrOp([
+            AndOp([self.when, BooleanOp(self.then)]),
+            AndOp([NotOp(self.when), BooleanOp(self.els_)]),
+        ]).partial_eval()

         return output.to_es(schema)
@@ -19,7 +19,7 @@ from jx_elasticsearch.es52.painless.es_script import EsScript
 from mo_dots import Null
 from mo_json import BOOLEAN, NUMBER, STRING

-AndOp, Literal, NumberOp, OrOp, WhenOp = [None]*5
+AndOp, Literal, NumberOp, OrOp, WhenOp = [None] * 5


 MAX_INT32 = 2147483647

@@ -53,7 +53,6 @@ return output.toString()
 """


-
 def _binary_to_es_script(self, schema, not_null=False, boolean=False, many=True):
     op, identity = _painless_operators[self.op]
     lhs = NumberOp(self.lhs).partial_eval().to_es_script(schema)

@@ -62,12 +61,7 @@ def _binary_to_es_script(self, schema, not_null=False, boolean=False, many=True)
     missing = OrOp([self.lhs.missing(), self.rhs.missing()])

     return EsScript(
-        type=NUMBER,
-        miss=missing,
-        frum=self,
-        expr=script,
-        schema=schema,
-        many=False
+        type=NUMBER, miss=missing, frum=self, expr=script, schema=schema, many=False
     )

@@ -77,17 +71,13 @@ def _inequality_to_es_script(self, schema, not_null=False, boolean=False, many=T
     rhs = NumberOp(self.rhs).partial_eval().to_es_script(schema).expr
     script = "(" + lhs + ") " + op + " (" + rhs + ")"

-    output = (
-        WhenOp(
-            OrOp([self.lhs.missing(), self.rhs.missing()]),
-            **{
-                "then": FALSE,
-                "else": EsScript(type=BOOLEAN, expr=script, frum=self, schema=schema),
-            }
-        )
-        .partial_eval()
-        .to_es_script(schema)
-    )
+    output = WhenOp(
+        OrOp([self.lhs.missing(), self.rhs.missing()]),
+        **{
+            "then": FALSE,
+            "else": EsScript(type=BOOLEAN, expr=script, frum=self, schema=schema),
+        }
+    ).partial_eval().to_es_script(schema)
     return output

@@ -126,32 +116,24 @@ def _multi_to_es_script(self, schema, not_null=False, boolean=False, many=True):
             + "))"
             for t in self.terms
         )
-        return (
-            WhenOp(
-                AndOp([t.missing() for t in self.terms]),
-                **{
-                    "then": self.default,
-                    "else": EsScript(type=NUMBER, expr=calc, frum=self, schema=schema),
-                }
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        return WhenOp(
+            AndOp([t.missing() for t in self.terms]),
+            **{
+                "then": self.default,
+                "else": EsScript(type=NUMBER, expr=calc, frum=self, schema=schema),
+            }
+        ).partial_eval().to_es_script(schema)
     else:
         calc = op.join(
             "(" + NumberOp(t).to_es_script(schema).expr + ")" for t in self.terms
         )
-        return (
-            WhenOp(
-                OrOp([t.missing() for t in self.terms]),
-                **{
-                    "then": self.default,
-                    "else": EsScript(type=NUMBER, expr=calc, frum=self, schema=schema),
-                }
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        return WhenOp(
+            OrOp([t.missing() for t in self.terms]),
+            **{
+                "then": self.default,
+                "else": EsScript(type=NUMBER, expr=calc, frum=self, schema=schema),
+            }
+        ).partial_eval().to_es_script(schema)


 Painless = Language("Painless")

@@ -181,4 +163,3 @@ _painless_operators = {
 empty_string_script = EsScript(
     miss=TRUE, type=STRING, expr='""', frum=NULL, schema=Null
 )
-
@@ -26,22 +26,20 @@ class BasicEqOp(BasicEqOp_):

         if lhs.many:
             if rhs.many:
-                return AndOp(
-                    [
-                        EsScript(
-                            type=BOOLEAN,
-                            expr="(" + lhs.expr + ").size()==(" + rhs.expr + ").size()",
-                            frum=self,
-                            schema=schema,
-                        ),
-                        EsScript(
-                            type=BOOLEAN,
-                            expr="(" + rhs.expr + ").containsAll(" + lhs.expr + ")",
-                            frum=self,
-                            schema=schema,
-                        ),
-                    ]
-                ).to_es_script(schema)
+                return AndOp([
+                    EsScript(
+                        type=BOOLEAN,
+                        expr="(" + lhs.expr + ").size()==(" + rhs.expr + ").size()",
+                        frum=self,
+                        schema=schema,
+                    ),
+                    EsScript(
+                        type=BOOLEAN,
+                        expr="(" + rhs.expr + ").containsAll(" + lhs.expr + ")",
+                        frum=self,
+                        schema=schema,
+                    ),
+                ]).to_es_script(schema)
             else:
                 if lhs.type == BOOLEAN:
                     if is_literal(simple_rhs) and simple_rhs.value in ("F", False):
@@ -23,22 +23,18 @@ class BooleanOp(BooleanOp_):
         except Exception as e:
             raise e
         if value.many:
-            return BooleanOp(
-                EsScript(
-                    miss=value.miss,
-                    type=value.type,
-                    expr="(" + value.expr + ")[0]",
-                    frum=value.frum,
-                    schema=schema,
-                )
-            ).to_es_script(schema)
+            return BooleanOp(EsScript(
+                miss=value.miss,
+                type=value.type,
+                expr="(" + value.expr + ")[0]",
+                frum=value.frum,
+                schema=schema,
+            )).to_es_script(schema)
         elif value.type == BOOLEAN:
             miss = value.miss
             value.miss = FALSE
-            return (
-                WhenOp(miss, **{"then": FALSE, "else": value})
-                .partial_eval()
-                .to_es_script(schema)
-            )
+            return WhenOp(
+                miss, **{"then": FALSE, "else": value}
+            ).partial_eval().to_es_script(schema)
         else:
             return NotOp(value.miss).partial_eval().to_es_script(schema)
@@ -18,9 +18,7 @@ class CaseOp(CaseOp_):
     def to_es_script(self, schema, not_null=False, boolean=False, many=True):
         acc = Painless[self.whens[-1]].partial_eval().to_es_script(schema)
         for w in reversed(self.whens[0:-1]):
-            acc = (
-                WhenOp(w.when, **{"then": w.then, "else": acc})
-                .partial_eval()
-                .to_es_script(schema)
-            )
+            acc = WhenOp(
+                w.when, **{"then": w.then, "else": acc}
+            ).partial_eval().to_es_script(schema)
         return acc
@@ -30,18 +30,12 @@ class DivOp(DivOp_):
             + ")"
         )

-        output = (
-            WhenOp(
-                OrOp([lhs.missing(), rhs.missing(), EqOp([rhs, ZERO])]),
-                **{
-                    "then": self.default,
-                    "else": EsScript(
-                        type=NUMBER, expr=script, frum=self, schema=schema
-                    ),
-                }
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        output = WhenOp(
+            OrOp([lhs.missing(), rhs.missing(), EqOp([rhs, ZERO])]),
+            **{
+                "then": self.default,
+                "else": EsScript(type=NUMBER, expr=script, frum=self, schema=schema),
+            }
+        ).partial_eval().to_es_script(schema)

         return output
@@ -17,14 +17,8 @@ from jx_elasticsearch.es52.painless.when_op import WhenOp

 class EqOp(EqOp_):
     def to_es_script(self, schema, not_null=False, boolean=False, many=True):
-        return (
-            CaseOp(
-                [
-                    WhenOp(self.lhs.missing(), **{"then": self.rhs.missing()}),
-                    WhenOp(self.rhs.missing(), **{"then": FALSE}),
-                    BasicEqOp([self.lhs, self.rhs]),
-                ]
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        return CaseOp([
+            WhenOp(self.lhs.missing(), **{"then": self.rhs.missing()}),
+            WhenOp(self.rhs.missing(), **{"then": FALSE}),
+            BasicEqOp([self.lhs, self.rhs]),
+        ]).partial_eval().to_es_script(schema)
@@ -102,5 +102,3 @@ def box(script):

 def es_script(term):
     return dict_to_data({"script": {"lang": "painless", "source": term}})
-
-
@@ -22,4 +22,3 @@ def to_es_script(self, schema, not_null=False, boolean=False, many=True):


 false_script = EsScript(type=BOOLEAN, expr="false", frum=FALSE, schema=Null)
-
@@ -22,40 +22,32 @@ from jx_elasticsearch.es52.painless.when_op import WhenOp
 class FindOp(FindOp_):
     @simplified
     def partial_eval(self):
-        index = self.lang[
-            BasicIndexOfOp([self.value, self.find, self.start])
-        ].partial_eval()
+        index = self.lang[BasicIndexOfOp([
+            self.value,
+            self.find,
+            self.start,
+        ])].partial_eval()

-        output = self.lang[
-            WhenOp(
-                OrOp(
-                    [
-                        self.value.missing(),
-                        self.find.missing(),
-                        BasicEqOp([index, Literal(-1)]),
-                    ]
-                ),
-                **{"then": self.default, "else": index}
-            )
-        ].partial_eval()
+        output = self.lang[WhenOp(
+            OrOp([
+                self.value.missing(),
+                self.find.missing(),
+                BasicEqOp([index, Literal(-1)]),
+            ]),
+            **{"then": self.default, "else": index}
+        )].partial_eval()
         return output

     def missing(self):
-        output = AndOp(
-            [
-                self.default.missing(),
-                OrOp(
-                    [
-                        self.value.missing(),
-                        self.find.missing(),
-                        EqOp(
-                            [
-                                BasicIndexOfOp([self.value, self.find, self.start]),
-                                Literal(-1),
-                            ]
-                        ),
-                    ]
-                ),
-            ]
-        ).partial_eval()
+        output = AndOp([
+            self.default.missing(),
+            OrOp([
+                self.value.missing(),
+                self.find.missing(),
+                EqOp([
+                    BasicIndexOfOp([self.value, self.find, self.start]),
+                    Literal(-1),
+                ]),
+            ]),
+        ]).partial_eval()
         return output
@@ -19,7 +19,7 @@ from jx_elasticsearch.es52.painless import Painless
 from jx_elasticsearch.es52.painless.null_op import null_script
 from jx_elasticsearch.es52.painless.es_script import EsScript

-CoalesceOp, Variable = [None]*2
+CoalesceOp, Variable = [None] * 2


 class FirstOp(FirstOp_):

@@ -41,12 +41,9 @@ class FirstOp(FirstOp_):
         term = Painless[self.term].to_es_script(schema)

         if is_op(term.frum, CoalesceOp_):
-            return CoalesceOp(
-                [
-                    FirstOp(t.partial_eval().to_es_script(schema))
-                    for t in term.frum.terms
-                ]
-            ).to_es_script(schema)
+            return CoalesceOp([
+                FirstOp(t.partial_eval().to_es_script(schema)) for t in term.frum.terms
+            ]).to_es_script(schema)

         if term.many:
             return EsScript(
@@ -37,17 +37,13 @@ class FloorOp(FloorOp_):
             + ")"
         )

-        output = (
-            WhenOp(
-                OrOp([lhs.missing(), rhs.missing(), EqOp([self.rhs, ZERO])]),
-                **{
-                    "then": self.default,
-                    "else": EsScript(
-                        type=NUMBER, expr=script, frum=self, miss=FALSE, schema=schema
-                    ),
-                }
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        output = WhenOp(
+            OrOp([lhs.missing(), rhs.missing(), EqOp([self.rhs, ZERO])]),
+            **{
+                "then": self.default,
+                "else": EsScript(
+                    type=NUMBER, expr=script, frum=self, miss=FALSE, schema=schema
+                ),
+            }
+        ).partial_eval().to_es_script(schema)
         return output
@@ -19,15 +19,13 @@ class IntegerOp(IntegerOp_):
     def to_es_script(self, schema, not_null=False, boolean=False, many=True):
         value = Painless[self.term].to_es_script(schema)
         if value.many:
-            return IntegerOp(
-                EsScript(
-                    miss=value.missing(),
-                    type=value.type,
-                    expr="(" + value.expr + ")[0]",
-                    frum=value.frum,
-                    schema=schema,
-                )
-            ).to_es_script(schema)
+            return IntegerOp(EsScript(
+                miss=value.missing(),
+                type=value.type,
+                expr="(" + value.expr + ")[0]",
+                frum=value.frum,
+                schema=schema,
+            )).to_es_script(schema)
         elif value.type == BOOLEAN:
             return EsScript(
                 miss=value.missing(),
@@ -28,21 +28,15 @@ class MissingOp(MissingOp_):
                return EsScript(type=BOOLEAN, expr="false", frum=self, schema=schema)
            else:
                columns = schema.leaves(self.expr.var)
-                return (
-                    AndOp(
-                        [
-                            EsScript(
-                                type=BOOLEAN,
-                                expr="doc[" + quote(c.es_column) + "].empty",
-                                frum=self,
-                                schema=schema,
-                            )
-                            for c in columns
-                        ]
+                return AndOp([
+                    EsScript(
+                        type=BOOLEAN,
+                        expr="doc[" + quote(c.es_column) + "].empty",
+                        frum=self,
+                        schema=schema,
                     )
-                    .partial_eval()
-                    .to_es_script(schema)
-                )
+                    for c in columns
+                ]).partial_eval().to_es_script(schema)
        elif is_literal(self.expr):
            return self.expr.missing().to_es_script(schema)
        else:
@@ -18,14 +18,8 @@ from jx_elasticsearch.es52.painless.when_op import WhenOp

 class NeOp(NeOp_):
     def to_es_script(self, schema, not_null=False, boolean=False, many=True):
-        return (
-            CaseOp(
-                [
-                    WhenOp(self.lhs.missing(), **{"then": NotOp(self.rhs.missing())}),
-                    WhenOp(self.rhs.missing(), **{"then": NotOp(self.lhs.missing())}),
-                    NotOp(BasicEqOp([self.lhs, self.rhs])),
-                ]
-            )
-            .partial_eval()
-            .to_es_script(schema)
-        )
+        return CaseOp([
+            WhenOp(self.lhs.missing(), **{"then": NotOp(self.rhs.missing())}),
+            WhenOp(self.rhs.missing(), **{"then": NotOp(self.lhs.missing())}),
+            NotOp(BasicEqOp([self.lhs, self.rhs])),
+        ]).partial_eval().to_es_script(schema)
@@ -30,10 +30,5 @@ class NotOp(NotOp_):
         return null_script

     return EsScript(
-        type=BOOLEAN,
-        expr="!("
-        + value.expr
-        + ")",
-        frum=self,
-        schema=schema,
+        type=BOOLEAN, expr="!(" + value.expr + ")", frum=self, schema=schema,
     )
@@ -22,4 +22,3 @@ def to_es_script(self, schema, not_null=False, boolean=False, many=True):


 null_script = EsScript(type=IS_NULL, expr="null", frum=NULL, miss=TRUE, schema=Null)
-
@@ -29,12 +29,10 @@ class NumberOp(NumberOp_):
         value = term.to_es_script(schema)

         if is_op(value.frum, CoalesceOp_):
-            return CoalesceOp(
-                [
-                    NumberOp(t).partial_eval().to_es_script(schema)
-                    for t in value.frum.terms
-                ]
-            ).to_es_script(schema)
+            return CoalesceOp([
+                NumberOp(t).partial_eval().to_es_script(schema)
+                for t in value.frum.terms
+            ]).to_es_script(schema)

         if value is null_script:
             return Literal(0).to_es_script(schema)

@@ -90,4 +88,4 @@ class NumberOp(NumberOp_):
     )


-_utils.NumberOp=NumberOp
+_utils.NumberOp = NumberOp
@@ -30,4 +30,4 @@ class OrOp(OrOp_):
     )


-_utils.OrOp=OrOp
+_utils.OrOp = OrOp
@@ -25,9 +25,9 @@ class StringOp(StringOp_):
         value = term.to_es_script(schema)

         if is_op(value.frum, CoalesceOp_):
-            return CoalesceOp(
-                [StringOp(t).partial_eval() for t in value.frum.terms]
-            ).to_es_script(schema)
+            return CoalesceOp([
+                StringOp(t).partial_eval() for t in value.frum.terms
+            ]).to_es_script(schema)

         if value.miss is TRUE:
             return empty_string_script
@@ -22,9 +22,10 @@ class SuffixOp(SuffixOp_):
             return true_script
         else:
             return EsScript(
-                miss=OrOp(
-                    [MissingOp(self.expr), MissingOp(self.suffix)]
-                ).partial_eval(),
+                miss=OrOp([
+                    MissingOp(self.expr),
+                    MissingOp(self.suffix),
+                ]).partial_eval(),
                 expr="("
                 + self.expr.to_es_script(schema)
                 + ").endsWith("
@@ -22,4 +22,3 @@ def to_es_script(self, schema, not_null=False, boolean=False, many=True):


 true_script = EsScript(type=BOOLEAN, expr="true", frum=TRUE, schema=Null)
-
@@ -40,27 +40,23 @@ class Variable(Variable_):
             frum = Variable(c.es_column)
             q = quote(varname)
             if c.multi > 1:
-                acc.append(
-                    EsScript(
-                        miss=frum.missing(),
-                        type=c.jx_type,
-                        expr="doc[" + q + "].values",
-                        frum=frum,
-                        schema=schema,
-                        many=True
-                    )
-                )
+                acc.append(EsScript(
+                    miss=frum.missing(),
+                    type=c.jx_type,
+                    expr="doc[" + q + "].values",
+                    frum=frum,
+                    schema=schema,
+                    many=True,
+                ))
             else:
-                acc.append(
-                    EsScript(
-                        miss=frum.missing(),
-                        type=c.jx_type,
-                        expr="doc[" + q + "].value",
-                        frum=frum,
-                        schema=schema,
-                        many=False
-                    )
-                )
+                acc.append(EsScript(
+                    miss=frum.missing(),
+                    type=c.jx_type,
+                    expr="doc[" + q + "].value",
+                    frum=frum,
+                    schema=schema,
+                    many=False,
+                ))

         if len(acc) == 0:
             return NULL.to_es_script(schema)

@@ -70,4 +66,4 @@ class Variable(Variable_):
         return CoalesceOp(acc).to_es_script(schema)


-first_op.Variable=Variable
+first_op.Variable = Variable
@@ -81,4 +81,4 @@ class WhenOp(WhenOp_):
         return self.partial_eval().to_es_script(schema)


-_utils.WhenOp=WhenOp
+_utils.WhenOp = WhenOp
@@ -9,15 +9,15 @@
 #
 from __future__ import absolute_import, division, unicode_literals

-from itertools import chain
-
 from jx_base.domains import ALGEBRAIC
 from jx_base.expressions import LeavesOp, Variable, IDENTITY, TRUE
-from jx_base.language import is_op
 from jx_base.expressions.query_op import DEFAULT_LIMIT
+from jx_base.language import is_op
 from jx_elasticsearch.es52.expressions import (
     split_expression_by_path,
-    NestedOp, ESSelectOp)
+    NestedOp,
+    ESSelectOp,
+)
 from jx_elasticsearch.es52.expressions.utils import setop_to_es_queries, pre_process
 from jx_elasticsearch.es52.painless import Painless
 from jx_elasticsearch.es52.set_format import set_formatters
@@ -36,7 +36,8 @@ from mo_dots import (
     unwrap,
     unwraplist,
     Null,
-    list_to_data)
+    list_to_data,
+)
 from mo_future import text
 from mo_json import NESTED, INTERNAL
 from mo_json.typed_encoder import decode_property, unnest_path, untype_path, untyped
@@ -51,12 +52,10 @@ def is_setop(es, query):
     select = listwrap(query.select)

     if not query.edges:
-        isDeep = (
-            len(split_field(query.frum.name)) > 1
-        )  # LOOKING INTO NESTED WILL REQUIRE A SCRIPT
-        simpleAgg = AND(
-            [s.aggregate in ("count", "none") for s in select]
-        )  # CONVERTING esfilter DEFINED PARTS WILL REQUIRE SCRIPT
+        isDeep = len(split_field(query.frum.name)) > 1  # LOOKING INTO NESTED WILL REQUIRE A SCRIPT
+        simpleAgg = AND([
+            s.aggregate in ("count", "none") for s in select
+        ])  # CONVERTING esfilter DEFINED PARTS WILL REQUIRE SCRIPT

         # NO EDGES IMPLIES SIMPLER QUERIES: EITHER A SET OPERATION, OR RETURN SINGLE AGGREGATE
         if simpleAgg or isDeep:
@@ -96,32 +95,28 @@ def get_selects(query):
                 )
                 if c.jx_type == NESTED:
                     get_select(".").get_source = True
-                    new_select.append(
-                        {
-                            "name": full_name,
-                            "value": Variable(c.es_column),
-                            "put": {
-                                "name": literal_field(full_name),
-                                "index": put_index,
-                                "child": ".",
-                            },
-                            "pull": get_pull_source(c.es_column),
-                        }
-                    )
+                    new_select.append({
+                        "name": full_name,
+                        "value": Variable(c.es_column),
+                        "put": {
+                            "name": literal_field(full_name),
+                            "index": put_index,
+                            "child": ".",
+                        },
+                        "pull": get_pull_source(c.es_column),
+                    })
                     put_index += 1
                 else:
                     get_select(c.nested_path[0]).fields.append(c.es_column)
-                    new_select.append(
-                        {
-                            "name": full_name,
-                            "value": Variable(c.es_column),
-                            "put": {
-                                "name": literal_field(full_name),
-                                "index": put_index,
-                                "child": ".",
-                            },
-                        }
-                    )
+                    new_select.append({
+                        "name": full_name,
+                        "value": Variable(c.es_column),
+                        "put": {
+                            "name": literal_field(full_name),
+                            "index": put_index,
+                            "child": ".",
+                        },
+                    })
                     put_index += 1
         elif is_op(select.value, Variable):
             s_column = select.value.var
@@ -129,14 +124,12 @@ def get_selects(query):
             if s_column == ".":
                 # PULL ALL SOURCE
                 get_select(".").get_source = True
-                new_select.append(
-                    {
-                        "name": select.name,
-                        "value": select.value,
-                        "put": {"name": select.name, "index": put_index, "child": "."},
-                        "pull": get_pull_source("."),
-                    }
-                )
+                new_select.append({
+                    "name": select.name,
+                    "value": select.value,
+                    "put": {"name": select.name, "index": put_index, "child": "."},
+                    "pull": get_pull_source("."),
+                })
                 continue

             leaves = schema.leaves(s_column)  # LEAVES OF OBJECT
@@ -146,117 +139,103 @@ def get_selects(query):
                 # PULL WHOLE NESTED ARRAYS
                 get_select(".").get_source = True
                 for c in leaves:
-                    if (
-                        len(c.nested_path) == 1
-                    ):  # NESTED PROPERTIES ARE IGNORED, CAPTURED BY THESE FIRST LEVEL PROPERTIES
+                    if len(c.nested_path) == 1:  # NESTED PROPERTIES ARE IGNORED, CAPTURED BY THESE FIRST LEVEL PROPERTIES
                         pre_child = join_field(
                             decode_property(n) for n in split_field(c.name)
                         )
-                        new_select.append(
-                            {
-                                "name": select.name,
-                                "value": Variable(c.es_column),
-                                "put": {
-                                    "name": select.name,
-                                    "index": put_index,
-                                    "child": untype_path(
-                                        relative_field(pre_child, s_column)
-                                    ),
-                                },
-                                "pull": get_pull_source(c.es_column),
-                            }
-                        )
+                        new_select.append({
+                            "name": select.name,
+                            "value": Variable(c.es_column),
+                            "put": {
+                                "name": select.name,
+                                "index": put_index,
+                                "child": untype_path(relative_field(
+                                    pre_child, s_column
+                                )),
+                            },
+                            "pull": get_pull_source(c.es_column),
+                        })
             else:
                 # PULL ONLY WHAT'S NEEDED
                 for c in leaves:
                     c_nested_path = c.nested_path[0]
                     if c_nested_path == ".":
                         if c.es_column == "_id":
-                            new_select.append(
-                                {
-                                    "name": select.name,
-                                    "value": Variable(c.es_column),
-                                    "put": {
-                                        "name": select.name,
-                                        "index": put_index,
-                                        "child": ".",
-                                    },
-                                    "pull": lambda row: row._id,
-                                }
-                            )
+                            new_select.append({
+                                "name": select.name,
+                                "value": Variable(c.es_column),
+                                "put": {
+                                    "name": select.name,
+                                    "index": put_index,
+                                    "child": ".",
+                                },
+                                "pull": lambda row: row._id,
+                            })
                         elif c.jx_type == NESTED:
                             get_select(".").get_source = True
                             pre_child = join_field(
                                 decode_property(n) for n in split_field(c.name)
                             )
-                            new_select.append(
-                                {
-                                    "name": select.name,
-                                    "value": Variable(c.es_column),
-                                    "put": {
-                                        "name": select.name,
-                                        "index": put_index,
-                                        "child": untype_path(
-                                            relative_field(pre_child, s_column)
-                                        ),
-                                    },
-                                    "pull": get_pull_source(c.es_column),
-                                }
-                            )
+                            new_select.append({
+                                "name": select.name,
+                                "value": Variable(c.es_column),
+                                "put": {
+                                    "name": select.name,
+                                    "index": put_index,
+                                    "child": untype_path(relative_field(
+                                        pre_child, s_column
+                                    )),
+                                },
+                                "pull": get_pull_source(c.es_column),
+                            })
                         else:
                             get_select(c_nested_path).fields.append(c.es_column)
                             pre_child = join_field(
                                 decode_property(n) for n in split_field(c.name)
                             )
-                            new_select.append(
-                                {
-                                    "name": select.name,
-                                    "value": Variable(c.es_column),
-                                    "put": {
-                                        "name": select.name,
-                                        "index": put_index,
-                                        "child": untype_path(
-                                            relative_field(pre_child, s_column)
-                                        ),
-                                    },
-                                }
-                            )
+                            new_select.append({
+                                "name": select.name,
+                                "value": Variable(c.es_column),
+                                "put": {
+                                    "name": select.name,
+                                    "index": put_index,
+                                    "child": untype_path(relative_field(
+                                        pre_child, s_column
+                                    )),
+                                },
+                            })
                     else:
                         es_select = get_select(c_nested_path)
                         es_select.fields.append(c.es_column)

                         child = relative_field(
-                            untype_path(
-                                relative_field(c.name, schema.query_path[0])
-                            ),
+                            untype_path(relative_field(
+                                c.name, schema.query_path[0]
+                            )),
                             s_column,
                         )
                         pull = accumulate_nested_doc(
                             c_nested_path,
-                            Variable(
-                                relative_field(s_column, unnest_path(c_nested_path))
-                            ),
+                            Variable(relative_field(
+                                s_column, unnest_path(c_nested_path)
+                            )),
                         )
-                        new_select.append(
-                            {
-                                "name": select.name,
-                                "value": select.value,
-                                "put": {
-                                    "name": select.name,
-                                    "index": put_index,
-                                    "child": child,
-                                },
-                                "pull": pull,
-                            }
-                        )
+                        new_select.append({
+                            "name": select.name,
+                            "value": select.value,
+                            "put": {
+                                "name": select.name,
+                                "index": put_index,
+                                "child": child,
+                            },
+                            "pull": pull,
+                        })
                 else:
-                    new_select.append(
-                        {
-                            "name": select.name,
-                            "value": Variable("$dummy"),
-                            "put": {"name": select.name, "index": put_index, "child": "."},
-                        }
-                    )
+                    new_select.append({
+                        "name": select.name,
+                        "value": Variable("$dummy"),
+                        "put": {"name": select.name, "index": put_index, "child": "."},
+                    })
                     put_index += 1
         else:
             op, split_scripts = split_expression_by_path(
@@ -264,20 +243,14 @@ def get_selects(query):
             )
             for p, script in split_scripts.items():
                 es_select = get_select(p)
-                es_select.scripts[select.name] = {
-                    "script": text(
-                        Painless[script].partial_eval().to_es_script(schema)
-                    )
-                }
-                new_select.append(
-                    {
-                        "name": select.name,
-                        "pull": jx_expression_to_function(
-                            "fields." + literal_field(select.name)
-                        ),
-                        "put": {"name": select.name, "index": put_index, "child": "."},
-                    }
-                )
+                es_select.scripts[select.name] = {"script": text(Painless[script].partial_eval().to_es_script(schema))}
+                new_select.append({
+                    "name": select.name,
+                    "pull": jx_expression_to_function(
+                        "fields." + literal_field(select.name)
+                    ),
+                    "put": {"name": select.name, "index": put_index, "child": "."},
+                })
                 put_index += 1
     for n in new_select:
         if n.pull:
@@ -288,9 +261,9 @@ def get_selects(query):
         elif n.value == "_id":
             n.pull = jx_expression_to_function("_id")
         else:
-            n.pull = jx_expression_to_function(
-                concat_field("fields", literal_field(n.value.var))
-            )
+            n.pull = jx_expression_to_function(concat_field(
+                "fields", literal_field(n.value.var)
+            ))
     else:
         Log.error("Do not know what to do")
     return new_select, split_select
@@ -376,20 +349,16 @@ def get_pull_source(es_column):


 def get_pull_stats():
-    return jx_expression_to_function(
-        {
-            "select": [
-                {"name": "count", "value": "count"},
-                {"name": "sum", "value": "sum"},
-                {"name": "min", "value": "min"},
-                {"name": "max", "value": "max"},
-                {"name": "avg", "value": "avg"},
-                {"name": "sos", "value": "sum_of_squares"},
-                {"name": "std", "value": "std_deviation"},
-                {"name": "var", "value": "variance"},
-            ]
-        }
-    )
+    return jx_expression_to_function({"select": [
+        {"name": "count", "value": "count"},
+        {"name": "sum", "value": "sum"},
+        {"name": "min", "value": "min"},
+        {"name": "max", "value": "max"},
+        {"name": "avg", "value": "avg"},
+        {"name": "sos", "value": "sum_of_squares"},
+        {"name": "std", "value": "std_deviation"},
+        {"name": "var", "value": "variance"},
+    ]})


 def es_query_proto(selects, op, wheres, schema):
@@ -408,6 +377,8 @@ def es_query_proto(selects, op, wheres, schema):
        es_where = op([es_query, where])
        es_query = NestedOp(path=Variable(p), query=es_where, select=select)
    return es_query.partial_eval().to_es(schema)


#
# expected = {
#     "_source": False,
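Aside: the `es_query_proto` hunk is easier to follow outside the diff. Each pass of its loop wraps the query built so far in one more `NestedOp`, so the last path processed ends up outermost. A minimal sketch of that fold under stated assumptions (the helper name and argument list are hypothetical; only the `op([es_query, where])` / `NestedOp(path=..., query=..., select=...)` shape comes from the hunk above):

    def fold_nested(paths, wheres, op, NestedOp, Variable, select):
        es_query = None
        for p in paths:                            # one where-clause per nested path
            es_where = op([es_query, wheres[p]])   # combine with what is nested so far
            es_query = NestedOp(path=Variable(p), query=es_where, select=select)
        return es_query                            # caller runs .partial_eval().to_es(schema)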
@@ -49,7 +49,8 @@ from mo_dots import (
    tail_field,
    listwrap,
    unwrap,
    to_data)
    to_data,
)
from mo_dots.lists import last
from mo_future import first, long, none_type, text
from mo_json import BOOLEAN, EXISTS, OBJECT, INTERNAL, STRUCT
@@ -175,8 +176,8 @@ class ElasticsearchMetadata(Namespace):

        # CONFIRM ALL COLUMNS ARE SAME, FIX IF NOT
        dirty = 0
        all_comparisions = list(jx.pairwise(props)) + list(
            jx.pairwise(jx.reverse(props))
        all_comparisions = (
            list(jx.pairwise(props)) + list(jx.pairwise(jx.reverse(props)))
        )
        # NOTICE THE SAME (index, type, properties) TRIPLE FROM ABOVE
        for (i1, t1, p1), (i2, t2, p2) in all_comparisions:
@@ -195,9 +196,10 @@ class ElasticsearchMetadata(Namespace):
                    try:
                        # TODO: THIS TAKES A LONG TIME, CACHE IN THE COLUMN METADATA?
                        # MAY NOT WORK - COLUMN METADATA IS FOR ALIASES, NOT INDEXES
                        result = i.search(
                            {"query": {"exists": {"field": name}}, "size": 0}
                        )
                        result = i.search({
                            "query": {"exists": {"field": name}},
                            "size": 0,
                        })
                        if result.hits.total > 0:
                            dirty += 1
                            i1.add_property(name, es_details)
@@ -213,9 +215,9 @@ class ElasticsearchMetadata(Namespace):
                        es_column=name,
                        es_index=alias,
                        es_type=es_details.type,
                        jx_type=es_type_to_json_type[
                            coalesce(es_details.type, "object")
                        ],
                        jx_type=es_type_to_json_type[coalesce(
                            es_details.type, "object"
                        )],
                        nested_path=get_nested_path(name),
                        count=0,
                        cardinality=0,  # MARKED AS DELETED
@@ -258,7 +260,11 @@ class ElasticsearchMetadata(Namespace):
        # BACKLOG CURRENTLY IN THE todo QUEUE
        self.todo.push_all(rescan)
        self.todo_priority = True
        DEBUG and Log.note("asked for {{num}} columns to be rescanned for {{alias}}", num=len(rescan), alias=alias)
        DEBUG and Log.note(
            "asked for {{num}} columns to be rescanned for {{alias}}",
            num=len(rescan),
            alias=alias,
        )
        return columns

    def _parse_properties(self, alias, mapping):
@@ -274,7 +280,8 @@ class ElasticsearchMetadata(Namespace):
            )
        if DEBUG and any(c.cardinality == 0 and c.name != "_id" for c in abs_columns):
            Log.note(
                "Some columns are always missing in {{url}} {{index|quote}} table:\n{{names}}",
                "Some columns are always missing in {{url}} {{index|quote}}"
                " table:\n{{names}}",
                url=self.es_cluster.url,
                index=alias,
                names=[
@@ -317,14 +324,9 @@ class ElasticsearchMetadata(Namespace):
        return canonicals

    def query(self, _query):
        return self.meta.columns.query(
            QueryOp(
                set_default(
                    {"from": self.meta.columns, "sort": ["table", "name"]},
                    _query.__data__(),
                )
            )
        )
        return self.meta.columns.query(QueryOp(set_default(
            {"from": self.meta.columns, "sort": ["table", "name"]}, _query.__data__(),
        )))

    def _find_alias(self, name):
        indices = self.es_cluster.get_metadata().indices
@@ -412,7 +414,8 @@ class ElasticsearchMetadata(Namespace):
            )
        else:
            Log.note(
                "waiting for columns to update by {{timestamp}}; {{columns|json}}",
                "waiting for columns to update by {{timestamp}};"
                " {{columns|json}}",
                timestamp=after,
                columns=[
                    concat_field(c.es_index, c.es_column)
@@ -440,30 +443,24 @@ class ElasticsearchMetadata(Namespace):
            Log.error("not supported")
        try:
            if column.es_index == META_TABLES_NAME:
                partitions = jx.sort(
                    [
                        g[column.es_column]
                        for g, _ in jx.groupby(self.meta.tables, column.es_column)
                        if g[column.es_column] != None
                    ]
                )
                self.meta.columns.update(
                    {
                        "set": {
                            "partitions": partitions,
                            "count": len(self.meta.tables),
                            "cardinality": len(partitions),
                            "multi": 1,
                            "last_updated": now,
                        },
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                partitions = jx.sort([
                    g[column.es_column]
                    for g, _ in jx.groupby(self.meta.tables, column.es_column)
                    if g[column.es_column] != None
                ])
                self.meta.columns.update({
                    "set": {
                        "partitions": partitions,
                        "count": len(self.meta.tables),
                        "cardinality": len(partitions),
                        "multi": 1,
                        "last_updated": now,
                    },
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            if column.es_index == META_COLUMNS_NAME:
                DEBUG and Log.note(
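Aside: this `meta.columns.update({...})` reshaping repeats many times below; the command itself is just a dict where `set` writes fields, `clear` erases them, and `where` selects the rows to touch. A hedged sketch with hypothetical values:

    self.meta.columns.update({
        "set": {"cardinality": 42, "last_updated": now},   # fields to write
        "clear": ["partitions"],                           # fields to erase
        "where": {"eq": {"es_index": "my_index", "es_column": "my_column"}},
    })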
@@ -519,81 +516,55 @@ class ElasticsearchMetadata(Namespace):
                    field=column.es_column,
                    num=cardinality,
                )
                self.meta.columns.update(
                    {
                        "set": {
                            "count": count,
                            "cardinality": cardinality,
                            "partitions": [False, True],
                            "multi": 1,
                            "last_updated": now,
                        },
                        "clear": ["partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {
                        "count": count,
                        "cardinality": cardinality,
                        "partitions": [False, True],
                        "multi": 1,
                        "last_updated": now,
                    },
                    "clear": ["partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            elif "_covered." in column.es_column or "_uncovered." in column.es_column:
                # DO NOT EVEN LOOK AT THESE COLUMNS
                self.meta.columns.update(
                    {
                        "set": {
                            "count": 1000 * 1000,
                            "cardinality": 10000,
                            "multi": 10000,
                            "last_updated": now,
                        },
                        "clear": ["partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {
                        "count": 1000 * 1000,
                        "cardinality": 10000,
                        "multi": 10000,
                        "last_updated": now,
                    },
                    "clear": ["partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            else:
                es_query = {
                    "aggs": {
                        "count": _counting_query(column),
                        "_filter": {
                            "aggs": {
                                "multi": {
                                    "max": {
                                        "script": "doc["
                                        + quote(column.es_column)
                                        + "].values.size()"
                                    }
                                }
                            },
                            "filter": {
                                "bool": {
                                    "should": [
                                        {
                                            "range": {
                                                "etl.timestamp.~n~": {
                                                    "gte": (Date.today() - WEEK)
                                                }
                                            }
                                        },
                                        {
                                            "bool": {
                                                "must_not": {
                                                    "exists": {
                                                        "field": "etl.timestamp.~n~"
                                                    }
                                                }
                                            }
                                        },
                                    ]
                                }
                            },
                            "aggs": {"multi": {"max": {
                                "script": "doc["
                                + quote(column.es_column)
                                + "].values.size()"
                            }}},
                            "filter": {"bool": {"should": [
                                {"range": {"etl.timestamp.~n~": {"gte": (
                                    Date.today() - WEEK
                                )}}},
                                {"bool": {"must_not": {"exists": {
                                    "field": "etl.timestamp.~n~"
                                }}}},
                            ]}},
                        },
                    },
                    "size": 0,
@@ -616,23 +587,19 @@ class ElasticsearchMetadata(Namespace):
            query = Data(size=0)

            if column.es_column == "_id":
                self.meta.columns.update(
                    {
                        "set": {
                            "count": cardinality,
                            "cardinality": cardinality,
                            "multi": 1,
                            "last_updated": now,
                        },
                        "clear": ["partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {
                        "count": cardinality,
                        "cardinality": cardinality,
                        "multi": 1,
                        "last_updated": now,
                    },
                    "clear": ["partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            elif (
                cardinality > 1000
@@ -645,23 +612,19 @@ class ElasticsearchMetadata(Namespace):
                    field=column.es_column,
                    num=cardinality,
                )
                self.meta.columns.update(
                    {
                        "set": {
                            "count": count,
                            "cardinality": cardinality,
                            "multi": multi,
                            "last_updated": now,
                        },
                        "clear": ["partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {
                        "count": count,
                        "cardinality": cardinality,
                        "multi": multi,
                        "last_updated": now,
                    },
                    "clear": ["partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            elif column.es_type in elasticsearch.ES_NUMERIC_TYPES and cardinality > 30:
                DEBUG and Log.note(
@@ -670,23 +633,19 @@ class ElasticsearchMetadata(Namespace):
                    field=column.es_column,
                    num=cardinality,
                )
                self.meta.columns.update(
                    {
                        "set": {
                            "count": count,
                            "cardinality": cardinality,
                            "multi": multi,
                            "last_updated": now,
                        },
                        "clear": ["partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {
                        "count": count,
                        "cardinality": cardinality,
                        "multi": multi,
                        "last_updated": now,
                    },
                    "clear": ["partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                return
            elif len(column.nested_path) != 1:
                query.aggs["_"] = {
@@ -696,9 +655,10 @@ class ElasticsearchMetadata(Namespace):
            elif cardinality == 0:  # WHEN DOES THIS HAPPEN?
                query.aggs["_"] = {"terms": {"field": column.es_column}}
            else:
                query.aggs["_"] = {
                    "terms": {"field": column.es_column, "size": cardinality}
                }
                query.aggs["_"] = {"terms": {
                    "field": column.es_column,
                    "size": cardinality,
                }}

            result = self.es_cluster.post("/" + es_index + "/_search", data=query)

@@ -709,29 +669,26 @@ class ElasticsearchMetadata(Namespace):
            parts = jx.sort(aggs.buckets.key)

            DEBUG and Log.note(
                "update metadata for {{column.es_index}}.{{column.es_column}} (id={{id}}) card={{card}} at {{time}}",
                "update metadata for {{column.es_index}}.{{column.es_column}}"
                " (id={{id}}) card={{card}} at {{time}}",
                id=id(column),
                column=column,
                card=cardinality,
                time=now,
            )
            self.meta.columns.update(
                {
                    "set": {
                        "count": count,
                        "cardinality": cardinality,
                        "multi": multi,
                        "partitions": parts,
                        "last_updated": now,
                    },
                    "where": {
                        "eq": {
                            "es_index": column.es_index,
                            "es_column": column.es_column,
                        }
                    },
                }
            )
            self.meta.columns.update({
                "set": {
                    "count": count,
                    "cardinality": cardinality,
                    "multi": multi,
                    "partitions": parts,
                    "last_updated": now,
                },
                "where": {"eq": {
                    "es_index": column.es_index,
                    "es_column": column.es_column,
                }},
            })
            META_COLUMNS_DESC.last_updated = now
        except Exception as e:
            # CAN NOT IMPORT: THE TEST MODULES SETS UP LOGGING
@@ -759,18 +716,14 @@ class ElasticsearchMetadata(Namespace):
                    cause=e,
                )
            else:
                self.meta.columns.update(
                    {
                        "set": {"last_updated": now},
                        "clear": ["count", "cardinality", "multi", "partitions"],
                        "where": {
                            "eq": {
                                "es_index": column.es_index,
                                "es_column": column.es_column,
                            }
                        },
                    }
                )
                self.meta.columns.update({
                    "set": {"last_updated": now},
                    "clear": ["count", "cardinality", "multi", "partitions"],
                    "where": {"eq": {
                        "es_index": column.es_index,
                        "es_column": column.es_column,
                    }},
                })
                Log.warning(
                    "Could not get {{col.es_index}}.{{col.es_column}} info",
                    col=column,
@@ -800,7 +753,8 @@ class ElasticsearchMetadata(Namespace):
                Log.note("{{num}} old columns", num=len(old_columns))
            else:
                Log.note(
                    "Old columns {{names|json}} last updated {{dates|json}}",
                    "Old columns {{names|json}} last updated"
                    " {{dates|json}}",
                    names=to_data(old_columns).es_column,
                    dates=[
                        Date(t).format()
@@ -810,7 +764,11 @@ class ElasticsearchMetadata(Namespace):
            else:
                Log.note("no more metadata to update")

            with Timer("Review {{num}} old columns", param={"num": len(old_columns)}, verbose=DEBUG):
            with Timer(
                "Review {{num}} old columns",
                param={"num": len(old_columns)},
                verbose=DEBUG,
            ):
                for g, index_columns in jx.groupby(old_columns, "es_index"):
                    if self.todo_priority:
                        # WE GOT OTHER WORK TO DO
@@ -822,9 +780,14 @@ class ElasticsearchMetadata(Namespace):
                        self.get_columns(g.es_index)
                    except Exception as cause:
                        if TABLE_DOES_NOT_EXIST in cause:
                            DEBUG and Log.note("removing {{index}} from metadata", index=g.es_index)
                            DEBUG and Log.note(
                                "removing {{index}} from metadata",
                                index=g.es_index,
                            )
                            self.meta.columns.clear(g.es_index, after=now)
                            self.meta.columns.delete_from_es(g.es_index, after=now)
                            self.meta.columns.delete_from_es(
                                g.es_index, after=now
                            )
                            continue
                        Log.warning(
                            "problem getting column info on {{table}}",
@@ -858,7 +821,8 @@ class ElasticsearchMetadata(Namespace):
            ]
            if column.es_index not in all_tables:
                DEBUG and Log.note(
                    "{{column.es_column}} of {{column.es_index}} does not exist",
                    "{{column.es_column}} of {{column.es_index}} does not"
                    " exist",
                    column=column,
                )
                self.meta.columns.clear(column.es_index, after=now)
@@ -959,18 +923,14 @@ class ElasticsearchMetadata(Namespace):
                continue

            # SET THE REST TO UNKNOWN
            self.meta.columns.update(
                {
                    "set": {"last_updated": Date.now()},
                    "clear": ["count", "cardinality", "multi", "partitions"],
                    "where": {
                        "eq": {
                            "es_index": column.es_index,
                            "es_column": column.es_column,
                        }
                    },
                }
            )
            self.meta.columns.update({
                "set": {"last_updated": Date.now()},
                "clear": ["count", "cardinality", "multi", "partitions"],
                "where": {"eq": {
                    "es_index": column.es_index,
                    "es_column": column.es_column,
                }},
            })

    def get_table(self, name):
        if name == META_COLUMNS_NAME:
@@ -1022,11 +982,9 @@ class Snowflake(object):
        """
        RETURN A LIST OF ALL SCHEMA'S IN DEPTH-FIRST TOPOLOGICAL ORDER
        """
        return list(
            reversed(
                sorted(p[0] for p in self.namespace.alias_to_query_paths.get(self.name))
            )
        )
        return list(reversed(sorted(
            p[0] for p in self.namespace.alias_to_query_paths.get(self.name)
        )))

    @property
    def columns(self):
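Aside: `reversed(sorted(...))` delivers the depth-first order the docstring promises because a child query path extends its parent's string, so it sorts after the parent lexicographically; reversing then puts every child before its parent. A tiny worked example with hypothetical paths:

    paths = [".", "a", "a.b"]
    list(reversed(sorted(paths)))   # -> ["a.b", "a", "."], deepest first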
@@ -1044,11 +1002,14 @@ class Schema(jx_base.Schema):
    def __init__(self, query_path, snowflake):
        if not is_list(snowflake.query_paths[0]):
            Log.error(
                "Snowflake query paths should be a list of string tuples (well, technically, a list of lists of strings)"
                "Snowflake query paths should be a list of string tuples (well,"
                " technically, a list of lists of strings)"
            )
        self.snowflake = snowflake
        try:
            path = first(p for p in snowflake.query_paths if untype_path(p[0]) == query_path)
            path = first(
                p for p in snowflake.query_paths if untype_path(p[0]) == query_path
            )
            if path:
                # WE DO NOT NEED TO LOOK INTO MULTI-VALUED FIELDS AS A TABLE
                self.multi = None
@@ -1056,24 +1017,22 @@ class Schema(jx_base.Schema):
            else:
                # LOOK INTO A SPECIFIC MULTI VALUED COLUMN
                try:
                    self.multi = first(
                        [
                            c
                            for c in self.snowflake.columns
                            if (
                                untype_path(c.name) == query_path
                                and (
                                    c.multi > 1
                                    or last(split_field(c.es_column))
                                    == NESTED_TYPE  # THIS IS TO COMPENSATE FOR BAD c.multi
                                )
                    self.multi = first([
                        c
                        for c in self.snowflake.columns
                        if (
                            untype_path(c.name) == query_path
                            and (
                                c.multi > 1
                                or last(split_field(c.es_column))
                                == NESTED_TYPE  # THIS IS TO COMPENSATE FOR BAD c.multi
                            )
                        ]
                    )
                    )
                    ])
                    if not self.multi:
                        Log.error("expecting a nested column")
                    self.query_path = [self.multi.name] + unwrap(
                        listwrap(self.multi.nested_path)
                    self.query_path = (
                        [self.multi.name] + unwrap(listwrap(self.multi.nested_path))
                    )
                except Exception as e:
                    # PROBLEM WITH METADATA UPDATE
@@ -1081,7 +1040,8 @@ class Schema(jx_base.Schema):
            self.query_path = (query_path, ".")

            Log.warning(
                "Problem getting query path {{path|quote}} in snowflake {{sf|quote}}",
                "Problem getting query path {{path|quote}} in snowflake"
                " {{sf|quote}}",
                path=query_path,
                sf=snowflake.name,
                cause=e,
@@ -1244,16 +1204,12 @@ def _counting_query(c):
    elif len(c.nested_path) != 1:
        return {
            "nested": {"path": c.nested_path[0]},  # FIRST ONE IS LONGEST
            "aggs": {
                "_nested": {
                    "cardinality": {
                        "field": c.es_column,
                        "precision_threshold": 10
                        if c.es_type in elasticsearch.ES_NUMERIC_TYPES
                        else 100,
                    }
                }
            },
            "aggs": {"_nested": {"cardinality": {
                "field": c.es_column,
                "precision_threshold": 10
                if c.es_type in elasticsearch.ES_NUMERIC_TYPES
                else 100,
            }}},
        }
    else:
        return {"cardinality": {"field": c.es_column}}
@@ -29,7 +29,7 @@ from mo_imports import export, expect
from mo_logs import Log
from mo_threads import Lock

jx, = expect("jx")
jx = expect("jx")


class ListContainer(Container, Namespace, Table):
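Aside: this commit repeatedly replaces `name, = expect("name")` with `name = expect("name")` (here and in the mo_dots and mo_logs hunks below), which implies `expect()` now returns the placeholder itself when given a single name, rather than a one-element tuple. A hedged sketch of the circular-import pattern, with hypothetical modules; the `export(module, name, value=_nothing)` signature is visible in the mo_imports hunk further down:

    # a.py needs `jx`, but importing it directly would create a cycle
    from mo_imports import expect
    jx = expect("jx")          # placeholder until the real value is exported

    # b.py, which defines jx, later fills the placeholder in:
    #     import a
    #     from mo_imports import export
    #     export(a, "jx", jx)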
@@ -10,7 +10,6 @@

from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from mo_logs import Log

@@ -14,7 +14,7 @@ from mo_dots.utils import CLASS, OBJ
from mo_future import is_binary, text, none_type
from mo_imports import expect, export

to_data, = expect("to_data")
to_data = expect("to_data")

_get = object.__getattribute__
_set = object.__setattr__
@@ -9,7 +9,6 @@

from __future__ import absolute_import, division, unicode_literals

from collections import Mapping
from datetime import date, datetime
from decimal import Decimal

@@ -17,7 +16,7 @@ from mo_dots.datas import register_data, Data, SLOT
from mo_dots.lists import FlatList
from mo_dots.nones import NullType
from mo_dots.utils import CLASS, OBJ
from mo_future import binary_type, generator_types, get_function_arguments, get_function_defaults, none_type, text
from mo_future import binary_type, generator_types, get_function_arguments, get_function_defaults, none_type, text, Mapping
from mo_imports import export, expect

get_attr, set_attr, list_to_data, to_data, from_data = expect("get_attr", "set_attr", "list_to_data", "to_data", "from_data")
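Aside: moving `Mapping` from `collections` into the `mo_future` import line is almost certainly a Python 3 compatibility fix; the ABCs were deprecated in `collections` once they moved to `collections.abc` (Python 3.3) and were removed in Python 3.10. A sketch of the shim such an import presumably relies on (mo_future's actual code may differ):

    try:
        from collections.abc import Mapping   # Python 3.3+
    except ImportError:
        from collections import Mapping       # Python 2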
@@ -8,19 +8,19 @@
#
from __future__ import absolute_import, division, unicode_literals

from datetime import datetime
import os
import sys
from datetime import datetime

from fabric2 import Config, Connection as _Connection, Result
from mo_logs.exceptions import Except

from mo_dots import set_default, unwrap, wrap, listwrap, coalesce
from mo_dots import set_default, unwrap, wrap, listwrap
from mo_files import File
from mo_future import text, is_text
from mo_kwargs import override
from mo_logs import Log, exceptions, machine_metadata
from mo_math.randoms import Random
from mo_logs.exceptions import Except
from mo_math import randoms
from mo_threads import Thread
from mo_threads.threads import RegisterThread

@@ -105,7 +105,7 @@ class Connection(object):
            remote = self.conn.command_cwds[-1].rstrip("/'") + "/" + remote

        if use_sudo:
            filename = "/tmp/" + Random.filename()
            filename = "/tmp/" + randoms.filename()
            self.sudo("cp " + remote + " " + filename)
            self.sudo("chmod a+r " + filename)
            self.conn.get(filename, File(local).abspath)
@@ -118,7 +118,7 @@ class Connection(object):
            remote = self.conn.command_cwds[-1].rstrip("/'") + "/" + remote

        if use_sudo:
            filename = "/tmp/" + Random.filename()
            filename = "/tmp/" + randoms.filename()
            self.conn.put(File(local).abspath, filename)
            self.sudo("cp " + filename + " " + remote)
            self.sudo("rm " + filename)
@@ -302,6 +302,8 @@ else:  # PY2
            d[key] = value
        return d

    function_type = (lambda: None).__class__


class decorate(object):
    def __init__(self, func):
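Aside: `(lambda: None).__class__` is a portable way to name the function type without importing `types`; it is the same object as `types.FunctionType`:

    import types
    function_type = (lambda: None).__class__
    assert function_type is types.FunctionType
    assert isinstance(lambda x: x, function_type)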
@@ -156,7 +156,6 @@ def export(module, name, value=_nothing):
        frame = inspect.stack()[1]
        value = inspect.getmodule(frame[0])


    desc = getattr(module, name, None)
    if isinstance(desc, Expecting):
        with _locker:
@@ -210,12 +209,10 @@ def _error(description):


def delay_import(module):
    globals = sys._getframe(1).f_globals
    caller_name = globals["__name__"]

    # GET MODULE OF THE CALLER
    caller_frame = inspect.stack()[1]
    caller = inspect.getmodule(caller_frame[0])

    return DelayedImport(caller, module)
    return DelayedImport(caller_name, module)


class DelayedImport(object):
@@ -228,7 +225,8 @@ class DelayedImport(object):

    def _import_now(self):
        # FIND MODULE VARIABLE THAT HOLDS self
        caller = _get(self, "caller")
        caller_name = _get(self, "caller")
        caller = importlib.import_module(caller_name)
        names = []
        for n in dir(caller):
            try:
@@ -238,26 +236,24 @@ class DelayedImport(object):
                pass

        if not names:
            _error(
                "Can not find variable holding a " + self.__class__.__name__
            )
            _error("Can not find variable holding a " + self.__class__.__name__)

        module = _get(self, "module")
        path = module.split(".")
        module_name, short_name = "".join(path[:-1]), path[-1]
        module_name, short_name = ".".join(path[:-1]), path[-1]
        try:
            m = importlib.import_module(module_name)
            val = getattr(m, short_name)

            for n in names:
                setattr(caller, n, val)
            return m
            return val
        except Exception as cause:
            _error("Can not load " + _get(self, "module") + " caused by " + text(cause))

    def __call__(self, *args, **kwargs):
        m = DelayedImport._import_now(self)
        return m()
        return m(*args, **kwargs)

    def __getitem__(self, item):
        m = DelayedImport._import_now(self)
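Aside: two of the mo_imports fixes above can be verified by hand. The old `"".join(path[:-1])` dropped the dots from multi-segment module paths, and the old `__call__` discarded its arguments:

    path = "mo_json.typed_encoder.untype_path".split(".")
    "".join(path[:-1])    # -> "mo_jsontyped_encoder"  (not importable)
    ".".join(path[:-1])   # -> "mo_json.typed_encoder" (the fix)

    # likewise `return m()` ignored *args/**kwargs; the delayed callable now
    # forwards them with `return m(*args, **kwargs)`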
@@ -330,10 +330,7 @@ def pretty_json(value):

            if len(value) == 1:
                j = pretty_json(value[0])
                if j.find("\n") >= 0:
                    return "[\n" + indent(j) + "\n]"
                else:
                    return "[" + j + "]"
                return "[" + j + "]"

            js = [pretty_json(v) for v in value]
            max_len = max(*[len(j) for j in js])
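Aside: the `pretty_json` change drops the special case for a single multi-line element; the brackets now wrap whatever the element rendered to, newlines included. A small sketch of the string difference (the `indent()` behavior is assumed to prefix each line with four spaces):

    j = '{\n    "a": 1\n}'                   # multi-line rendering of the element
    old = "[\n" + "\n".join("    " + ln for ln in j.split("\n")) + "\n]"
    new = "[" + j + "]"                      # brackets hug the element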
@@ -18,7 +18,7 @@ from mo_imports import expect
from mo_logs.log_usingNothing import StructuredLogger
from mo_logs.strings import expand_template

Log, = expect("Log")
Log = expect("Log")


class StructuredLogger_usingFile(StructuredLogger):
@@ -13,7 +13,7 @@ from __future__ import absolute_import, division, unicode_literals

import logging

from mo_logs.log_usingThreadedStream import StructuredLogger_usingThreadedStream
from mo_kwargs import override

from mo_dots import unwrap, Null
from mo_logs import Log
@@ -23,6 +23,7 @@ from mo_logs.strings import expand_template

# WRAP PYTHON CLASSIC logger OBJECTS
class StructuredLogger_usingHandler(StructuredLogger):
    @override("settings")
    def __init__(self, settings):
        if not _Thread:
            _late_import()
@@ -52,11 +53,7 @@ def make_log_from_settings(settings):
        temp = __import__(path, globals(), locals(), [class_name], 0)
        constructor = object.__getattribute__(temp, class_name)
    except Exception as e:
        if settings.stream and not constructor:
            # PROVIDE A DEFAULT STREAM HANDLER
            constructor = StructuredLogger_usingThreadedStream
        else:
            Log.error("Can not find class {{class}}", {"class": path}, cause=e)
        Log.error("Can not find class {{class}}", {"class": path}, cause=e)

    # IF WE NEED A FILE, MAKE SURE DIRECTORY EXISTS
    if settings.filename != None:
@@ -67,8 +64,14 @@ def make_log_from_settings(settings):
            f.parent.create()

    settings['class'] = None
    settings['cls'] = None
    settings['log_type'] = None
    settings['settings'] = None
    params = unwrap(settings)
    log_instance = constructor(**params)
    try:
        log_instance = constructor(**params)
    except Exception as cause:
        Log.error("problem with making handler", cause=cause)
    return log_instance

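Aside: `make_log_from_settings` treats `settings["class"]` as a dotted import path, strips the bookkeeping keys (`class`, `cls`, `log_type`, `settings`), and passes everything else to the constructor, which after this hunk fails with a descriptive error instead of an unexplained traceback. A hypothetical invocation (every key except `class` is illustrative, not a documented parameter):

    from mo_dots import wrap

    settings = wrap({
        "class": "mo_logs.log_usingFile.StructuredLogger_usingFile",
        "file": "logs/app.log",     # forwarded as constructor(**params)
    })
    logger = make_log_from_settings(settings)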
@@ -15,7 +15,7 @@ from mo_imports import expect
from mo_logs.exceptions import suppress_exception, Except
from mo_logs.log_usingNothing import StructuredLogger

Log, = expect("Log")
Log = expect("Log")


class StructuredLogger_usingMulti(StructuredLogger):
@@ -16,7 +16,7 @@ from mo_logs import Except, Log
from mo_logs.log_usingNothing import StructuredLogger
from mo_threads import Queue, THREAD_STOP, Thread, Till

Log, = expect("Log")
Log = expect("Log")

DEBUG = False
PERIOD = 0.3
@@ -18,7 +18,7 @@ import tempfile
from mo_dots import coalesce, listwrap, unwrap, to_data
from mo_imports import expect

Log, = expect("Log")
Log = expect("Log")

# PARAMETERS MATCH argparse.ArgumentParser.add_argument()
# https://docs.python.org/dev/library/argparse.html#the-add-argument-method
@@ -0,0 +1,31 @@
def stats2tab(acc, separator="\t"):
    stats = [
        {
            "num_calls": d[1],
            "self_time": d[2],
            "total_time": d[3],
            "self_time_per_call": d[2] / d[1],
            "total_time_per_call": d[3] / d[1],
            "file": (f[0] if f[0] != "~" else "").replace("\\", "/"),
            "line": f[1],
            "method": f[2].lstrip("<").rstrip(">")
        }
        for f, d, in acc.stats.items()
    ]

    return list2tab(stats, separator=separator)


def list2tab(rows, separator="\t"):
    from mo_json import value2json

    columns = set()
    for r in rows:
        columns |= set(r.keys())
    keys = list(columns)

    output = []
    for r in rows:
        output.append(separator.join(value2json(r.get(k)) for k in keys))

    return separator.join(keys) + "\n" + "\n".join(output)
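Aside: the new `stats2tab` reads the `stats` dict of a `pstats.Stats` object, whose values are `(primitive_calls, total_calls, self_time, cumulative_time, callers)` tuples, hence the `d[1]`, `d[2]`, `d[3]` indexing above. A minimal usage sketch, assuming the module lands as `mo_threads.profile_utils` (the import path used in the profiles.py hunk below):

    import cProfile
    import pstats
    from mo_threads.profile_utils import stats2tab

    profiler = cProfile.Profile()
    profiler.enable()
    sum(i * i for i in range(10000))    # any workload to measure
    profiler.disable()

    acc = pstats.Stats(profiler)
    print(stats2tab(acc))               # header row, then one row per function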
@@ -12,9 +12,8 @@ from __future__ import absolute_import, division, unicode_literals
import cProfile
import pstats

from mo_dots import to_data
from mo_future import iteritems
from mo_logs import Log
from mo_threads.profile_utils import stats2tab

FILENAME = "profile.tab"

@@ -83,6 +82,7 @@ def write_profiles(main_thread_profile):
        return

    from mo_files import File
    from mo_times import Date

    cprofiler_stats.add(pstats.Stats(main_thread_profile.cprofiler))
    stats = cprofiler_stats.pop_all()
@@ -92,36 +92,7 @@ def write_profiles(main_thread_profile):
    for s in stats[1:]:
        acc.add(s)

    stats = [
        {
            "num_calls": d[1],
            "self_time": d[2],
            "total_time": d[3],
            "self_time_per_call": d[2] / d[1],
            "total_time_per_call": d[3] / d[1],
            "file": (f[0] if f[0] != "~" else "").replace("\\", "/"),
            "line": f[1],
            "method": f[2].lstrip("<").rstrip(">")
        }
        for f, d, in iteritems(acc.stats)
    ]
    from mo_times import Date
    tab = stats2tab(acc)

    stats_file = File(FILENAME, suffix=Date.now().format("_%Y%m%d_%H%M%S"))
    stats_file.write(list2tab(stats))
    stats_file = File(FILENAME, suffix=Date.now().format("_%Y%m%d_%H%M%S")).write(tab)
    Log.note("profile written to {{filename}}", filename=stats_file.abspath)


def list2tab(rows, separator="\t"):
    from mo_json import value2json

    columns = set()
    for r in to_data(rows):
        columns |= set(k for k, v in r.leaves())
    keys = list(columns)

    output = []
    for r in to_data(rows):
        output.append(separator.join(value2json(r[k]) for k in keys))

    return separator.join(keys) + "\n" + "\n".join(output)