Merge branch 'update' into dev
This commit is contained in:
Commit
c25ee6a7d7

@@ -133,6 +133,15 @@ set PYTHONPATH=.;vendor
python -m unittest discover -v -s tests
```

## Fixing tests

Test runs are compared to the reference documents found in `tests/resources/reference`. These may need updating after changing the code.

    python -m unittest test_examples

The output file is found in `tests/results`, and can replace the reference file. Be sure to review the `git diff` of the reference file, to confirm nothing went wrong.
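
A minimal sketch of that update workflow (the result file name is hypothetical; substitute whichever file your test run wrote):

```
cp tests/results/test_examples.json tests/resources/reference/test_examples.json
git diff tests/resources/reference
```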

## Upgrades

There may be enhancements from time to time. To get them

@@ -233,12 +233,24 @@ class AliasAnalyzer(object):
        try:
            if self.kwargs.elasticsearch:
                cluster = Cluster(self.kwargs.elasticsearch)
                self.es = cluster.get_or_create_index(
                    kwargs=self.kwargs.elasticsearch,
                    schema=ALIAS_SCHEMA,
                    limit_replicas=True
                )
                self.es.add_alias(self.kwargs.elasticsearch.index)
                if cluster.get_best_matching_index(index=self.kwargs.elasticsearch.index, alias=self.kwargs.elasticsearch.alias):
                    self.es = cluster.get_index(
                        kwargs=self.kwargs.elasticsearch,
                        schema=ALIAS_SCHEMA,
                        read_only=False,
                        limit_replicas=True
                    )
                else:
                    self.es = cluster.create_index(
                        kwargs=self.kwargs.elasticsearch,
                        schema=ALIAS_SCHEMA,
                        limit_replicas=True
                    )
                    self.es.add_alias(self.kwargs.elasticsearch.index)
                    cluster.delete_all_but(self.es.settings.alias, self.es.settings.index)
                    self._load_aliases_from_file()
                    self.save_aliases()
                    return

                file_date = os.path.getmtime(File(self.kwargs.file).abspath)
                index_date = float(cluster.get_metadata().indices[self.es.settings.index].settings.index.creation_date) / 1000
@@ -253,6 +265,7 @@ class AliasAnalyzer(object):
                self.es.add_alias(self.kwargs.elasticsearch.index)
                cluster.delete_all_but(self.es.settings.alias, self.es.settings.index)
                self._load_aliases_from_file()
                self.save_aliases()
                return

            esq = jx_elasticsearch.new_instance(self.es.settings)
@@ -296,10 +309,15 @@ class AliasAnalyzer(object):
            for k, v in self.aliases.items():
                if v["dirty"]:
                    records.append({"id": k, "value": {"canonical": v["canonical"], "alias": k}})
                    v['dirty'] = False

            if records:
                Log.note("Net new aliases saved: {{num}}", num=len(records))
                self.es.extend(records)

                for k, v in self.aliases.items():
                    v['dirty'] = False

        elif self.kwargs.file:
            def compact():
                return {
@@ -46,20 +46,20 @@ import re
from bugzilla_etl.alias_analysis import AliasAnalyzer
from bugzilla_etl.extract_bugzilla import MAX_TIMESTAMP
from bugzilla_etl.transform_bugzilla import normalize, NUMERIC_FIELDS, MULTI_FIELDS, DIFF_FIELDS, NULL_VALUES, TIME_FIELDS, LONG_FIELDS
from jx_base import meta_columns
from jx_elasticsearch.meta import python_type_to_es_type
from jx_python import jx, meta
from jx_python import jx
from mo_dots import inverse, coalesce, wrap, unwrap, literal_field, listwrap
from mo_dots.datas import Data
from mo_dots.lists import FlatList
from mo_dots.nones import Null
from mo_future import text_type, long, PYPY, PY2
from mo_json import value2json
from mo_json import value2json, python_type_to_json_type, STRING
from mo_logs import Log, strings, Except
from mo_logs.strings import apply_diff
from mo_math import MIN, is_integer
from mo_times import Date
from pyLibrary import convert

# Used to split a flag into (type, status [,requestee])
# Example: "review?(mreid@mozilla.com)" -> (review, ?, mreid@mozilla.com)
# Example: "review-" -> (review, -)
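
The pattern itself lies outside this hunk; a minimal sketch of a splitter consistent with the examples above (an assumption, not the code from this commit):

```python
import re

# type, then one status character (?, +, -), then an optional (requestee)
FLAG = re.compile(r"^(.*?)([?+-])(?:\((.*)\))?$")

FLAG.match("review?(mreid@mozilla.com)").groups()  # ('review', '?', 'mreid@mozilla.com')
FLAG.match("review-").groups()                     # ('review', '-', None)
```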
@@ -1168,7 +1168,7 @@ class ApplyDiff(object):

        text = self.text
        diff = self.diff
        if not self.result:
        if self.result == None:
            try:
                new_text = apply_diff(coalesce(text, "").split("\n"), diff.split("\n"), reverse=self.reverse, verify=DEBUG_DIFF)
                self.result = "\n".join(new_text)
@@ -1241,6 +1241,11 @@ class LongField(object):
        return self.value



# ENSURE WE REGISTER THIS PROMISE AS A STRING
python_type_to_es_type[ApplyDiff] = "string"
python_type_to_es_type[LongField] = "string"
meta_columns._merge_order['ApplyDiff'] = 6
meta_columns._merge_order['LongField'] = 6
python_type_to_json_type[ApplyDiff] = STRING
python_type_to_json_type[LongField] = STRING
python_type_to_json_type['ApplyDiff'] = STRING
python_type_to_json_type['LongField'] = STRING
@@ -98,7 +98,7 @@
        }
    },
    "constants": {
        "jx_elasticsearch.meta.DEBUG": true,
        "jx_elasticsearch.meta.DEBUG": false,
        "jx_elasticsearch.meta.ENABLE_META_SCAN": false,
        "pyLibrary.sql.mysql.EXECUTE_TIMEOUT": 0,
        "pyLibrary.env.http.default_headers": {
The diff for this file is not shown because it is too large.
The diff for this file is not shown because it is too large.
@@ -360,6 +360,16 @@ class TestETL(unittest.TestCase):
            "format": "list"
        })
        if set(results.data.bug_id) != set(private_bugs):
            results = esq.query({
                "from": esq.name,
                "where": {"and": [
                    {"in": {"bug_id": private_bugs}},
                    {"gte": {"expires_on": Date.now().milli}}
                ]},
                "limit": 200000,
                "format": "list"
            })

            Log.error("Expecting private bugs to exist")

        # MAKE A CHANGE TO THE PRIVATE BUGS
@@ -630,12 +640,12 @@ def compare_both(candidate, reference, settings, bug_ids):
        "modified_ts"
    )
    for v in versions:
        v.etl.timestamp = None
        v.etl = None

    pre_ref_versions = get_all_bug_versions(None, bug_id, max_time, esq=referenceq)
    ref_versions = jx.sort(pre_ref_versions, "modified_ts")
    for v in ref_versions:
        v.etl.timestamp = None
        v.etl = None

    can = value2json(scrub(versions), pretty=True)
    ref = value2json(scrub(ref_versions), pretty=True)
@@ -9,11 +9,10 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from copy import copy

from mo_dots import Data, is_data, join_field, set_default, split_field, wrap, is_many
from mo_future import generator_types, text_type
from mo_dots import Data, is_data, is_many, join_field, set_default, split_field, wrap
from mo_future import is_text
from mo_logs import Log

type2container = Data()

@@ -46,7 +45,7 @@ class Container(object):
    """
    CONTAINERS HOLD MULTIPLE FACTS AND CAN HANDLE
    GENERAL JSON QUERY EXPRESSIONS ON ITS CONTENTS
    METADATA FOR A Container IS CALL A Namespace
    METADATA FOR A Container IS CALLED A Namespace
    """
@@ -19,15 +19,14 @@ LANGUAGE, BUT WE KEEP CODE HERE SO THERE IS LESS OF IT
"""
from __future__ import absolute_import, division, unicode_literals

from decimal import Decimal
import operator
import re

from jx_base.utils import get_property_name, is_variable_name
from jx_base.language import BaseExpression, TYPE_ORDER, define_language, is_expression, is_op, value_compare, ID
from mo_dots import Null, coalesce, is_data, is_list, split_field, wrap, is_sequence
from mo_future import first, get_function_name, is_text, items as items_, text_type, utf8_json_encoder, zip_longest
import mo_json
from jx_base.language import BaseExpression, ID, TYPE_ORDER, define_language, is_expression, is_op, value_compare
from jx_base.utils import get_property_name, is_variable_name
from mo_dots import Null, coalesce, is_data, is_sequence, split_field, wrap, is_container, is_many
from mo_future import first, get_function_name, is_text, items as items_, text_type, utf8_json_encoder, zip_longest
from mo_json import BOOLEAN, INTEGER, IS_NULL, NUMBER, OBJECT, STRING, python_type_to_json_type, scrub
from mo_json.typed_encoder import inserter_type_to_json_type
from mo_logs import Except, Log

@@ -185,7 +184,7 @@ class Expression(BaseExpression):

        if term == None:
            return class_([], **clauses)
        elif is_list(term):
        elif is_container(term):
            terms = [jx_expression(t) for t in term]
            return class_(terms, **clauses)
        elif is_data(term):

@@ -300,7 +299,7 @@ class Variable(Expression):
            row = row.get(p)
            if row is None:
                return None
        if is_list(row) and len(row) == 1:
        if is_sequence(row) and len(row) == 1:
            return row[0]
        return row

@@ -440,7 +439,7 @@ class SelectOp(Expression):
        expr = wrap(expr)
        term = expr.select
        terms = []
        if not is_list(term):
        if not is_container(term):
            raise Log.error("Expecting a list")
        for t in term:
            if is_text(t):

@@ -564,10 +563,10 @@ class Literal(Expression):
            return cls.lang[DateOp(term.get('date'))]
        return object.__new__(cls)

    def __init__(self, term):
    def __init__(self, value):
        Expression.__init__(self, None)
        self.simplified = True
        self.term = term
        self._value = value

    @classmethod
    def define(cls, expr):

@@ -593,14 +592,14 @@ class Literal(Expression):

    @property
    def value(self):
        return self.term
        return self._value

    @property
    def json(self):
        if self.term == "":
        if self._value == "":
            self._json = '""'
        else:
            self._json = value2json(self.term)
            self._json = value2json(self._value)

        return self._json

@@ -611,7 +610,7 @@ class Literal(Expression):
        return self

    def missing(self):
        if self.term in [None, Null]:
        if self._value in [None, Null]:
            return TRUE
        if self.value == '':
            return TRUE

@@ -628,7 +627,7 @@ class Literal(Expression):

    @property
    def type(self):
        return python_type_to_json_type[self.term.__class__]
        return python_type_to_json_type[self._value.__class__]

    def partial_eval(self):
        return self

@@ -855,7 +854,7 @@ class TupleOp(Expression):
        Expression.__init__(self, terms)
        if terms == None:
            self.terms = []
        elif is_list(terms):
        elif is_many(terms):
            self.terms = terms
        else:
            self.terms = [terms]

@@ -1070,12 +1069,12 @@ class EqOp(Expression):
    data_type = BOOLEAN

    def __new__(cls, terms):
        if is_list(terms):
        if is_many(terms):
            return object.__new__(cls)

        items = terms.items()
        if len(items) == 1:
            if is_list(items[0][1]):
            if is_many(items[0][1]):
                return cls.lang[InOp(items[0])]
            else:
                return cls.lang[EqOp(items[0])]

@@ -1243,7 +1242,7 @@ class AndOp(Expression):
        Expression.__init__(self, terms)
        if terms == None:
            self.terms = []
        elif is_list(terms):
        elif is_many(terms):
            self.terms = terms
        else:
            self.terms = [terms]

@@ -1477,10 +1476,8 @@ class LastOp(Expression):
        elif term is NULL:
            return term
        elif is_literal(term):
            if is_list(term):
                if len(term) > 0:
                    return term[-1]
                return NULL
            if is_many(term):
                return last(term)
            return term
        else:
            return self.lang[LastOp(term)]

@@ -1711,7 +1708,7 @@ class CountOp(Expression):

    def __init__(self, terms, **clauses):
        Expression.__init__(self, terms)
        if is_list(terms):
        if is_many(terms):
            # SHORTCUT: ASSUME AN ARRAY OF IS A TUPLE
            self.terms = self.lang[TupleOp(terms)]
        else:

@@ -1740,7 +1737,7 @@ class MaxOp(Expression):
        Expression.__init__(self, terms)
        if terms == None:
            self.terms = []
        elif is_list(terms):
        elif is_many(terms):
            self.terms = terms
        else:
            self.terms = [terms]

@@ -1793,7 +1790,7 @@ class MinOp(Expression):
        Expression.__init__(self, terms)
        if terms == None:
            self.terms = []
        elif is_list(terms):
        elif is_many(terms):
            self.terms = terms
        else:
            self.terms = [terms]

@@ -2591,7 +2588,7 @@ class BetweenOp(Expression):
    @classmethod
    def define(cls, expr):
        term = expr.between
        if is_list(term):
        if is_sequence(term):
            return cls.lang[BetweenOp(
                value=jx_expression(term[0]),
                prefix=jx_expression(term[1]),

@@ -2601,7 +2598,7 @@ class BetweenOp(Expression):
            )]
        elif is_data(term):
            var, vals = term.items()[0]
            if is_list(vals) and len(vals) == 2:
            if is_sequence(vals) and len(vals) == 2:
                return cls.lang[BetweenOp(
                    value=Variable(var),
                    prefix=Literal(vals[0]),

@@ -2685,7 +2682,7 @@ class InOp(Expression):
    def __new__(cls, terms):
        if is_op(terms[0], Variable) and is_op(terms[1], Literal):
            name, value = terms
            if not is_sequence(value.value):
            if not is_many(value.value):
                return cls.lang[EqOp([name, Literal([value.value])])]
        return object.__new__(cls)

@@ -2879,7 +2876,7 @@ class UnionOp(Expression):
        Expression.__init__(self, terms)
        if terms == None:
            self.terms = []
        elif is_list(terms):
        elif is_many(terms):
            self.terms = terms
        else:
            self.terms = [terms]
@@ -10,7 +10,6 @@
from __future__ import absolute_import, division, unicode_literals


from mo_future import is_text, is_binary
class Facts(object):
    """
    REPRESENT A HIERARCHICAL DATASTORE: MULTIPLE TABLES IN A DATABASE ALONG
@@ -14,7 +14,7 @@ from math import isnan

from mo_dots import Data, data_types, listwrap
from mo_dots.lists import list_types
from mo_future import boolean_type, long, none_type, text_type
from mo_future import boolean_type, long, none_type, text_type, transpose
from mo_logs import Log
from mo_times import Date

@@ -113,7 +113,7 @@ def define_language(lang_name, module_vars):

    if lang_name:
        # ENSURE THE ALL OPS ARE DEFINED ON THE NEW LANGUAGE
        for base_op, new_op in list(zip(JX.ops, language.ops)):
        for base_op, new_op in transpose(JX.ops, language.ops):
            if new_op is base_op:
                # MISSED DEFINITION, ADD ONE
                new_op = type(base_op.__name__, (base_op,), {})
@@ -0,0 +1,254 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http:# mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals

from collections import Mapping

from jx_base import Column
from jx_base.schema import Schema
from mo_collections import UniqueIndex
from mo_dots import (
    Data,
    FlatList,
    NullType,
    ROOT_PATH,
    concat_field,
    is_container,
    join_field,
    listwrap,
    split_field,
    unwraplist,
)
from mo_future import binary_type, items, long, none_type, reduce, text_type
from mo_json import INTEGER, NUMBER, STRING, python_type_to_json_type
from mo_times.dates import Date

DEBUG = False
singlton = None
META_INDEX_NAME = "meta.columns"
META_TYPE_NAME = "column"


def get_schema_from_list(table_name, frum, native_type_to_json_type=python_type_to_json_type):
    """
    SCAN THE LIST FOR COLUMN TYPES
    """
    columns = UniqueIndex(keys=("name",))
    _get_schema_from_list(
        frum,
        ".",
        parent=".",
        nested_path=ROOT_PATH,
        columns=columns,
        native_type_to_json_type=native_type_to_json_type,
    )
    return Schema(table_name=table_name, columns=list(columns))


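# Illustrative usage sketch (editorial note, not part of this commit): given a
# list of records, get_schema_from_list infers one Column per leaf property:
#
#     schema = get_schema_from_list("bugs", [{"id": 1, "name": "a"}])
#     sorted(c.name for c in schema.columns)   # assumed result: ['bugs.id', 'bugs.name']
#
# Exact names depend on concat_field and the table_name passed in; the values
# shown are assumptions.
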
def _get_schema_from_list(
    frum,  # The list
    table_name,  # Name of the table this list holds records for
    parent,  # parent path
    nested_path,  # each nested array, in reverse order
    columns,  # map from full name to column definition
    native_type_to_json_type  # dict from storage type name to json type name
):
    for d in frum:
        row_type = python_type_to_json_type[d.__class__]

        if row_type != "object":
            # EXPECTING PRIMITIVE VALUE
            full_name = parent
            column = columns[full_name]
            if not column:
                column = Column(
                    name=concat_field(table_name, full_name),
                    es_column=full_name,
                    es_index=".",
                    es_type=d.__class__.__name__,
                    jx_type=None,  # WILL BE SET BELOW
                    last_updated=Date.now(),
                    nested_path=nested_path,
                )
                columns.add(column)
            column.es_type = _merge_python_type(column.es_type, d.__class__)
            column.jx_type = native_type_to_json_type[column.es_type]
        else:
            for name, value in d.items():
                full_name = concat_field(parent, name)
                column = columns[full_name]
                if not column:
                    column = Column(
                        name=concat_field(table_name, full_name),
                        es_column=full_name,
                        es_index=".",
                        es_type=value.__class__.__name__,
                        jx_type=None,  # WILL BE SET BELOW
                        last_updated=Date.now(),
                        nested_path=nested_path,
                    )
                    columns.add(column)
                if is_container(value):  # GET TYPE OF MULTIVALUE
                    v = list(value)
                    if len(v) == 0:
                        this_type = none_type.__name__
                    elif len(v) == 1:
                        this_type = v[0].__class__.__name__
                    else:
                        this_type = reduce(
                            _merge_python_type, (vi.__class__.__name__ for vi in value)
                        )
                else:
                    this_type = value.__class__.__name__
                column.es_type = _merge_python_type(column.es_type, this_type)
                try:
                    column.jx_type = native_type_to_json_type[column.es_type]
                except Exception as e:
                    raise e

                if this_type in {"object", "dict", "Mapping", "Data"}:
                    _get_schema_from_list(
                        [value], table_name, full_name, nested_path, columns, native_type_to_json_type
                    )
                elif this_type in {"list", "FlatList"}:
                    np = listwrap(nested_path)
                    newpath = unwraplist([join_field(split_field(np[0]) + [name])] + np)
                    _get_schema_from_list(
                        value, table_name, full_name, newpath, columns
                    )


def get_id(column):
    """
    :param column:
    :return: Elasticsearch id for column
    """
    return column.es_index + "|" + column.es_column


METADATA_COLUMNS = (
    [
        Column(
            name=c,
            es_index=META_INDEX_NAME,
            es_column=c,
            es_type="keyword",
            jx_type=STRING,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in [
            "name",
            "es_type",
            "jx_type",
            "nested_path",
            "es_column",
            "es_index",
            "partitions",
        ]
    ]
    + [
        Column(
            name=c,
            es_index=META_INDEX_NAME,
            es_column=c,
            es_type="integer",
            jx_type=INTEGER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["count", "cardinality", "multi"]
    ]
    + [
        Column(
            name="last_updated",
            es_index=META_INDEX_NAME,
            es_column="last_updated",
            es_type="double",
            jx_type=NUMBER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
    ]
)

SIMPLE_METADATA_COLUMNS = (  # FOR PURELY INTERNAL PYTHON LISTS, NOT MAPPING TO ANOTHER DATASTORE
    [
        Column(
            name=c,
            es_index=META_INDEX_NAME,
            es_column=c,
            es_type="string",
            jx_type=STRING,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["table", "name", "type", "nested_path"]
    ]
    + [
        Column(
            name=c,
            es_index=META_INDEX_NAME,
            es_column=c,
            es_type="long",
            jx_type=INTEGER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["count", "cardinality", "multi"]
    ]
    + [
        Column(
            name="last_updated",
            es_index=META_INDEX_NAME,
            es_column="last_updated",
            es_type="time",
            jx_type=NUMBER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
    ]
)

_merge_order = {
    none_type: 0,
    NullType: 1,
    bool: 2,
    int: 3,
    long: 3,
    Date: 4,
    float: 5,
    text_type: 6,
    binary_type: 6,
    object: 7,
    dict: 8,
    Mapping: 9,
    Data: 10,
    list: 11,
    FlatList: 12,
}

for k, v in items(_merge_order):
    _merge_order[k.__name__] = v


def _merge_python_type(A, B):
    a = _merge_order[A]
    b = _merge_order[B]

    if a >= b:
        output = A
    else:
        output = B

    if isinstance(output, str):
        return output
    else:
        return output.__name__
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from jx_base.query import QueryOp
from mo_dots import is_data

@@ -38,7 +38,7 @@ def _late_import():
    global _jx
    global _Column

    from jx_python.meta import Column as _Column
    from jx_base import Column as _Column
    from jx_python import jx as _jx

    _ = _jx
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from copy import copy

from mo_dots import Null, relative_field, set_default, startswith_field, wrap
@@ -10,7 +10,6 @@
from __future__ import absolute_import, division, unicode_literals


from mo_future import is_text, is_binary
class Snowflake(object):
    """
    REPRESENT ONE ALIAS, AND ITS NESTED ARRAYS
@@ -10,7 +10,6 @@
from __future__ import absolute_import, division, unicode_literals


from mo_future import is_text, is_binary
class Table(object):

    def __init__(self, full_name):
@@ -71,6 +71,11 @@ class ES52(Container):
        else:
            self.es = elasticsearch.Cluster(kwargs=kwargs).get_index(read_only=read_only, kwargs=kwargs)

        self.es.cluster.put("/" + self.es.settings.index + "/_settings", data={"index": {
            "max_inner_result_window": 100000,
            "max_result_window": 100000
        }})

        self._namespace = ElasticsearchMetadata(kwargs=kwargs)
        self.settings.type = self.es.settings.type
        self.edges = Data()
@@ -11,7 +11,10 @@ from __future__ import absolute_import, division, unicode_literals

import itertools

from jx_base.expressions import (AndOp as AndOp_, BasicEqOp as BasicEqOp_, BasicStartsWithOp as BasicStartsWithOp_, BooleanOp as BooleanOp_, CaseOp as CaseOp_, CoalesceOp as CoalesceOp_, ConcatOp as ConcatOp_, DivOp as DivOp_, EqOp as EqOp_, EsNestedOp as EsNestedOp_, ExistsOp as ExistsOp_, FALSE, FalseOp as FalseOp_, GtOp as GtOp_, GteOp as GteOp_, InOp as InOp_, LengthOp as LengthOp_, Literal as Literal_, LtOp as LtOp_, LteOp as LteOp_, MissingOp as MissingOp_, NULL, NeOp as NeOp_, NotOp as NotOp_, NullOp, OrOp as OrOp_, PrefixOp as PrefixOp_, RegExpOp as RegExpOp_, ScriptOp as ScriptOp_, StringOp as StringOp_, SuffixOp as SuffixOp_, TRUE, TrueOp as TrueOp_, Variable as Variable_, WhenOp as WhenOp_, extend, is_literal)
from jx_base.expressions import (AndOp as AndOp_, BasicEqOp as BasicEqOp_, BasicStartsWithOp as BasicStartsWithOp_, BooleanOp as BooleanOp_, CaseOp as CaseOp_, CoalesceOp as CoalesceOp_, ConcatOp as ConcatOp_, DivOp as DivOp_,
    EqOp as EqOp_, EsNestedOp as EsNestedOp_, ExistsOp as ExistsOp_, FALSE, FalseOp as FalseOp_, GtOp as GtOp_, GteOp as GteOp_, InOp as InOp_, LengthOp as LengthOp_, Literal as Literal_, LtOp as LtOp_,
    LteOp as LteOp_, MissingOp as MissingOp_, NULL, NeOp as NeOp_, NotOp as NotOp_, NullOp, OrOp as OrOp_, PrefixOp as PrefixOp_, RegExpOp as RegExpOp_, ScriptOp as ScriptOp_, StringOp as StringOp_,
    SuffixOp as SuffixOp_, TRUE, TrueOp as TrueOp_, Variable as Variable_, WhenOp as WhenOp_, extend, is_literal)
from jx_base.language import Language, define_language, is_op
from jx_elasticsearch.es52.util import (
    MATCH_ALL,

@@ -25,7 +28,7 @@ from jx_elasticsearch.es52.util import (
    pull_functions,
)
from jx_python.jx import value_compare
from mo_dots import Data, Null, is_container, is_list, literal_field, set_default, wrap, is_sequence
from mo_dots import Data, Null, is_container, literal_field, set_default, wrap, is_many
from mo_future import first
from mo_json import BOOLEAN, NESTED, OBJECT, python_type_to_json_type
from mo_logs import Log, suppress_exception

@@ -172,7 +175,7 @@ class EqOp(EqOp_):
        lhs = self.lhs.var
        cols = schema.leaves(lhs)

        if is_list(rhs):
        if is_container(rhs):
            if len(rhs) == 1:
                rhs = rhs[0]
            else:
@@ -227,9 +230,9 @@ class BasicEqOp(BasicEqOp_):
        if cols:
            lhs = first(cols).es_column
            rhs = self.rhs.value
            if is_list(rhs):
            if is_many(rhs):
                if len(rhs) == 1:
                    return {"term": {lhs: rhs[0]}}
                    return {"term": {lhs: first(rhs)}}
                else:
                    return {"terms": {lhs: rhs}}
            else:
@@ -463,17 +466,17 @@ class InOp(InOp_):
        var = self.value.var
        cols = schema.leaves(var)
        if not cols:
            Log.error("expecting {{var}} to be a column", var=var)
            return MATCH_NONE
        col = first(cols)
        var = col.es_column

        if col.jx_type == BOOLEAN:
            if is_literal(self.superset) and not is_sequence(self.superset.value):
            if is_literal(self.superset) and not is_many(self.superset.value):
                return {"term": {var: value2boolean(self.superset.value)}}
            else:
                return {"terms": {var: map(value2boolean, self.superset.value)}}
        else:
            if is_literal(self.superset) and not is_sequence(self.superset.value):
            if is_literal(self.superset) and not is_many(self.superset.value):
                return {"term": {var: self.superset.value}}
            else:
                return {"terms": {var: self.superset.value}}
@@ -701,14 +704,14 @@ def split_expression_by_path(
    :param var_to_columns: MAP FROM EACH VARIABLE NAME TO THE DEPTH
    :return: output: A MAP FROM PATH TO EXPRESSION
    """
    where_vars = where.vars()
    if var_to_columns is None:
        var_to_columns = {v.var: schema.leaves(v.var) for v in where.vars()}
        var_to_columns = {v.var: schema.leaves(v.var) for v in where_vars}
    output = wrap({schema.query_path[0]: []})
    if not var_to_columns:
        output["\\."] += [where]  # LEGIT EXPRESSIONS OF ZERO VARIABLES
        return output

    where_vars = where.vars()
    all_paths = set(c.nested_path[0] for v in where_vars for c in var_to_columns[v.var])

    if len(all_paths) == 0:
@@ -14,15 +14,14 @@ from datetime import date, datetime
from decimal import Decimal

import jx_base
from jx_base import TableDesc
from jx_base import TableDesc, Column
from jx_base.namespace import Namespace
from jx_base.query import QueryOp
from jx_elasticsearch.meta_columns import ColumnList
from jx_python import jx
from jx_python.containers.list_usingPythonList import ListContainer
from jx_python.meta import Column, ColumnList
from mo_dots import Data, FlatList, Null, NullType, ROOT_PATH, coalesce, concat_field, is_list, literal_field, relative_field, set_default, split_field, startswith_field, tail_field, wrap
from mo_files import URL
from mo_future import PY2, none_type, text_type
from mo_future import long, none_type, text_type
from mo_json import BOOLEAN, EXISTS, INTEGER, OBJECT, STRING, STRUCT
from mo_json.typed_encoder import BOOLEAN_TYPE, EXISTS_TYPE, NUMBER_TYPE, STRING_TYPE, unnest_path, untype_path
from mo_kwargs import override
@@ -78,7 +77,7 @@ class ElasticsearchMetadata(Namespace):
        self.metadata_last_updated = Date.now() - OLD_METADATA

        self.meta = Data()
        self.meta.columns = ColumnList(URL(self.es_cluster.settings.host).host)
        self.meta.columns = ColumnList(self.es_cluster)

        self.alias_to_query_paths = {
            "meta.columns": [ROOT_PATH],
@@ -143,7 +142,7 @@ class ElasticsearchMetadata(Namespace):
            for d in diff:
                dirty = True
                i1.add_property(*d)
            meta = self.es_cluster.get_metadata(force=dirty).indices[canonical_index]
            meta = self.es_cluster.get_metadata(force=dirty).indices[literal_field(canonical_index)]

        data_type, mapping = _get_best_type_from_mapping(meta.mappings)
        mapping.properties["_id"] = {"type": "string", "index": "not_analyzed"}
@@ -277,7 +276,7 @@ class ElasticsearchMetadata(Namespace):
        columns = self.meta.columns.find(alias, column_name)
        DEBUG and Log.note("columns from find()")

        DEBUG and Log.note("columns are {{ids}}", ids=[id(c) for c in columns])
        DEBUG and Log.note("columns for {{table}} are {{ids}}", table=table_name, ids=[id(c) for c in columns])

        columns = jx.sort(columns, "name")

@@ -286,7 +285,7 @@ class ElasticsearchMetadata(Namespace):

        # WAIT FOR THE COLUMNS TO UPDATE
        while True:
            pending = [c for c in columns if after >= c.last_updated or (c.cardinality == None and c.jx_type not in STRUCT)]
            pending = [c for c in columns if after >= c.last_updated]
            if not pending:
                break
            if timeout:
@@ -314,19 +313,6 @@ class ElasticsearchMetadata(Namespace):
        if column.jx_type in STRUCT:
            Log.error("not supported")
        try:
            if column.es_index == "meta.columns":
                partitions = jx.sort([g[column.es_column] for g, _ in jx.groupby(self.meta.columns, column.es_column) if g[column.es_column] != None])
                self.meta.columns.update({
                    "set": {
                        "partitions": partitions,
                        "count": len(self.meta.columns),
                        "cardinality": len(partitions),
                        "multi": 1,
                        "last_updated": now
                    },
                    "where": {"eq": {"es_index": column.es_index, "es_column": column.es_column}}
                })
                return
            if column.es_index == "meta.tables":
                partitions = jx.sort([g[column.es_column] for g, _ in jx.groupby(self.meta.tables, column.es_column) if g[column.es_column] != None])
                self.meta.columns.update({
@@ -521,7 +507,7 @@ class ElasticsearchMetadata(Namespace):
        old_columns = [
            c
            for c in self.meta.columns
            if ((c.last_updated < Date.now() - MAX_COLUMN_METADATA_AGE) or c.cardinality == None) and c.jx_type not in STRUCT
            if (c.last_updated < Date.now() - MAX_COLUMN_METADATA_AGE) and c.jx_type not in STRUCT
        ]
        if old_columns:
            DEBUG and Log.note(
@@ -552,7 +538,7 @@ class ElasticsearchMetadata(Namespace):
                continue
            elif column.last_updated > Date.now() - TOO_OLD and column.cardinality is not None:
                # DO NOT UPDATE FRESH COLUMN METADATA
                DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)).seconds)
                DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)))
                continue
            try:
                self._update_cardinality(column)
@@ -575,26 +561,34 @@ class ElasticsearchMetadata(Namespace):
            column = self.todo.pop()
            if column == THREAD_STOP:
                break
            # if untype_path(column.name) in ["build.type", "run.type"]:
            #     Log.note("found")

            if column.jx_type in STRUCT or split_field(column.es_column)[-1] == EXISTS_TYPE:
                DEBUG and Log.note("{{column.es_column}} is a struct", column=column)
                column.last_updated = Date.now()
                continue
            elif column.last_updated > Date.now() - TOO_OLD and column.cardinality is not None:
                # DO NOT UPDATE FRESH COLUMN METADATA
                DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)).seconds)
                continue

            with Timer("Update {{col.es_index}}.{{col.es_column}}", param={"col": column}, silent=not DEBUG, too_long=0.05):
                if column.jx_type in STRUCT or split_field(column.es_column)[-1] == EXISTS_TYPE:
                    DEBUG and Log.note("{{column.es_column}} is a struct", column=column)
                    continue
                elif column.last_updated > Date.now() - TOO_OLD and column.cardinality > 0:
                    # DO NOT UPDATE FRESH COLUMN METADATA
                    DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)).seconds)
                    continue
                if untype_path(column.name) in ["build.type", "run.type"]:
                    try:
                        self._update_cardinality(column)
                    except Exception as e:
                        Log.warning("problem getting cardinality for {{column.name}}", column=column, cause=e)
                else:
                    column.last_updated = Date.now()
                    continue

            self.meta.columns.update({
                "set": {
                    "last_updated": Date.now()
                },
                "clear": [
                    "count",
                    "cardinality",
                    "multi",
                    "partitions",
                ],
                "where": {"eq": {"es_index": column.es_index, "es_column": column.es_column}}
            })


    def get_table(self, name):
@@ -904,6 +898,7 @@ python_type_to_es_type = {
    str: "string",
    text_type: "string",
    int: "integer",
    long: "integer",
    float: "double",
    Data: "object",
    dict: "object",

@@ -916,9 +911,6 @@
    date: "double"
}

if PY2:
    python_type_to_es_type[long] = "integer"

_merge_es_type = {
    "undefined": {
        "undefined": "undefined",
@@ -0,0 +1,448 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http:# mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals

import jx_base
from jx_base import Column, Table
from jx_base.meta_columns import METADATA_COLUMNS, SIMPLE_METADATA_COLUMNS
from jx_base.schema import Schema
from jx_python import jx
from mo_dots import Data, Null, is_data, is_list, unwraplist, wrap
from mo_json import STRUCT
from mo_json.typed_encoder import unnest_path, untype_path, untyped
from mo_logs import Log
from mo_math import MAX
from mo_threads import Lock, Queue, Thread, Till, MAIN_THREAD
from mo_times import Timer
from mo_times.dates import Date

DEBUG = False
singlton = None
META_INDEX_NAME = "meta.columns"
META_TYPE_NAME = "column"
COLUMN_LOAD_PERIOD = 10
COLUMN_EXTRACT_PERIOD = 2*60


class ColumnList(Table, jx_base.Container):
    """
    OPTIMIZED FOR THE PARTICULAR ACCESS PATTERNS USED
    """

    def __init__(self, es_cluster):
        Table.__init__(self, META_INDEX_NAME)
        self.data = {}  # MAP FROM ES_INDEX TO (abs_column_name to COLUMNS)
        self.locker = Lock()
        self._schema = None
        self.dirty = False
        self.es_cluster = es_cluster
        self.es_index = None
        self.last_load = Null
        self.todo = Queue(
            "update columns to es"
        )  # HOLD (action, column) PAIR, WHERE action in ['insert', 'update']
        self._db_load()
        Thread.run(
            "update " + META_INDEX_NAME,
            self._synch_with_es,
            parent_thread=MAIN_THREAD,
        )

    def _query(self, query):
        result = Data()
        curr = self.es_cluster.execute(query)
        result.meta.format = "table"
        result.header = [d[0] for d in curr.description] if curr.description else None
        result.data = curr.fetchall()
        return result

    def _db_create(self):
        schema = {
            "settings": {"index.number_of_shards": 1, "index.number_of_replicas": 2},
            "mappings": {META_TYPE_NAME: {}},
        }

        self.es_index = self.es_cluster.create_index(
            id={"field": ["es_index", "es_column"], "version": "last_updated"},
            index=META_INDEX_NAME,
            schema=schema,
        )
        self.es_index.add_alias(META_INDEX_NAME)

        for c in METADATA_COLUMNS:
            self._add(c)
            self.es_index.add({"value": c.__dict__()})

    def _db_load(self):
        self.last_load = Date.now()

        try:
            self.es_index = self.es_cluster.get_index(
                index=META_INDEX_NAME, type=META_TYPE_NAME, read_only=False
            )

            result = self.es_index.search(
                {
                    "query": {
                        "bool": {
                            "should": [
                                {
                                    "bool": {
                                        "must_not": {
                                            "exists": {"field": "cardinality.~n~"}
                                        }
                                    }
                                },
                                {
                                    "range": {"cardinality.~n~": {"gt": 0}}
                                },  # ASSUME UNUSED COLUMNS DO NOT EXIST
                            ]
                        }
                    },
                    "sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
                    "size": 10000,
                }
            )

            DEBUG and Log.note("{{num}} columns loaded", num=result.hits.total)
            with self.locker:
                for r in result.hits.hits._source:
                    self._add(doc_to_column(r))

        except Exception as e:
            Log.warning(
                "no {{index}} exists, making one", index=META_INDEX_NAME, cause=e
            )
            self._db_create()

    def _synch_with_es(self, please_stop):
        try:
            last_extract = Date.now()
            while not please_stop:
                now = Date.now()
                try:
                    if (now - last_extract).seconds > COLUMN_EXTRACT_PERIOD:
                        result = self.es_index.search(
                            {
                                "query": {
                                    "range": {"last_updated.~n~": {"gt": self.last_load}}
                                },
                                "sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
                                "from": 0,
                                "size": 10000,
                            }
                        )
                        last_extract = now

                        with self.locker:
                            for r in result.hits.hits._source:
                                c = doc_to_column(r)
                                self._add(c)
                                self.last_load = MAX((self.last_load, c.last_updated))

                    while not please_stop:
                        updates = self.todo.pop_all()
                        if not updates:
                            break

                        DEBUG and updates and Log.note(
                            "{{num}} columns to push to db", num=len(updates)
                        )
                        self.es_index.extend(
                            {"value": column.__dict__()} for column in updates
                        )
                except Exception as e:
                    Log.warning("problem updating database", cause=e)

                (Till(seconds=COLUMN_LOAD_PERIOD) | please_stop).wait()
        finally:
            Log.note("done")

    def find(self, es_index, abs_column_name=None):
        with self.locker:
            if es_index.startswith("meta."):
                self._update_meta()

            if not abs_column_name:
                return [c for cs in self.data.get(es_index, {}).values() for c in cs]
            else:
                return self.data.get(es_index, {}).get(abs_column_name, [])

    def extend(self, columns):
        self.dirty = True
        with self.locker:
            for column in columns:
                self._add(column)

    def add(self, column):
        self.dirty = True
        with self.locker:
            canonical = self._add(column)
        if canonical == None:
            return column  # ALREADY ADDED
        self.todo.add(canonical)
        return canonical

    def remove_table(self, table_name):
        del self.data[table_name]

    def _add(self, column):
        """
        :param column: ANY COLUMN OBJECT
        :return: None IF column IS canonical ALREADY (NET-ZERO EFFECT)
        """
        columns_for_table = self.data.setdefault(column.es_index, {})
        existing_columns = columns_for_table.setdefault(column.name, [])

        for canonical in existing_columns:
            if canonical is column:
                return None
            if canonical.es_type == column.es_type:
                if column.last_updated > canonical.last_updated:
                    for key in Column.__slots__:
                        old_value = canonical[key]
                        new_value = column[key]
                        if new_value == None:
                            pass  # DO NOT BOTHER CLEARING OLD VALUES (LIKE cardinality AND paritiions)
                        elif new_value == old_value:
                            pass  # NO NEED TO UPDATE WHEN NO CHANGE MADE (COMMON CASE)
                        else:
                            canonical[key] = new_value
                return canonical
        existing_columns.append(column)
        return column

    def _update_meta(self):
        if not self.dirty:
            return

        for mcl in self.data.get(META_INDEX_NAME).values():
            for mc in mcl:
                count = 0
                values = set()
                objects = 0
                multi = 1
                for column in self._all_columns():
                    value = column[mc.name]
                    if value == None:
                        pass
                    else:
                        count += 1
                        if is_list(value):
                            multi = max(multi, len(value))
                            try:
                                values |= set(value)
                            except Exception:
                                objects += len(value)
                        elif is_data(value):
                            objects += 1
                        else:
                            values.add(value)
                mc.count = count
                mc.cardinality = len(values) + objects
                mc.partitions = jx.sort(values)
                mc.multi = multi
                mc.last_updated = Date.now()
        self.dirty = False

    def _all_columns(self):
        return [
            column
            for t, cs in self.data.items()
            for _, css in cs.items()
            for column in css
        ]

    def __iter__(self):
        with self.locker:
            self._update_meta()
            return iter(self._all_columns())

    def __len__(self):
        return self.data[META_INDEX_NAME]["es_index"].count

    def update(self, command):
        self.dirty = True
        try:
            command = wrap(command)
            DEBUG and Log.note(
                "Update {{timestamp}}: {{command|json}}",
                command=command,
                timestamp=Date(command["set"].last_updated),
            )
            eq = command.where.eq
            if eq.es_index:
                if len(eq) == 1:
                    if unwraplist(command.clear) == ".":
                        d = self.data
                        i = eq.es_index
                        with self.locker:
                            cols = d[i]
                            del d[i]

                        for c in cols:
                            mark_as_deleted(c)
                            self.todo.add(c)
                        return

                    # FASTEST
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [c for cs in all_columns for c in cs]
                elif eq.es_column and len(eq) == 2:
                    # FASTER
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [
                            c
                            for cs in all_columns
                            for c in cs
                            if c.es_column == eq.es_column
                        ]

                else:
                    # SLOWER
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [
                            c
                            for cs in all_columns
                            for c in cs
                            if all(
                                c[k] == v for k, v in eq.items()
                            )  # THIS LINE IS VERY SLOW
                        ]
            else:
                columns = list(self)
                columns = jx.filter(columns, command.where)

            with self.locker:
                for col in columns:
                    DEBUG and Log.note(
                        "update column {{table}}.{{column}}",
                        table=col.es_index,
                        column=col.es_column,
                    )
                    for k in command["clear"]:
                        if k == ".":
                            mark_as_deleted(col)
                            self.todo.add(col)
                            lst = self.data[col.es_index]
                            cols = lst[col.name]
                            cols.remove(col)
                            if len(cols) == 0:
                                del lst[col.name]
                                if len(lst) == 0:
                                    del self.data[col.es_index]
                            break
                        else:
                            col[k] = None
                    else:
                        # DID NOT DELETE COLUMNM ("."), CONTINUE TO SET PROPERTIES
                        for k, v in command.set.items():
                            col[k] = v
                            self.todo.add(col)

        except Exception as e:
            Log.error("should not happen", cause=e)

    def query(self, query):
        # NOT EXPECTED TO BE RUN
        Log.error("not")
        with self.locker:
            self._update_meta()
            if not self._schema:
                self._schema = Schema(
                    ".", [c for cs in self.data[META_INDEX_NAME].values() for c in cs]
                )
            snapshot = self._all_columns()

        from jx_python.containers.list_usingPythonList import ListContainer

        query.frum = ListContainer(META_INDEX_NAME, snapshot, self._schema)
        return jx.run(query)

    def groupby(self, keys):
        with self.locker:
            self._update_meta()
            return jx.groupby(self.__iter__(), keys)

    def window(self, window):
        raise NotImplemented()

    @property
    def schema(self):
        if not self._schema:
            with self.locker:
                self._update_meta()
                self._schema = Schema(
                    ".", [c for cs in self.data[META_INDEX_NAME].values() for c in cs]
                )
        return self._schema

    @property
    def namespace(self):
        return self

    def get_table(self, table_name):
        if table_name != META_INDEX_NAME:
            Log.error("this container has only the " + META_INDEX_NAME)
        return self

    def get_columns(self, table_name):
        if table_name != META_INDEX_NAME:
            Log.error("this container has only the " + META_INDEX_NAME)
        return self._all_columns()

    def denormalized(self):
        """
        THE INTERNAL STRUCTURE FOR THE COLUMN METADATA IS VERY DIFFERENT FROM
        THE DENORMALIZED PERSPECITVE. THIS PROVIDES THAT PERSPECTIVE FOR QUERIES
        """
        with self.locker:
            self._update_meta()
            output = [
                {
                    "table": c.es_index,
                    "name": untype_path(c.name),
                    "cardinality": c.cardinality,
                    "es_column": c.es_column,
                    "es_index": c.es_index,
                    "last_updated": c.last_updated,
                    "count": c.count,
                    "nested_path": [unnest_path(n) for n in c.nested_path],
                    "es_type": c.es_type,
                    "type": c.jx_type,
                }
                for tname, css in self.data.items()
                for cname, cs in css.items()
                for c in cs
                if c.jx_type not in STRUCT  # and c.es_column != "_id"
            ]

        from jx_python.containers.list_usingPythonList import ListContainer

        return ListContainer(
            self.name,
            data=output,
            schema=jx_base.Schema(META_INDEX_NAME, SIMPLE_METADATA_COLUMNS),
        )


def doc_to_column(doc):
    return Column(**wrap(untyped(doc)))


def mark_as_deleted(col):
    col.count = 0
    col.cardinality = 0
    col.multi = 0
    col.partitions = None
    col.last_updated = Date.now()
@@ -16,10 +16,10 @@ import jx_base
from jx_base import Container
from jx_base.expressions import TRUE, Variable
from jx_base.language import is_expression, is_op
from jx_base.meta_columns import get_schema_from_list
from jx_base.schema import Schema
from jx_python.expressions import jx_expression_to_function
from jx_python.lists.aggs import is_aggs, list_aggs
from jx_python.meta import get_schema_from_list
from mo_collections import UniqueIndex
from mo_dots import Data, Null, is_data, is_list, listwrap, unwrap, unwraplist, wrap
from mo_future import first, sort_using_key
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
import itertools

from jx_base.domains import DefaultDomain, SimpleSetDomain
@@ -7,7 +7,7 @@
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals
from __future__ import absolute_import, division

import re

@@ -15,12 +15,20 @@ from mo_dots import Data, coalesce, is_data, listwrap, wrap_leaves
from mo_logs import Log, strings
from mo_times.dates import Date

true = True
false = False
null = None
EMPTY_DICT = {}

_keep_imports = [coalesce, listwrap, Date, Log, Data, re, wrap_leaves, is_data]
GLOBALS = {
    "true": True,
    "false": False,
    "null": None,
    "EMPTY_DICT": {},
    "coalesce": coalesce,
    "listwrap": listwrap,
    "Date": Date,
    "Log": Log,
    "Data": Data,
    "re": re,
    "wrap_leaves": wrap_leaves,
    "is_data": is_data
}


def compile_expression(source):
@@ -33,17 +41,17 @@ def compile_expression(source):
    fake_locals = {}
    try:
        exec(
            """
def output(row, rownum=None, rows=None):
    _source = """ + strings.quote(source) + """
    try:
        return """ + source + """
    except Exception as e:
        Log.error("Problem with dynamic function {{func|quote}}", func=_source, cause=e)
""",
            globals(),
            fake_locals
            (
                "def output(row, rownum=None, rows=None):\n" +
                "    _source = " + strings.quote(source) + "\n" +
                "    try:\n" +
                "        return " + source + "\n" +
                "    except Exception as e:\n" +
                "        Log.error(u'Problem with dynamic function {{func|quote}}', func=_source, cause=e)\n"
            ),
            GLOBALS,
            fake_locals,
        )
    except Exception as e:
        Log.error("Bad source: {{source}}", source=source, cause=e)
    return fake_locals['output']
        Log.error(u"Bad source: {{source}}", source=source, cause=e)
    return fake_locals["output"]
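# Illustrative usage sketch (editorial note, not part of this commit):
# compile_expression returns a function f(row, rownum=None, rows=None) that
# evaluates `source` with the row in scope, e.g.:
#
#     f = compile_expression("row['a'] + 1")
#     f({"a": 41})   # assumed result: 42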
@@ -10,7 +10,6 @@

from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
import functools

from mo_dots import Data, FlatList, coalesce, is_data, is_list, split_field, wrap
@@ -640,7 +640,7 @@ def drill_filter(esfilter, data):
    """
    PARTIAL EVALUATE THE FILTER BASED ON DATA GIVEN

    TODO: FIX THIS MONUMENALLY BAD IDEA
    TODO: FIX THIS MONUMENTALLY BAD IDEA
    """
    esfilter = unwrap(esfilter)
    primary_nested = []  # track if nested, changes if not
@@ -1,28 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http:# mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
import dataset
from jx_python.containers.Table_usingDataset import Table_usingDataset


class Dataset(object):

    def __init__(self):
        self.db = dataset.connect('sqlite:///:memory:')

    def get_or_create_table(self, name, uid):
        return Table_usingDataset(name, self.db, primary_id=uid)
@@ -1 +0,0 @@
__author__ = 'kyle'
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
import itertools

from jx_base.domains import DefaultDomain, SimpleSetDomain

@@ -23,6 +22,7 @@ from mo_times.dates import Date

_ = Date


def is_aggs(query):
    if query.edges or query.groupby or any(a != None and a != "none" for a in listwrap(query.select).aggregate):
        return True
@@ -1,752 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http:# mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import, division, unicode_literals

from collections import Mapping
from contextlib import contextmanager
import sqlite3

import jx_base
from jx_base import Column, Table
from jx_base.schema import Schema
from jx_python import jx
from mo_collections import UniqueIndex
from mo_dots import Data, FlatList, Null, NullType, ROOT_PATH, concat_field, is_container, is_data, is_list, join_field, listwrap, split_field, unwraplist, wrap
from mo_files import File
from mo_future import items, none_type, reduce, text_type, binary_type
from mo_json import (INTEGER, NUMBER, STRING, STRUCT, json2value, python_type_to_json_type, value2json)
from mo_json.typed_encoder import unnest_path, untype_path
from mo_logs import Except, Log
from mo_threads import Lock, Queue, Thread, Till
from mo_times.dates import Date
from pyLibrary.sql import (SQL_AND, SQL_FROM, SQL_ORDERBY, SQL_SELECT, SQL_WHERE, sql_iso, sql_list)
from pyLibrary.sql.sqlite import json_type_to_sqlite_type, quote_column, quote_value

DEBUG = False
singlton = None
db_table_name = quote_column("meta.columns")

INSERT, UPDATE, DELETE, EXECUTE = "insert", "update", "delete", "execute"


class ColumnList(Table, jx_base.Container):
    """
    OPTIMIZED FOR THE PARTICULAR ACCESS PATTERNS USED
    """

    def __init__(self, name):
        Table.__init__(self, "meta.columns")
        self.db_file = File("metadata." + name + ".sqlite")
        self.data = {}  # MAP FROM ES_INDEX TO (abs_column_name to COLUMNS)
        self.locker = Lock()
        self._schema = None
        self.db = sqlite3.connect(
            database=self.db_file.abspath, check_same_thread=False, isolation_level=None
        )
        self.last_load = Null
        self.todo = Queue(
            "update columns to db"
        )  # HOLD (action, column) PAIR, WHERE action in ['insert', 'update']
        self._db_load()
        Thread.run("update " + name, self._db_worker)

    @contextmanager
    def _db_transaction(self):
        self.db.execute(str("BEGIN"))
        try:
            yield
            self.db.execute(str("COMMIT"))
        except Exception as e:
            e = Except.wrap(e)
            self.db.execute(str("ROLLBACK"))
            Log.error("Transaction failed", cause=e)

    def _query(self, query):
        result = Data()
        curr = self.db.execute(query)
        result.meta.format = "table"
        result.header = [d[0] for d in curr.description] if curr.description else None
        result.data = curr.fetchall()
        return result

    def _db_create(self):
        with self._db_transaction():
            self.db.execute(
                "CREATE TABLE "
                + db_table_name
                + sql_iso(
                    sql_list(
                        [
                            quote_column(c.name)
                            + " "
                            + json_type_to_sqlite_type[c.jx_type]
                            for c in METADATA_COLUMNS
                        ]
                        + [
                            "PRIMARY KEY"
                            + sql_iso(
                                sql_list(map(quote_column, ["es_index", "es_column"]))
                            )
                        ]
                    )
                )
            )

            for c in METADATA_COLUMNS:
                self._add(c)
                self._db_insert_column(c)

    def _db_load(self):
        self.last_load = Date.now()

        result = self._query(
            SQL_SELECT
            + "name"
            + SQL_FROM
            + "sqlite_master"
            + SQL_WHERE
            + SQL_AND.join(["name=" + db_table_name, "type=" + quote_value("table")])
        )
        if not result.data:
            self._db_create()
            return

        result = self._query(
            SQL_SELECT
            + all_columns
            + SQL_FROM
            + db_table_name
            + SQL_ORDERBY
            + sql_list(map(quote_column, ["es_index", "name", "es_column"]))
        )

        with self.locker:
            for r in result.data:
                c = row_to_column(result.header, r)
                self._add(c)

    def _db_worker(self, please_stop):
        while not please_stop:
            try:
                with self._db_transaction():
                    result = self._query(
                        SQL_SELECT
                        + all_columns
                        + SQL_FROM
                        + db_table_name
                        + SQL_WHERE
                        + "last_updated > "
                        + quote_value(self.last_load)
                        + SQL_ORDERBY
                        + sql_list(map(quote_column, ["es_index", "name", "es_column"]))
                    )

                    with self.locker:
                        for r in result.data:
                            c = row_to_column(result.header, r)
                            self._add(c)
                            if c.last_updated > self.last_load:
                                self.last_load = c.last_updated

                updates = self.todo.pop_all()
                DEBUG and updates and Log.note(
                    "{{num}} columns to push to db", num=len(updates)
                )
                for action, column in updates:
                    while not please_stop:
                        try:
                            with self._db_transaction():
                                DEBUG and Log.note(
                                    "{{action}} db for {{table}}.{{column}}",
                                    action=action,
                                    table=column.es_index,
                                    column=column.es_column,
                                )
                                if action is EXECUTE:
                                    self.db.execute(column)
                                elif action is UPDATE:
                                    self.db.execute(
                                        "UPDATE"
                                        + db_table_name
                                        + "SET"
                                        + sql_list(
                                            [
                                                "count=" + quote_value(column.count),
                                                "cardinality="
                                                + quote_value(column.cardinality),
                                                "multi=" + quote_value(column.multi),
                                                "partitions="
                                                + quote_value(
                                                    value2json(column.partitions)
                                                ),
                                                "last_updated="
                                                + quote_value(column.last_updated),
                                            ]
                                        )
                                        + SQL_WHERE
                                        + SQL_AND.join(
                                            [
                                                "es_index = "
                                                + quote_value(column.es_index),
                                                "es_column = "
                                                + quote_value(column.es_column),
                                                "last_updated < "
                                                + quote_value(column.last_updated),
                                            ]
                                        )
                                    )
                                elif action is DELETE:
                                    self.db.execute(
                                        "DELETE FROM"
                                        + db_table_name
                                        + SQL_WHERE
                                        + SQL_AND.join(
                                            [
                                                "es_index = "
                                                + quote_value(column.es_index),
                                                "es_column = "
                                                + quote_value(column.es_column),
                                            ]
                                        )
                                    )
                                else:
                                    self._db_insert_column(column)
                            break
                        except Exception as e:
                            e = Except.wrap(e)
                            if "database is locked" in e:
                                Log.note("metadata database is locked")
                                Till(seconds=1).wait()
                                break
                            else:
                                Log.warning("problem updating database", cause=e)

            except Exception as e:
                Log.warning("problem updating database", cause=e)

            (Till(seconds=10) | please_stop).wait()

    def _db_insert_column(self, column):
        try:
            self.db.execute(
                "INSERT INTO"
                + db_table_name
                + sql_iso(all_columns)
                + "VALUES"
                + sql_iso(
                    sql_list(
                        [
                            quote_value(column[c.name])
                            if c.name not in ("nested_path", "partitions")
                            else quote_value(value2json(column[c.name]))
                            for c in METADATA_COLUMNS
                        ]
                    )
                )
            )
        except Exception as e:
            e = Except.wrap(e)
            if "UNIQUE constraint failed" in e or " are not unique" in e:
                # THIS CAN HAPPEN BECAUSE todo HAS OLD COLUMN DATA
                self.todo.add((UPDATE, column), force=True)
            else:
                Log.error("do not know how to handle", cause=e)

    def __copy__(self):
        output = object.__new__(ColumnList)
        Table.__init__(output, "meta.columns")
        output.data = {
            t: {c: list(cs) for c, cs in dd.items()} for t, dd in self.data.items()
        }
        output.locker = Lock()
        output._schema = None
        return output

    def find(self, es_index, abs_column_name=None):
        with self.locker:
            if es_index.startswith("meta."):
                self._update_meta()

            if not abs_column_name:
                return [c for cs in self.data.get(es_index, {}).values() for c in cs]
            else:
                return self.data.get(es_index, {}).get(abs_column_name, [])

    def extend(self, columns):
        self.dirty = True
        with self.locker:
            for column in columns:
                self._add(column)

    def add(self, column):
        self.dirty = True
        with self.locker:
            canonical = self._add(column)
        if canonical == None:
            return column  # ALREADY ADDED
        self.todo.add((INSERT if canonical is column else UPDATE, canonical))
        return canonical

    def remove_table(self, table_name):
        del self.data[table_name]

    def _add(self, column):
        """
        :param column: ANY COLUMN OBJECT
        :return: None IF column IS canonical ALREADY (NET-ZERO EFFECT)
        """
        columns_for_table = self.data.setdefault(column.es_index, {})
        existing_columns = columns_for_table.setdefault(column.name, [])

        for canonical in existing_columns:
            if canonical is column:
                return None
            if canonical.es_type == column.es_type:
                if column.last_updated > canonical.last_updated:
                    for key in Column.__slots__:
                        old_value = canonical[key]
                        new_value = column[key]
                        if new_value == None:
                            pass  # DO NOT BOTHER CLEARING OLD VALUES (LIKE cardinality AND partitions)
                        elif new_value == old_value:
                            pass  # NO NEED TO UPDATE WHEN NO CHANGE MADE (COMMON CASE)
                        else:
                            canonical[key] = new_value
                return canonical
        existing_columns.append(column)
        return column

    def _update_meta(self):
        if not self.dirty:
            return

        for mcl in self.data.get("meta.columns").values():
            for mc in mcl:
                count = 0
                values = set()
                objects = 0
                multi = 1
                for column in self._all_columns():
                    value = column[mc.name]
                    if value == None:
                        pass
                    else:
                        count += 1
                        if is_list(value):
                            multi = max(multi, len(value))
                            try:
                                values |= set(value)
                            except Exception:
                                objects += len(value)
                        elif is_data(value):
                            objects += 1
                        else:
                            values.add(value)
                mc.count = count
                mc.cardinality = len(values) + objects
                mc.partitions = jx.sort(values)
                mc.multi = multi
                mc.last_updated = Date.now()
        self.dirty = False

    def _all_columns(self):
        return [
            column
            for t, cs in self.data.items()
            for _, css in cs.items()
            for column in css
        ]

    def __iter__(self):
        with self.locker:
            self._update_meta()
            return iter(self._all_columns())

    def __len__(self):
        return self.data["meta.columns"]["es_index"].count

    def update(self, command):
        self.dirty = True
        try:
            command = wrap(command)
            DEBUG and Log.note(
                "Update {{timestamp}}: {{command|json}}",
                command=command,
                timestamp=Date(command["set"].last_updated),
            )
            eq = command.where.eq
            if eq.es_index:
                if len(eq) == 1:
                    if unwraplist(command.clear) == ".":
                        with self.locker:
                            del self.data[eq.es_index]
                            self.todo.add(
                                (
                                    EXECUTE,
                                    "DELETE FROM "
                                    + db_table_name
                                    + SQL_WHERE
                                    + " es_index="
                                    + quote_value(eq.es_index),
                                )
                            )
                        return

                    # FASTEST
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [c for cs in all_columns for c in cs]
                elif eq.es_column and len(eq) == 2:
                    # FASTER
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [
                            c
                            for cs in all_columns
                            for c in cs
                            if c.es_column == eq.es_column
                        ]

                else:
                    # SLOWER
                    all_columns = self.data.get(eq.es_index, {}).values()
                    with self.locker:
                        columns = [
                            c
                            for cs in all_columns
                            for c in cs
                            if all(
                                c[k] == v for k, v in eq.items()
                            )  # THIS LINE IS VERY SLOW
                        ]
            else:
                columns = list(self)
                columns = jx.filter(columns, command.where)

            with self.locker:
                for col in columns:
                    DEBUG and Log.note(
                        "update column {{table}}.{{column}}",
                        table=col.es_index,
                        column=col.es_column,
                    )
                    for k in command["clear"]:
                        if k == ".":
                            self.todo.add((DELETE, col))
                            lst = self.data[col.es_index]
                            cols = lst[col.name]
                            cols.remove(col)
                            if len(cols) == 0:
                                del lst[col.name]
                            if len(lst) == 0:
                                del self.data[col.es_index]
                            break
                        else:
                            col[k] = None
                    else:
                        # DID NOT DELETE COLUMN ("."), CONTINUE TO SET PROPERTIES
                        for k, v in command.set.items():
                            col[k] = v
                        self.todo.add((UPDATE, col))

        except Exception as e:
            Log.error("should not happen", cause=e)

    def query(self, query):
        # NOT EXPECTED TO BE RUN
        Log.error("not")
        with self.locker:
            self._update_meta()
            if not self._schema:
                self._schema = Schema(
                    ".", [c for cs in self.data["meta.columns"].values() for c in cs]
                )
            snapshot = self._all_columns()

        from jx_python.containers.list_usingPythonList import ListContainer

        query.frum = ListContainer("meta.columns", snapshot, self._schema)
        return jx.run(query)

    def groupby(self, keys):
        with self.locker:
            self._update_meta()
            return jx.groupby(self.__iter__(), keys)

    @property
    def schema(self):
        if not self._schema:
            with self.locker:
                self._update_meta()
                self._schema = Schema(
                    ".", [c for cs in self.data["meta.columns"].values() for c in cs]
                )
        return self._schema

    @property
    def namespace(self):
        return self

    def get_table(self, table_name):
        if table_name != "meta.columns":
            Log.error("this container has only the meta.columns")
        return self

    def denormalized(self):
        """
        THE INTERNAL STRUCTURE FOR THE COLUMN METADATA IS VERY DIFFERENT FROM
        THE DENORMALIZED PERSPECTIVE. THIS PROVIDES THAT PERSPECTIVE FOR QUERIES
        """
        with self.locker:
            self._update_meta()
            output = [
                {
                    "table": c.es_index,
                    "name": untype_path(c.name),
                    "cardinality": c.cardinality,
                    "es_column": c.es_column,
                    "es_index": c.es_index,
                    "last_updated": c.last_updated,
                    "count": c.count,
                    "nested_path": [unnest_path(n) for n in c.nested_path],
                    "es_type": c.es_type,
                    "type": c.jx_type,
                }
                for tname, css in self.data.items()
                for cname, cs in css.items()
                for c in cs
                if c.jx_type not in STRUCT  # and c.es_column != "_id"
            ]

        from jx_python.containers.list_usingPythonList import ListContainer

        return ListContainer(
            self.name,
            data=output,
            schema=jx_base.Schema("meta.columns", SIMPLE_METADATA_COLUMNS),
        )


def get_schema_from_list(table_name, frum):
    """
    SCAN THE LIST FOR COLUMN TYPES
    """
    columns = UniqueIndex(keys=("name",))
    _get_schema_from_list(frum, ".", parent=".", nested_path=ROOT_PATH, columns=columns)
    return Schema(table_name=table_name, columns=list(columns))


def _get_schema_from_list(frum, table_name, parent, nested_path, columns):
    """
    :param frum: The list
    :param table_name: Name of the table this list holds records for
    :param parent: parent path
    :param nested_path: each nested array, in reverse order
    :param columns: map from full name to column definition
    :return:
    """

    for d in frum:
        row_type = python_type_to_json_type[d.__class__]

        if row_type != "object":
            # EXPECTING PRIMITIVE VALUE
            full_name = parent
            column = columns[full_name]
            if not column:
                column = Column(
                    name=concat_field(table_name, full_name),
                    es_column=full_name,
                    es_index=".",
                    es_type=d.__class__.__name__,
                    jx_type=None,  # WILL BE SET BELOW
                    last_updated=Date.now(),
                    nested_path=nested_path,
                )
                columns.add(column)
            column.es_type = _merge_python_type(column.es_type, d.__class__)
            column.jx_type = python_type_to_json_type[column.es_type]
        else:
            for name, value in d.items():
                full_name = concat_field(parent, name)
                column = columns[full_name]
                if not column:
                    column = Column(
                        name=concat_field(table_name, full_name),
                        es_column=full_name,
                        es_index=".",
                        es_type=value.__class__.__name__,
                        jx_type=None,  # WILL BE SET BELOW
                        last_updated=Date.now(),
                        nested_path=nested_path,
                    )
                    columns.add(column)
                if is_container(value):  # GET TYPE OF MULTIVALUE
                    v = list(value)
                    if len(v) == 0:
                        this_type = none_type.__name__
                    elif len(v) == 1:
                        this_type = v[0].__class__.__name__
                    else:
                        this_type = reduce(
                            _merge_python_type, (vi.__class__.__name__ for vi in value)
                        )
                else:
                    this_type = value.__class__.__name__
                column.es_type = _merge_python_type(column.es_type, this_type)
                column.jx_type = python_type_to_json_type[column.es_type]

                if this_type in {"object", "dict", "Mapping", "Data"}:
                    _get_schema_from_list(
                        [value], table_name, full_name, nested_path, columns
                    )
                elif this_type in {"list", "FlatList"}:
                    np = listwrap(nested_path)
                    newpath = unwraplist([join_field(split_field(np[0]) + [name])] + np)
                    _get_schema_from_list(
                        value, table_name, full_name, newpath, columns
                    )


METADATA_COLUMNS = (
    [
        Column(
            name=c,
            es_index="meta.columns",
            es_column=c,
            es_type="keyword",
            jx_type=STRING,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in [
            "name",
            "es_type",
            "jx_type",
            "nested_path",
            "es_column",
            "es_index",
            "partitions",
        ]
    ]
    + [
        Column(
            name=c,
            es_index="meta.columns",
            es_column=c,
            es_type="integer",
            jx_type=INTEGER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["count", "cardinality", "multi"]
    ]
    + [
        Column(
            name="last_updated",
            es_index="meta.columns",
            es_column="last_updated",
            es_type="double",
            jx_type=NUMBER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
    ]
)


def row_to_column(header, row):
    return Column(
        **{
            h: c
            if c is None or h not in ("nested_path", "partitions")
            else json2value(c)
            for h, c in zip(header, row)
        }
    )


all_columns = sql_list([quote_column(c.name) for c in METADATA_COLUMNS])


SIMPLE_METADATA_COLUMNS = (  # FOR PURELY INTERNAL PYTHON LISTS, NOT MAPPING TO ANOTHER DATASTORE
    [
        Column(
            name=c,
            es_index="meta.columns",
            es_column=c,
            es_type="string",
            jx_type=STRING,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["table", "name", "type", "nested_path"]
    ]
    + [
        Column(
            name=c,
            es_index="meta.columns",
            es_column=c,
            es_type="long",
            jx_type=INTEGER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
        for c in ["count", "cardinality", "multi"]
    ]
    + [
        Column(
            name="last_updated",
            es_index="meta.columns",
            es_column="last_updated",
            es_type="time",
            jx_type=NUMBER,
            last_updated=Date.now(),
            nested_path=ROOT_PATH,
        )
    ]
)


_merge_order = {
    none_type: 0,
    NullType: 1,
    bool: 2,
    int: 3,
    long: 3,
    Date: 4,
    float: 5,
    text_type: 6,
    binary_type: 6,
    object: 7,
    dict: 8,
    Mapping: 9,
    Data: 10,
    list: 11,
    FlatList: 12,
}

for k, v in items(_merge_order):
    _merge_order[k.__name__] = v


def _merge_python_type(A, B):
    a = _merge_order[A]
    b = _merge_order[B]

    if a >= b:
        output = A
    else:
        output = B

    if isinstance(output, str):
        return output
    else:
        return output.__name__
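`_merge_python_type` above widens two observed types to the more general one using the `_merge_order` ranking, so a column that has seen both `int` and `float` values is typed `float`. A standalone sketch of that widening idea (the rank table here is a small invented subset, not the library's exact table):

```
from functools import reduce

# Hypothetical rank subset: a higher rank is a more general type.
_RANK = {"NoneType": 0, "bool": 2, "int": 3, "float": 5, "str": 6, "dict": 8, "list": 11}

def merge_type(a, b):
    # Keep whichever observed type is more general.
    return a if _RANK[a] >= _RANK[b] else b

# Scanning the values [1, 2.5, None] settles on "float".
assert reduce(merge_type, (type(v).__name__ for v in [1, 2.5, None])) == "float"
```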
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from mo_dots import listwrap
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
import jx_base
from mo_dots import Data
@@ -268,7 +268,7 @@ def _all_default(d, default, seen=None):
            _all_default(existing_value, default_value, seen)


def _getdefault(obj, key):
def _get_dict_default(obj, key):
    """
    obj MUST BE A DICT
    key IS EXPECTED TO BE LITERAL (NO ESCAPING)
@@ -280,7 +280,28 @@ def _getdefault(obj, key):
        pass

    try:
        return getattr(obj, key)
        if float(key) == round(float(key), 0):
            return obj[int(key)]
    except Exception as f:
        pass

    return NullType(obj, key)


def _getdefault(obj, key):
    """
    obj ANY OBJECT
    key IS EXPECTED TO BE LITERAL (NO ESCAPING)
    TRY BOTH ATTRIBUTE AND ITEM ACCESS, OR RETURN Null
    """
    try:
        return obj[key]
    except Exception as f:
        pass

    try:
        if obj.__class__ is not dict:
            return getattr(obj, key)
    except Exception as f:
        pass
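The rewrite splits dict-only lookup (`_get_dict_default`) from the general `_getdefault` path: try item access first, then attribute access for non-dict objects, and finally fall back to a null placeholder. A standalone sketch of that lookup order (plain `None` stands in for mo_dots' `NullType`, and the names are invented here):

```
def get_default_sketch(obj, key):
    # 1. ITEM ACCESS FIRST (dicts, lists, anything indexable)
    try:
        return obj[key]
    except Exception:
        pass
    # 2. ATTRIBUTE ACCESS, BUT ONLY FOR NON-dict OBJECTS
    try:
        if obj.__class__ is not dict:
            return getattr(obj, key)
    except Exception:
        pass
    # 3. NULL PLACEHOLDER (the real code returns NullType(obj, key))
    return None

class Point:
    x = 1

assert get_default_sketch({"x": 2}, "x") == 2   # item access wins
assert get_default_sketch(Point(), "x") == 1    # attribute fallback
assert get_default_sketch({}, "missing") is None
```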
@@ -147,7 +147,8 @@ class Data(MutableMapping):
            d[seq[-1]] = value
            return self
        except Exception as e:
            raise e
            from mo_logs import Log
            Log.error("can not set key={{key}}", key=key, cause=e)

    def __getattr__(self, key):
        d = _get(self, SLOT)
@@ -13,6 +13,7 @@ from mo_dots import get_logger, is_data, wrap, zip as dict_zip
from mo_future import get_function_arguments, get_function_defaults, get_function_name, text_type
from mo_logs import Except

KWARGS = str("kwargs")


def override(func):
    """
@@ -55,12 +56,12 @@ def override(func):
            )
        raise e

    if "kwargs" not in params:
    if KWARGS not in params:
        # WE ASSUME WE ARE ONLY ADDING A kwargs PARAMETER TO SOME REGULAR METHOD
        def wo_kwargs(*args, **kwargs):
            settings = kwargs.get("kwargs")
            settings = kwargs.get(KWARGS)
            ordered_params = dict(zip(params, args))
            packed = params_pack(params, ordered_params, kwargs, settings, defaults)
            packed = params_pack(params, defaults, settings, kwargs, ordered_params)
            try:
                return func(**packed)
            except TypeError as e:
@@ -69,14 +70,14 @@ def override(func):

    elif func_name in ("__init__", "__new__"):
        def w_constructor(*args, **kwargs):
            if "kwargs" in kwargs:
                packed = params_pack(params, dict_zip(params[1:], args[1:]), kwargs, kwargs["kwargs"], defaults)
            if KWARGS in kwargs:
                packed = params_pack(params, defaults, kwargs[KWARGS], kwargs, dict_zip(params[1:], args[1:]))
            elif len(args) == 2 and len(kwargs) == 0 and is_data(args[1]):
                # ASSUME SECOND UNNAMED PARAM IS kwargs
                packed = params_pack(params, args[1], defaults)
                packed = params_pack(params, defaults, args[1])
            else:
                # DO NOT INCLUDE self IN kwargs
                packed = params_pack(params, dict_zip(params[1:], args[1:]), kwargs, defaults)
                packed = params_pack(params, defaults, kwargs, dict_zip(params[1:], args[1:]))
            try:
                return func(args[0], **packed)
            except TypeError as e:
@@ -88,12 +89,12 @@ def override(func):
        def w_bound_method(*args, **kwargs):
            if len(args) == 2 and len(kwargs) == 0 and is_data(args[1]):
                # ASSUME SECOND UNNAMED PARAM IS kwargs
                packed = params_pack(params, args[1], defaults)
                packed = params_pack(params, defaults, args[1])
            elif "kwargs" in kwargs and is_data(kwargs["kwargs"]):
            elif KWARGS in kwargs and is_data(kwargs[KWARGS]):
                # PUT args INTO kwargs
                packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), kwargs["kwargs"], defaults)
                packed = params_pack(params, defaults, kwargs[KWARGS], dict_zip(params[1:], args[1:]), kwargs)
            else:
                packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), defaults)
                packed = params_pack(params, defaults, dict_zip(params[1:], args[1:]), kwargs)
            try:
                return func(args[0], **packed)
            except TypeError as e:
@@ -104,13 +105,13 @@ def override(func):
    def w_kwargs(*args, **kwargs):
        if len(args) == 1 and len(kwargs) == 0 and is_data(args[0]):
            # ASSUME SINGLE PARAMETER IS kwargs
            packed = params_pack(params, args[0], defaults)
            packed = params_pack(params, defaults, args[0])
        elif "kwargs" in kwargs and is_data(kwargs["kwargs"]):
        elif KWARGS in kwargs and is_data(kwargs[KWARGS]):
            # PUT args INTO kwargs
            packed = params_pack(params, kwargs, dict_zip(params, args), kwargs["kwargs"], defaults)
            packed = params_pack(params, defaults, kwargs[KWARGS], dict_zip(params, args), kwargs)
        else:
            # PULL kwargs OUT INTO PARAMS
            packed = params_pack(params, kwargs, dict_zip(params, args), defaults)
            packed = params_pack(params, defaults, dict_zip(params, args), kwargs)
        try:
            return func(**packed)
        except TypeError as e:
@@ -120,15 +121,13 @@ def override(func):

def params_pack(params, *args):
    settings = {}
    for a in reversed(args):
        if a == None:
            continue
    for a in args:
        for k, v in a.items():
            settings[str(k)] = None if v == None else v
    settings["kwargs"] = settings
            settings[str(k)] = v
    settings[KWARGS] = wrap(settings)

    output = {
        str(k): settings[k] if k != "kwargs" else wrap(settings)
        k: settings[k]
        for k in params
        if k in settings
    }
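The new `params_pack` walks its argument dicts in increasing priority: call sites now pass `defaults` first and the most specific source last, so a later dict simply overwrites an earlier one (the old version iterated `reversed(args)` and listed sources in the opposite order). A standalone sketch of the merge, using plain dicts instead of mo_dots `wrap`:

```
def params_pack_sketch(params, *sources):
    # LATER SOURCES OVERRIDE EARLIER ONES (defaults go first)
    settings = {}
    for src in sources:
        for k, v in src.items():
            settings[str(k)] = v
    settings["kwargs"] = settings  # self-reference so the callee can see everything
    return {k: settings[k] for k in params if k in settings}

defaults = {"host": "localhost", "port": 9200}
explicit = {"port": 9300}
packed = params_pack_sketch(["host", "port"], defaults, explicit)
assert packed == {"host": "localhost", "port": 9300}  # explicit beats default
```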
@@ -59,6 +59,7 @@ class StructuredLogger_usingElasticSearch(StructuredLogger):
            schema=schema,
            limit_replicas=True,
            typed=True,
            read_only=False,
            kwargs=kwargs,
        )
        self.batch_size = batch_size
@@ -20,7 +20,7 @@ import re
import string

from mo_dots import Data, coalesce, get_module, is_data, is_list, wrap, is_sequence
from mo_future import PY3, get_function_name, is_binary, is_text, round as _round, text_type, transpose, xrange, zip_longest
from mo_future import PY3, get_function_name, is_binary, is_text, round as _round, text_type, transpose, xrange, zip_longest, binary_type
from mo_logs.convert import datetime2string, datetime2unix, milli2datetime, unix2datetime, value2json

FORMATTERS = {}
@@ -678,7 +678,7 @@ def toString(val):
        return text_type(round(duration, 3)) + " seconds"
    elif is_text(val):
        return val
    elif isinstance(val, str):
    elif isinstance(val, binary_type):
        try:
            return val.decode('utf8')
        except Exception as _:
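Switching the guard from `isinstance(val, str)` to `isinstance(val, binary_type)` makes the decode branch fire on actual byte strings under both Python 2 (where `str` is bytes) and Python 3 (where `str` is text). A quick Python 3 sketch of the intended dispatch (names invented here):

```
def to_text_sketch(val):
    # TEXT PASSES THROUGH; BYTES ARE DECODED; EVERYTHING ELSE IS str()-ED
    if isinstance(val, str):
        return val
    if isinstance(val, bytes):
        try:
            return val.decode("utf8")
        except Exception:
            return repr(val)
    return str(val)

assert to_text_sketch(b"bug") == "bug"
assert to_text_sketch("bug") == "bug"
```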
@@ -13,11 +13,10 @@

from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from copy import copy
from datetime import datetime, timedelta
import signal as _signal
import sys
from copy import copy
from datetime import datetime, timedelta
from time import sleep

from mo_dots import Data, coalesce, unwraplist
@@ -30,6 +29,8 @@ from mo_threads.till import Till

DEBUG = False

PLEASE_STOP = str("please_stop")  # REQUIRED thread PARAMETER TO SIGNAL STOP
PARENT_THREAD = str("parent_thread")  # OPTIONAL PARAMETER TO ASSIGN THREAD TO SOMETHING OTHER THAN CURRENT THREAD
MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39)
DEFAULT_WAIT_TIME = timedelta(minutes=10)
THREAD_STOP = "stop"
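Replacing the bare `"please_stop"` and `"parent_thread"` literals with the module constants `PLEASE_STOP` and `PARENT_THREAD` makes the thread-target contract greppable and typo-proof; the later hunk enforces it via `co_varnames`. A minimal sketch of that contract using only the standard library in place of mo_threads (a simplified stand-in, not the real `Thread.run`):

```
import threading

PLEASE_STOP = "please_stop"

def worker(please_stop):
    # A COOPERATIVE TARGET: LOOP UNTIL ASKED TO STOP
    while not please_stop.is_set():
        please_stop.wait(timeout=0.1)

def run(target, **kwargs):
    # ENFORCE THE SAME CHECK THE REAL Thread.run DOES WITH co_varnames
    if PLEASE_STOP not in target.__code__.co_varnames:
        raise TypeError("function must have please_stop argument")
    kwargs.setdefault(PLEASE_STOP, threading.Event())
    t = threading.Thread(target=target, kwargs=kwargs)
    t.start()
    return t, kwargs[PLEASE_STOP]

t, stop = run(worker)
stop.set()  # signal shutdown
t.join()
```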
@@ -215,15 +216,15 @@ class Thread(BaseThread):

        # ENSURE THERE IS A SHARED please_stop SIGNAL
        self.kwargs = copy(kwargs)
        self.kwargs["please_stop"] = self.kwargs.get("please_stop", Signal("please_stop for " + self.name))
        self.please_stop = self.kwargs["please_stop"]
        self.kwargs[PLEASE_STOP] = self.kwargs.get(PLEASE_STOP, Signal("please_stop for " + self.name))
        self.please_stop = self.kwargs[PLEASE_STOP]

        self.thread = None
        self.stopped = Signal("stopped signal for " + self.name)

        if "parent_thread" in kwargs:
            del self.kwargs["parent_thread"]
            self.parent = kwargs["parent_thread"]
        if PARENT_THREAD in kwargs:
            del self.kwargs[PARENT_THREAD]
            self.parent = kwargs[PARENT_THREAD]
        else:
            self.parent = Thread.current()
            self.parent.add_child(self)
@@ -339,7 +340,7 @@ class Thread(BaseThread):
        # ENSURE target HAS please_stop ARGUMENT
        if get_function_name(target) == 'wrapper':
            pass  # GIVE THE override DECORATOR A PASS
        elif "please_stop" not in target.__code__.co_varnames:
        elif PLEASE_STOP not in target.__code__.co_varnames:
            Log.error("function must have please_stop argument for signalling emergency shutdown")

        Thread.num_threads += 1
@@ -12,9 +12,9 @@ from __future__ import absolute_import, division, unicode_literals
from copy import deepcopy
import re

from jx_base import Column
from jx_python import jx
from jx_python.meta import Column
from mo_dots import Data, FlatList, Null, ROOT_PATH, SLOT, coalesce, concat_field, is_data, is_list, listwrap, literal_field, set_default, split_field, wrap
from mo_dots import Data, FlatList, Null, ROOT_PATH, SLOT, coalesce, concat_field, is_data, is_list, listwrap, literal_field, set_default, split_field, wrap, unwrap
from mo_files.url import URL
from mo_future import binary_type, is_binary, is_text, items, text_type
from mo_json import BOOLEAN, EXISTS, NESTED, NUMBER, OBJECT, STRING, json2value, value2json
@@ -98,7 +98,7 @@ class Index(Features):
            Log.error("not allowed")
        if type == None:
            # NO type PROVIDED, MAYBE THERE IS A SUITABLE DEFAULT?
            about = self.cluster.get_metadata().indices[self.settings.index]
            about = self.cluster.get_metadata().indices[literal_field(self.settings.index)]
            type = self.settings.type = _get_best_type_from_mapping(about.mappings)[0]
            if type == "_default_":
                Log.error("not allowed")
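This hunk and several below wrap index names in `literal_field(...)` before using them as keys into `metadata.indices`. The reason, as I understand mo_dots: plain key access on a `Data` treats dots as path separators, so an index or alias name containing a dot (say a hypothetical `unittest.alias`) would be split into nested keys and come back empty; `literal_field` escapes the dots so the whole name is one key. A standalone sketch of the failure mode:

```
# Simplified stand-in for mo_dots path semantics.
def get_by_path(data, key):
    # DOTTED KEYS ARE TREATED AS PATHS INTO NESTED DICTS
    node = data
    for part in key.split("."):
        node = node.get(part, {})
    return node or None

indices = {"unittest.alias": {"aliases": ["unittest"]}}
assert get_by_path(indices, "unittest.alias") is None  # split on ".": lookup misses
assert indices["unittest.alias"] is not None           # literal key: lookup hits
```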
@@ -146,7 +146,7 @@ class Index(Features):
    def get_properties(self, retry=True):
        if self.settings.explore_metadata:
            metadata = self.cluster.get_metadata()
            index = metadata.indices[self.settings.index]
            index = metadata.indices[literal_field(self.settings.index)]

            if index == None and retry:
                # TRY AGAIN, JUST IN CASE
@@ -204,7 +204,7 @@ class Index(Features):
        # WAIT FOR ALIAS TO APPEAR
        while True:
            metadata = self.cluster.get_metadata(force=True)
            if alias in metadata.indices[self.settings.index].aliases:
            if alias in metadata.indices[literal_field(self.settings.index)].aliases:
                return
            Log.note("Waiting for alias {{alias}} to appear", alias=alias)
            Till(seconds=1).wait()
@@ -294,6 +294,18 @@ class Index(Features):
        else:
            raise NotImplementedError

    def delete_id(self, id):
        result = self.cluster.delete(
            path=self.path + "/" + id,
            timeout=600,
            # params={"wait_for_active_shards": wait_for_active_shards}
        )
        if result.failures:
            Log.error("Failure to delete from {{index}}:\n{{data|pretty}}", index=self.settings.index, data=result)

    def extend(self, records):
        """
        records - MUST HAVE FORM OF
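The new `delete_id` removes one document by id through the cluster's DELETE endpoint and fails loudly if the response reports failures. A hedged sketch of the equivalent raw HTTP call (the `<index>/<type>/<id>` URL shape is assumed from `self.path + "/" + id`; the real code goes through `self.cluster.delete`, and all names below are invented for illustration):

```
import requests  # assumption: plain HTTP stands in for cluster.delete

def delete_id_sketch(base_url, index, type_, doc_id):
    # DELETE <host>/<index>/<type>/<id>, mirroring path = self.path + "/" + id
    url = "%s/%s/%s/%s" % (base_url, index, type_, doc_id)
    response = requests.delete(url, timeout=600)
    response.raise_for_status()  # fail loudly, as the real code does on failures
    return response.json()
```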
@@ -309,8 +321,6 @@ class Index(Features):
            if '_id' in r or 'value' not in r:  # I MAKE THIS MISTAKE SO OFTEN, I NEED A CHECK
                Log.error('Expecting {"id":id, "value":document} form. Not expecting _id')
            id, version, json_bytes = self.encode(r)
            if '"_id":' in json_bytes:
                id, version, json_bytes = self.encode(r)

            if version:
                lines.append(value2json({"index": {"_id": id, "version": int(version), "version_type": "external_gte"}}))
@@ -597,7 +607,7 @@ class Cluster(object):

        index = kwargs.index
        meta = self.get_metadata()
        type, about = _get_best_type_from_mapping(meta.indices[index].mappings)
        type, about = _get_best_type_from_mapping(meta.indices[literal_field(index)].mappings)

        if typed == None:
            typed = True
@@ -613,7 +623,7 @@ class Cluster(object):
        return Index(kwargs=kwargs, cluster=self)

    @override
    def get_index(self, index, type, alias=None, typed=None, read_only=True, kwargs=None):
    def get_index(self, index, alias=None, typed=None, read_only=True, kwargs=None):
        """
        TESTS THAT THE INDEX EXISTS BEFORE RETURNING A HANDLE
        """
@@ -785,9 +795,10 @@ class Cluster(object):
        if schema.settings.index.number_of_replicas >= health.number_of_nodes:
            if limit_replicas_warning:
                Log.warning(
                    "Reduced number of replicas: {{from}} requested, {{to}} realized",
                    "Reduced number of replicas for {{index}}: {{from}} requested, {{to}} realized",
                    {"from": schema.settings.index.number_of_replicas},
                    to=health.number_of_nodes - 1
                    to=health.number_of_nodes - 1,
                    index=index
                )
            schema.settings.index.number_of_replicas = health.number_of_nodes - 1
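The warning now names the affected index; the clamping rule itself is unchanged: an index cannot have more replicas than there are other nodes to hold them, so the requested count is capped at `number_of_nodes - 1`. The arithmetic in isolation:

```
def clamp_replicas(requested, number_of_nodes):
    # EACH REPLICA MUST LIVE ON A DIFFERENT NODE THAN THE PRIMARY
    if requested >= number_of_nodes:
        return number_of_nodes - 1
    return requested

assert clamp_replicas(2, 1) == 0  # single node: no replicas possible
assert clamp_replicas(1, 3) == 1  # enough nodes: request honored
```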
@@ -802,7 +813,7 @@ class Cluster(object):
        while not Till(seconds=30):
            try:
                metadata = self.get_metadata(force=True)
                if index in metadata.indices:
                if index in metadata.indices.keys():
                    break
                Log.note("Waiting for index {{index}} to appear", index=index)
            except Exception as e:
@@ -864,7 +875,7 @@ class Cluster(object):
        with self.metadata_locker:
            self._metadata = wrap(response.metadata)
            for new_index_name, new_meta in self._metadata.indices.items():
                old_index = old_indices[new_index_name]
                old_index = old_indices[literal_field(new_index_name)]
                if not old_index:
                    DEBUG_METADATA_UPDATE and Log.note("New index found {{index}} at {{time}}", index=new_index_name, time=now)
                    self.index_last_updated[new_index_name] = now
@@ -876,7 +887,7 @@ class Cluster(object):
                    DEBUG_METADATA_UPDATE and Log.note("More columns found in {{index}} at {{time}}", index=new_index_name, time=now)
                    self.index_last_updated[new_index_name] = now
            for old_index_name, old_meta in old_indices.items():
                new_index = self._metadata.indices[old_index_name]
                new_index = self._metadata.indices[literal_field(old_index_name)]
                if not new_index:
                    DEBUG_METADATA_UPDATE and Log.note("Old index lost: {{index}} at {{time}}", index=old_index_name, time=now)
                    self.index_last_updated[old_index_name] = now
@@ -1018,6 +1029,9 @@ class Cluster(object):
        response = http.put(url, **kwargs)
        if response.status_code not in [200]:
            Log.error(response.reason + ": " + utf82unicode(response.content))
        if not response.content:
            return Null

        self.debug and Log.note("response: {{response}}", response=utf82unicode(response.content)[0:300:])

        details = json2value(utf82unicode(response.content))
@@ -9,7 +9,6 @@
#
from __future__ import absolute_import, division, unicode_literals

from mo_future import is_text, is_binary
from jx_python import jx
from mo_dots import Data, ROOT_PATH, is_data, unwrap
from mo_json import NESTED, OBJECT, json2value
@@ -349,7 +349,7 @@ class MySQL(object):
            Log.error("Expecting transaction to be started before issuing queries")

        if param:
            sql = expand_template(sql, quote_param(param))
            sql = expand_template(text_type(sql), quote_param(param))
        sql = outdent(sql)
        self.backlog.append(sql)
        if self.debug or len(self.backlog) >= MAX_BATCH_SIZE:
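The fix casts `sql` to `text_type` before template expansion, guarding against `sql` arriving as a SQL wrapper object rather than a plain string. A simplified sketch of the moustache-style expansion involved (the real `expand_template` and `quote_param` live in the mo/pyLibrary stack; this standalone version only illustrates the cast and substitution):

```
def expand_template_sketch(template, params):
    # REPLACE {{name}} PLACEHOLDERS WITH ALREADY-QUOTED VALUES
    out = str(template)  # the fix: force to text before substitution
    for k, v in params.items():
        out = out.replace("{{" + k + "}}", v)
    return out

quoted = {"bug_id": "42"}  # pretend quote_param() already ran
assert expand_template_sketch("SELECT * FROM bugs WHERE id={{bug_id}}", quoted) == \
    "SELECT * FROM bugs WHERE id=42"
```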
@@ -83,6 +83,11 @@ class FakeES():
            v["id"]: v["value"] if "value" in v else mo_json.json2value(v['json'])
            for v in records
        }
        for r in records.values():
            try:
                del r['etl']
            except Exception:
                pass

        unwrap(self.data).update(records)
        self.refresh()
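The FakeES change strips the `etl` property from each stored document, so test comparisons ignore ETL bookkeeping that varies run to run. A sketch of the same normalization over the `{"id": ..., "value": ...}` record form the test double accepts (sample data invented here):

```
records = [
    {"id": "1", "value": {"bug_id": 1, "etl": {"timestamp": 123}}},
    {"id": "2", "value": {"bug_id": 2}},
]

# INDEX BY id AND DROP THE etl BOOKKEEPING, AS THE TEST DOUBLE DOES
by_id = {r["id"]: r["value"] for r in records}
for doc in by_id.values():
    doc.pop("etl", None)

assert "etl" not in by_id["1"]
```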