lib updates
This commit is contained in:
Родитель
f6a2f41706
Коммит
e356ad1fb0
|
@ -9,11 +9,10 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
from copy import copy
|
||||
|
||||
from mo_dots import Data, is_data, join_field, set_default, split_field, wrap, is_many
|
||||
from mo_future import generator_types, text_type
|
||||
from mo_dots import Data, is_data, is_many, join_field, set_default, split_field, wrap
|
||||
from mo_future import is_text
|
||||
from mo_logs import Log
|
||||
|
||||
type2container = Data()
|
||||
|
@ -46,7 +45,7 @@ class Container(object):
|
|||
"""
|
||||
CONTAINERS HOLD MULTIPLE FACTS AND CAN HANDLE
|
||||
GENERAL JSON QUERY EXPRESSIONS ON ITS CONTENTS
|
||||
METADATA FOR A Container IS CALL A Namespace
|
||||
METADATA FOR A Container IS CALLED A Namespace
|
||||
"""
|
||||
|
||||
|
||||
|
|
|
@ -19,15 +19,14 @@ LANGUAGE, BUT WE KEEP CODE HERE SO THERE IS LESS OF IT
|
|||
"""
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from decimal import Decimal
|
||||
import operator
|
||||
import re
|
||||
|
||||
from jx_base.utils import get_property_name, is_variable_name
|
||||
from jx_base.language import BaseExpression, TYPE_ORDER, define_language, is_expression, is_op, value_compare, ID
|
||||
from mo_dots import Null, coalesce, is_data, is_list, split_field, wrap, is_sequence
|
||||
from mo_future import first, get_function_name, is_text, items as items_, text_type, utf8_json_encoder, zip_longest
|
||||
import mo_json
|
||||
from jx_base.language import BaseExpression, ID, TYPE_ORDER, define_language, is_expression, is_op, value_compare
|
||||
from jx_base.utils import get_property_name, is_variable_name
|
||||
from mo_dots import Null, coalesce, is_data, is_list, is_sequence, split_field, wrap
|
||||
from mo_future import first, get_function_name, is_text, items as items_, text_type, utf8_json_encoder, zip_longest
|
||||
from mo_json import BOOLEAN, INTEGER, IS_NULL, NUMBER, OBJECT, STRING, python_type_to_json_type, scrub
|
||||
from mo_json.typed_encoder import inserter_type_to_json_type
|
||||
from mo_logs import Except, Log
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
class Facts(object):
|
||||
"""
|
||||
REPRESENT A HIERARCHICAL DATASTORE: MULTIPLE TABLES IN A DATABASE ALONG
|
||||
|
|
|
@ -14,7 +14,7 @@ from math import isnan
|
|||
|
||||
from mo_dots import Data, data_types, listwrap
|
||||
from mo_dots.lists import list_types
|
||||
from mo_future import boolean_type, long, none_type, text_type
|
||||
from mo_future import boolean_type, long, none_type, text_type, transpose
|
||||
from mo_logs import Log
|
||||
from mo_times import Date
|
||||
|
||||
|
@ -113,7 +113,7 @@ def define_language(lang_name, module_vars):
|
|||
|
||||
if lang_name:
|
||||
# ENSURE THE ALL OPS ARE DEFINED ON THE NEW LANGUAGE
|
||||
for base_op, new_op in list(zip(JX.ops, language.ops)):
|
||||
for base_op, new_op in transpose(JX.ops, language.ops):
|
||||
if new_op is base_op:
|
||||
# MISSED DEFINITION, ADD ONE
|
||||
new_op = type(base_op.__name__, (base_op,), {})
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
from jx_base.query import QueryOp
|
||||
from mo_dots import is_data
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ def _late_import():
|
|||
global _jx
|
||||
global _Column
|
||||
|
||||
from jx_python.meta import Column as _Column
|
||||
from jx_base import Column as _Column
|
||||
from jx_python import jx as _jx
|
||||
|
||||
_ = _jx
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
from copy import copy
|
||||
|
||||
from mo_dots import Null, relative_field, set_default, startswith_field, wrap
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
class Snowflake(object):
|
||||
"""
|
||||
REPRESENT ONE ALIAS, AND ITS NESTED ARRAYS
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
class Table(object):
|
||||
|
||||
def __init__(self, full_name):
|
||||
|
|
|
@ -71,6 +71,11 @@ class ES52(Container):
|
|||
else:
|
||||
self.es = elasticsearch.Cluster(kwargs=kwargs).get_index(read_only=read_only, kwargs=kwargs)
|
||||
|
||||
self.es.cluster.put("/" + self.es.settings.index + "/_settings", data={"index": {
|
||||
"max_inner_result_window": 100000,
|
||||
"max_result_window": 100000
|
||||
}})
|
||||
|
||||
self._namespace = ElasticsearchMetadata(kwargs=kwargs)
|
||||
self.settings.type = self.es.settings.type
|
||||
self.edges = Data()
|
||||
|
|
|
@ -14,15 +14,14 @@ from datetime import date, datetime
|
|||
from decimal import Decimal
|
||||
|
||||
import jx_base
|
||||
from jx_base import TableDesc
|
||||
from jx_base import TableDesc, Column
|
||||
from jx_base.namespace import Namespace
|
||||
from jx_base.query import QueryOp
|
||||
from jx_elasticsearch.meta_columns import ColumnList
|
||||
from jx_python import jx
|
||||
from jx_python.containers.list_usingPythonList import ListContainer
|
||||
from jx_python.meta import Column, ColumnList
|
||||
from mo_dots import Data, FlatList, Null, NullType, ROOT_PATH, coalesce, concat_field, is_list, literal_field, relative_field, set_default, split_field, startswith_field, tail_field, wrap
|
||||
from mo_files import URL
|
||||
from mo_future import PY2, none_type, text_type
|
||||
from mo_future import long, none_type, text_type
|
||||
from mo_json import BOOLEAN, EXISTS, INTEGER, OBJECT, STRING, STRUCT
|
||||
from mo_json.typed_encoder import BOOLEAN_TYPE, EXISTS_TYPE, NUMBER_TYPE, STRING_TYPE, unnest_path, untype_path
|
||||
from mo_kwargs import override
|
||||
|
@ -78,7 +77,7 @@ class ElasticsearchMetadata(Namespace):
|
|||
self.metadata_last_updated = Date.now() - OLD_METADATA
|
||||
|
||||
self.meta = Data()
|
||||
self.meta.columns = ColumnList(URL(self.es_cluster.settings.host).host)
|
||||
self.meta.columns = ColumnList(self.es_cluster)
|
||||
|
||||
self.alias_to_query_paths = {
|
||||
"meta.columns": [ROOT_PATH],
|
||||
|
@ -143,7 +142,7 @@ class ElasticsearchMetadata(Namespace):
|
|||
for d in diff:
|
||||
dirty = True
|
||||
i1.add_property(*d)
|
||||
meta = self.es_cluster.get_metadata(force=dirty).indices[canonical_index]
|
||||
meta = self.es_cluster.get_metadata(force=dirty).indices[literal_field(canonical_index)]
|
||||
|
||||
data_type, mapping = _get_best_type_from_mapping(meta.mappings)
|
||||
mapping.properties["_id"] = {"type": "string", "index": "not_analyzed"}
|
||||
|
@ -314,19 +313,6 @@ class ElasticsearchMetadata(Namespace):
|
|||
if column.jx_type in STRUCT:
|
||||
Log.error("not supported")
|
||||
try:
|
||||
if column.es_index == "meta.columns":
|
||||
partitions = jx.sort([g[column.es_column] for g, _ in jx.groupby(self.meta.columns, column.es_column) if g[column.es_column] != None])
|
||||
self.meta.columns.update({
|
||||
"set": {
|
||||
"partitions": partitions,
|
||||
"count": len(self.meta.columns),
|
||||
"cardinality": len(partitions),
|
||||
"multi": 1,
|
||||
"last_updated": now
|
||||
},
|
||||
"where": {"eq": {"es_index": column.es_index, "es_column": column.es_column}}
|
||||
})
|
||||
return
|
||||
if column.es_index == "meta.tables":
|
||||
partitions = jx.sort([g[column.es_column] for g, _ in jx.groupby(self.meta.tables, column.es_column) if g[column.es_column] != None])
|
||||
self.meta.columns.update({
|
||||
|
@ -552,7 +538,7 @@ class ElasticsearchMetadata(Namespace):
|
|||
continue
|
||||
elif column.last_updated > Date.now() - TOO_OLD and column.cardinality is not None:
|
||||
# DO NOT UPDATE FRESH COLUMN METADATA
|
||||
DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)).seconds)
|
||||
DEBUG and Log.note("{{column.es_column}} is still fresh ({{ago}} ago)", column=column, ago=(Date.now()-Date(column.last_updated)))
|
||||
continue
|
||||
try:
|
||||
self._update_cardinality(column)
|
||||
|
@ -904,6 +890,7 @@ python_type_to_es_type = {
|
|||
str: "string",
|
||||
text_type: "string",
|
||||
int: "integer",
|
||||
long: "integer",
|
||||
float: "double",
|
||||
Data: "object",
|
||||
dict: "object",
|
||||
|
@ -916,9 +903,6 @@ python_type_to_es_type = {
|
|||
date: "double"
|
||||
}
|
||||
|
||||
if PY2:
|
||||
python_type_to_es_type[long] = "integer"
|
||||
|
||||
_merge_es_type = {
|
||||
"undefined": {
|
||||
"undefined": "undefined",
|
||||
|
|
|
@ -0,0 +1,662 @@
|
|||
# encoding: utf-8
|
||||
#
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
# You can obtain one at http:# mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
|
||||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from collections import Mapping
|
||||
|
||||
import jx_base
|
||||
from jx_base import Column, Table
|
||||
from jx_base.schema import Schema
|
||||
from jx_python import jx
|
||||
from mo_collections import UniqueIndex
|
||||
from mo_dots import (
|
||||
Data,
|
||||
FlatList,
|
||||
Null,
|
||||
NullType,
|
||||
ROOT_PATH,
|
||||
concat_field,
|
||||
is_container,
|
||||
is_data,
|
||||
is_list,
|
||||
join_field,
|
||||
listwrap,
|
||||
split_field,
|
||||
unwraplist,
|
||||
wrap,
|
||||
)
|
||||
from mo_future import binary_type, items, long, none_type, reduce, text_type
|
||||
from mo_json import INTEGER, NUMBER, STRING, STRUCT, python_type_to_json_type
|
||||
from mo_json.typed_encoder import unnest_path, untype_path, untyped
|
||||
from mo_logs import Except, Log
|
||||
from mo_threads import Lock, Queue, Thread, Till
|
||||
from mo_times.dates import Date
|
||||
|
||||
DEBUG = True
|
||||
singlton = None
|
||||
META_INDEX_NAME = "meta.columns"
|
||||
META_TYPE_NAME = "column"
|
||||
|
||||
DELETE_INDEX, UPDATE, DELETE = "delete_index", "update", "delete"
|
||||
|
||||
|
||||
class ColumnList(Table, jx_base.Container):
|
||||
"""
|
||||
OPTIMIZED FOR THE PARTICULAR ACCESS PATTERNS USED
|
||||
"""
|
||||
|
||||
def __init__(self, es_cluster):
|
||||
Table.__init__(self, META_INDEX_NAME)
|
||||
self.data = {} # MAP FROM ES_INDEX TO (abs_column_name to COLUMNS)
|
||||
self.locker = Lock()
|
||||
self._schema = None
|
||||
self.dirty = False
|
||||
self.es_cluster = es_cluster
|
||||
self.es_index = None
|
||||
self.last_load = Null
|
||||
self.todo = Queue(
|
||||
"update columns to db"
|
||||
) # HOLD (action, column) PAIR, WHERE action in ['insert', 'update']
|
||||
self._db_load()
|
||||
Thread.run("update " + META_INDEX_NAME, self._db_worker)
|
||||
|
||||
def _query(self, query):
|
||||
result = Data()
|
||||
curr = self.es_cluster.execute(query)
|
||||
result.meta.format = "table"
|
||||
result.header = [d[0] for d in curr.description] if curr.description else None
|
||||
result.data = curr.fetchall()
|
||||
return result
|
||||
|
||||
def _db_create(self):
|
||||
schema = {
|
||||
"settings": {"index.number_of_shards": 1, "index.number_of_replicas": 2},
|
||||
"mappings": {META_TYPE_NAME: {}},
|
||||
}
|
||||
|
||||
self.es_index = self.es_cluster.create_index(
|
||||
index=META_INDEX_NAME, schema=schema
|
||||
)
|
||||
self.es_index.add_alias(META_INDEX_NAME)
|
||||
|
||||
for c in METADATA_COLUMNS:
|
||||
self._add(c)
|
||||
self.es_index.add({"id": get_id(c), "value": c.__dict__()})
|
||||
|
||||
def _db_load(self):
|
||||
self.last_load = Date.now()
|
||||
|
||||
try:
|
||||
self.es_index = self.es_cluster.get_index(
|
||||
index=META_INDEX_NAME, type=META_TYPE_NAME, read_only=False
|
||||
)
|
||||
|
||||
result = self.es_index.search(
|
||||
{
|
||||
"query": {"match_all": {}},
|
||||
"sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
|
||||
"size": 10000
|
||||
}
|
||||
)
|
||||
|
||||
DEBUG and Log.note("{{num}} columns loaded", num=result.hits.total)
|
||||
with self.locker:
|
||||
for r in result.hits.hits._source:
|
||||
self._add(doc_to_column(r))
|
||||
|
||||
except Exception as e:
|
||||
Log.warning(
|
||||
"no {{index}} exists, making one", index=META_INDEX_NAME, cause=e
|
||||
)
|
||||
self._db_create()
|
||||
|
||||
def _db_worker(self, please_stop):
|
||||
batch_size = 10000
|
||||
while not please_stop:
|
||||
try:
|
||||
result = self.es_index.search(
|
||||
{
|
||||
"query": {
|
||||
"range": {"last_updated.~n~": {"gt": self.last_load}}
|
||||
},
|
||||
"sort": ["es_index.~s~", "name.~s~", "es_column.~s~"],
|
||||
"from": 0,
|
||||
"size": batch_size,
|
||||
}
|
||||
)
|
||||
batch_size = 10
|
||||
|
||||
with self.locker:
|
||||
for r in result.hits.hits._source:
|
||||
c = doc_to_column(r)
|
||||
self._add(c)
|
||||
if c.last_updated > self.last_load:
|
||||
self.last_load = c.last_updated
|
||||
|
||||
updates = self.todo.pop_all()
|
||||
DEBUG and updates and Log.note(
|
||||
"{{num}} columns to push to db", num=len(updates)
|
||||
)
|
||||
for action, column in updates:
|
||||
if please_stop:
|
||||
return
|
||||
try:
|
||||
DEBUG and Log.note(
|
||||
"{{action}} db for {{table}}.{{column}}",
|
||||
action=action,
|
||||
table=column.es_index,
|
||||
column=column.es_column,
|
||||
)
|
||||
if action is DELETE_INDEX:
|
||||
self.es_index.delete_record(
|
||||
{"term": {"es_index.~s~": column}}
|
||||
)
|
||||
elif action is UPDATE:
|
||||
self.es_index.add(
|
||||
{"id": get_id(column), "value": column.__dict__()}
|
||||
)
|
||||
elif action is DELETE:
|
||||
self.es_index.delete_id(get_id(column))
|
||||
except Exception as e:
|
||||
e = Except.wrap(e)
|
||||
Log.warning("problem updataing database", cause=e)
|
||||
|
||||
except Exception as e:
|
||||
Log.warning("problem updating database", cause=e)
|
||||
|
||||
(Till(seconds=10) | please_stop).wait()
|
||||
|
||||
def __copy__(self):
|
||||
output = object.__new__(ColumnList)
|
||||
Table.__init__(output, META_INDEX_NAME)
|
||||
output.data = {
|
||||
t: {c: list(cs) for c, cs in dd.items()} for t, dd in self.data.items()
|
||||
}
|
||||
output.locker = Lock()
|
||||
output._schema = None
|
||||
return output
|
||||
|
||||
def find(self, es_index, abs_column_name=None):
|
||||
with self.locker:
|
||||
if es_index.startswith("meta."):
|
||||
self._update_meta()
|
||||
|
||||
if not abs_column_name:
|
||||
return [c for cs in self.data.get(es_index, {}).values() for c in cs]
|
||||
else:
|
||||
return self.data.get(es_index, {}).get(abs_column_name, [])
|
||||
|
||||
def extend(self, columns):
|
||||
self.dirty = True
|
||||
with self.locker:
|
||||
for column in columns:
|
||||
self._add(column)
|
||||
|
||||
def add(self, column):
|
||||
self.dirty = True
|
||||
with self.locker:
|
||||
canonical = self._add(column)
|
||||
if canonical == None:
|
||||
return column # ALREADY ADDED
|
||||
self.todo.add((UPDATE, canonical))
|
||||
return canonical
|
||||
|
||||
def remove_table(self, table_name):
|
||||
del self.data[table_name]
|
||||
|
||||
def _add(self, column):
|
||||
"""
|
||||
:param column: ANY COLUMN OBJECT
|
||||
:return: None IF column IS canonical ALREADY (NET-ZERO EFFECT)
|
||||
"""
|
||||
columns_for_table = self.data.setdefault(column.es_index, {})
|
||||
existing_columns = columns_for_table.setdefault(column.name, [])
|
||||
|
||||
for canonical in existing_columns:
|
||||
if canonical is column:
|
||||
return None
|
||||
if canonical.es_type == column.es_type:
|
||||
if column.last_updated > canonical.last_updated:
|
||||
for key in Column.__slots__:
|
||||
old_value = canonical[key]
|
||||
new_value = column[key]
|
||||
if new_value == None:
|
||||
pass # DO NOT BOTHER CLEARING OLD VALUES (LIKE cardinality AND paritiions)
|
||||
elif new_value == old_value:
|
||||
pass # NO NEED TO UPDATE WHEN NO CHANGE MADE (COMMON CASE)
|
||||
else:
|
||||
canonical[key] = new_value
|
||||
return canonical
|
||||
existing_columns.append(column)
|
||||
return column
|
||||
|
||||
def _update_meta(self):
|
||||
if not self.dirty:
|
||||
return
|
||||
|
||||
for mcl in self.data.get(META_INDEX_NAME).values():
|
||||
for mc in mcl:
|
||||
count = 0
|
||||
values = set()
|
||||
objects = 0
|
||||
multi = 1
|
||||
for column in self._all_columns():
|
||||
value = column[mc.name]
|
||||
if value == None:
|
||||
pass
|
||||
else:
|
||||
count += 1
|
||||
if is_list(value):
|
||||
multi = max(multi, len(value))
|
||||
try:
|
||||
values |= set(value)
|
||||
except Exception:
|
||||
objects += len(value)
|
||||
elif is_data(value):
|
||||
objects += 1
|
||||
else:
|
||||
values.add(value)
|
||||
mc.count = count
|
||||
mc.cardinality = len(values) + objects
|
||||
mc.partitions = jx.sort(values)
|
||||
mc.multi = multi
|
||||
mc.last_updated = Date.now()
|
||||
self.dirty = False
|
||||
|
||||
def _all_columns(self):
|
||||
return [
|
||||
column
|
||||
for t, cs in self.data.items()
|
||||
for _, css in cs.items()
|
||||
for column in css
|
||||
]
|
||||
|
||||
def __iter__(self):
|
||||
with self.locker:
|
||||
self._update_meta()
|
||||
return iter(self._all_columns())
|
||||
|
||||
def __len__(self):
|
||||
return self.data[META_INDEX_NAME]["es_index"].count
|
||||
|
||||
def update(self, command):
|
||||
self.dirty = True
|
||||
try:
|
||||
command = wrap(command)
|
||||
DEBUG and Log.note(
|
||||
"Update {{timestamp}}: {{command|json}}",
|
||||
command=command,
|
||||
timestamp=Date(command["set"].last_updated),
|
||||
)
|
||||
eq = command.where.eq
|
||||
if eq.es_index:
|
||||
if len(eq) == 1:
|
||||
if unwraplist(command.clear) == ".":
|
||||
with self.locker:
|
||||
del self.data[eq.es_index]
|
||||
self.todo.add((DELETE_INDEX, eq.es_index))
|
||||
return
|
||||
|
||||
# FASTEST
|
||||
all_columns = self.data.get(eq.es_index, {}).values()
|
||||
with self.locker:
|
||||
columns = [c for cs in all_columns for c in cs]
|
||||
elif eq.es_column and len(eq) == 2:
|
||||
# FASTER
|
||||
all_columns = self.data.get(eq.es_index, {}).values()
|
||||
with self.locker:
|
||||
columns = [
|
||||
c
|
||||
for cs in all_columns
|
||||
for c in cs
|
||||
if c.es_column == eq.es_column
|
||||
]
|
||||
|
||||
else:
|
||||
# SLOWER
|
||||
all_columns = self.data.get(eq.es_index, {}).values()
|
||||
with self.locker:
|
||||
columns = [
|
||||
c
|
||||
for cs in all_columns
|
||||
for c in cs
|
||||
if all(
|
||||
c[k] == v for k, v in eq.items()
|
||||
) # THIS LINE IS VERY SLOW
|
||||
]
|
||||
else:
|
||||
columns = list(self)
|
||||
columns = jx.filter(columns, command.where)
|
||||
|
||||
with self.locker:
|
||||
for col in columns:
|
||||
DEBUG and Log.note(
|
||||
"update column {{table}}.{{column}}",
|
||||
table=col.es_index,
|
||||
column=col.es_column,
|
||||
)
|
||||
for k in command["clear"]:
|
||||
if k == ".":
|
||||
self.todo.add((DELETE, col))
|
||||
lst = self.data[col.es_index]
|
||||
cols = lst[col.name]
|
||||
cols.remove(col)
|
||||
if len(cols) == 0:
|
||||
del lst[col.name]
|
||||
if len(lst) == 0:
|
||||
del self.data[col.es_index]
|
||||
break
|
||||
else:
|
||||
col[k] = None
|
||||
else:
|
||||
# DID NOT DELETE COLUMNM ("."), CONTINUE TO SET PROPERTIES
|
||||
for k, v in command.set.items():
|
||||
col[k] = v
|
||||
self.todo.add((UPDATE, col))
|
||||
|
||||
except Exception as e:
|
||||
Log.error("should not happen", cause=e)
|
||||
|
||||
def query(self, query):
|
||||
# NOT EXPECTED TO BE RUN
|
||||
Log.error("not")
|
||||
with self.locker:
|
||||
self._update_meta()
|
||||
if not self._schema:
|
||||
self._schema = Schema(
|
||||
".", [c for cs in self.data[META_INDEX_NAME].values() for c in cs]
|
||||
)
|
||||
snapshot = self._all_columns()
|
||||
|
||||
from jx_python.containers.list_usingPythonList import ListContainer
|
||||
|
||||
query.frum = ListContainer(META_INDEX_NAME, snapshot, self._schema)
|
||||
return jx.run(query)
|
||||
|
||||
def groupby(self, keys):
|
||||
with self.locker:
|
||||
self._update_meta()
|
||||
return jx.groupby(self.__iter__(), keys)
|
||||
|
||||
def window(self, window):
|
||||
raise NotImplemented()
|
||||
|
||||
@property
|
||||
def schema(self):
|
||||
if not self._schema:
|
||||
with self.locker:
|
||||
self._update_meta()
|
||||
self._schema = Schema(
|
||||
".", [c for cs in self.data[META_INDEX_NAME].values() for c in cs]
|
||||
)
|
||||
return self._schema
|
||||
|
||||
@property
|
||||
def namespace(self):
|
||||
return self
|
||||
|
||||
def get_table(self, table_name):
|
||||
if table_name != META_INDEX_NAME:
|
||||
Log.error("this container has only the " + META_INDEX_NAME)
|
||||
return self
|
||||
|
||||
def get_columns(self, table_name):
|
||||
if table_name != META_INDEX_NAME:
|
||||
Log.error("this container has only the " + META_INDEX_NAME)
|
||||
return self._all_columns()
|
||||
|
||||
def denormalized(self):
|
||||
"""
|
||||
THE INTERNAL STRUCTURE FOR THE COLUMN METADATA IS VERY DIFFERENT FROM
|
||||
THE DENORMALIZED PERSPECITVE. THIS PROVIDES THAT PERSPECTIVE FOR QUERIES
|
||||
"""
|
||||
with self.locker:
|
||||
self._update_meta()
|
||||
output = [
|
||||
{
|
||||
"table": c.es_index,
|
||||
"name": untype_path(c.name),
|
||||
"cardinality": c.cardinality,
|
||||
"es_column": c.es_column,
|
||||
"es_index": c.es_index,
|
||||
"last_updated": c.last_updated,
|
||||
"count": c.count,
|
||||
"nested_path": [unnest_path(n) for n in c.nested_path],
|
||||
"es_type": c.es_type,
|
||||
"type": c.jx_type,
|
||||
}
|
||||
for tname, css in self.data.items()
|
||||
for cname, cs in css.items()
|
||||
for c in cs
|
||||
if c.jx_type not in STRUCT # and c.es_column != "_id"
|
||||
]
|
||||
|
||||
from jx_python.containers.list_usingPythonList import ListContainer
|
||||
|
||||
return ListContainer(
|
||||
self.name,
|
||||
data=output,
|
||||
schema=jx_base.Schema(META_INDEX_NAME, SIMPLE_METADATA_COLUMNS),
|
||||
)
|
||||
|
||||
|
||||
def get_schema_from_list(table_name, frum):
|
||||
"""
|
||||
SCAN THE LIST FOR COLUMN TYPES
|
||||
"""
|
||||
columns = UniqueIndex(keys=("name",))
|
||||
_get_schema_from_list(frum, ".", parent=".", nested_path=ROOT_PATH, columns=columns)
|
||||
return Schema(table_name=table_name, columns=list(columns))
|
||||
|
||||
|
||||
def _get_schema_from_list(frum, table_name, parent, nested_path, columns):
|
||||
"""
|
||||
:param frum: The list
|
||||
:param table_name: Name of the table this list holds records for
|
||||
:param parent: parent path
|
||||
:param nested_path: each nested array, in reverse order
|
||||
:param columns: map from full name to column definition
|
||||
:return:
|
||||
"""
|
||||
|
||||
for d in frum:
|
||||
row_type = python_type_to_json_type[d.__class__]
|
||||
|
||||
if row_type != "object":
|
||||
# EXPECTING PRIMITIVE VALUE
|
||||
full_name = parent
|
||||
column = columns[full_name]
|
||||
if not column:
|
||||
column = Column(
|
||||
name=concat_field(table_name, full_name),
|
||||
es_column=full_name,
|
||||
es_index=".",
|
||||
es_type=d.__class__.__name__,
|
||||
jx_type=None, # WILL BE SET BELOW
|
||||
last_updated=Date.now(),
|
||||
nested_path=nested_path,
|
||||
)
|
||||
columns.add(column)
|
||||
column.es_type = _merge_python_type(column.es_type, d.__class__)
|
||||
column.jx_type = python_type_to_json_type[column.es_type]
|
||||
else:
|
||||
for name, value in d.items():
|
||||
full_name = concat_field(parent, name)
|
||||
column = columns[full_name]
|
||||
if not column:
|
||||
column = Column(
|
||||
name=concat_field(table_name, full_name),
|
||||
es_column=full_name,
|
||||
es_index=".",
|
||||
es_type=value.__class__.__name__,
|
||||
jx_type=None, # WILL BE SET BELOW
|
||||
last_updated=Date.now(),
|
||||
nested_path=nested_path,
|
||||
)
|
||||
columns.add(column)
|
||||
if is_container(value): # GET TYPE OF MULTIVALUE
|
||||
v = list(value)
|
||||
if len(v) == 0:
|
||||
this_type = none_type.__name__
|
||||
elif len(v) == 1:
|
||||
this_type = v[0].__class__.__name__
|
||||
else:
|
||||
this_type = reduce(
|
||||
_merge_python_type, (vi.__class__.__name__ for vi in value)
|
||||
)
|
||||
else:
|
||||
this_type = value.__class__.__name__
|
||||
column.es_type = _merge_python_type(column.es_type, this_type)
|
||||
column.jx_type = python_type_to_json_type[column.es_type]
|
||||
|
||||
if this_type in {"object", "dict", "Mapping", "Data"}:
|
||||
_get_schema_from_list(
|
||||
[value], table_name, full_name, nested_path, columns
|
||||
)
|
||||
elif this_type in {"list", "FlatList"}:
|
||||
np = listwrap(nested_path)
|
||||
newpath = unwraplist([join_field(split_field(np[0]) + [name])] + np)
|
||||
_get_schema_from_list(
|
||||
value, table_name, full_name, newpath, columns
|
||||
)
|
||||
|
||||
|
||||
def get_id(column):
|
||||
"""
|
||||
:param column:
|
||||
:return: Elasticsearch id for column
|
||||
"""
|
||||
return column.es_index + "|" + column.es_column
|
||||
|
||||
|
||||
METADATA_COLUMNS = (
|
||||
[
|
||||
Column(
|
||||
name=c,
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column=c,
|
||||
es_type="keyword",
|
||||
jx_type=STRING,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
for c in [
|
||||
"name",
|
||||
"es_type",
|
||||
"jx_type",
|
||||
"nested_path",
|
||||
"es_column",
|
||||
"es_index",
|
||||
"partitions",
|
||||
]
|
||||
]
|
||||
+ [
|
||||
Column(
|
||||
name=c,
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column=c,
|
||||
es_type="integer",
|
||||
jx_type=INTEGER,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
for c in ["count", "cardinality", "multi"]
|
||||
]
|
||||
+ [
|
||||
Column(
|
||||
name="last_updated",
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column="last_updated",
|
||||
es_type="double",
|
||||
jx_type=NUMBER,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def doc_to_column(doc):
|
||||
return Column(**wrap(untyped(doc)))
|
||||
|
||||
|
||||
SIMPLE_METADATA_COLUMNS = ( # FOR PURELY INTERNAL PYTHON LISTS, NOT MAPPING TO ANOTHER DATASTORE
|
||||
[
|
||||
Column(
|
||||
name=c,
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column=c,
|
||||
es_type="string",
|
||||
jx_type=STRING,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
for c in ["table", "name", "type", "nested_path"]
|
||||
]
|
||||
+ [
|
||||
Column(
|
||||
name=c,
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column=c,
|
||||
es_type="long",
|
||||
jx_type=INTEGER,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
for c in ["count", "cardinality", "multi"]
|
||||
]
|
||||
+ [
|
||||
Column(
|
||||
name="last_updated",
|
||||
es_index=META_INDEX_NAME,
|
||||
es_column="last_updated",
|
||||
es_type="time",
|
||||
jx_type=NUMBER,
|
||||
last_updated=Date.now(),
|
||||
nested_path=ROOT_PATH,
|
||||
)
|
||||
]
|
||||
)
|
||||
|
||||
_merge_order = {
|
||||
none_type: 0,
|
||||
NullType: 1,
|
||||
bool: 2,
|
||||
int: 3,
|
||||
long: 3,
|
||||
Date: 4,
|
||||
float: 5,
|
||||
text_type: 6,
|
||||
binary_type: 6,
|
||||
object: 7,
|
||||
dict: 8,
|
||||
Mapping: 9,
|
||||
Data: 10,
|
||||
list: 11,
|
||||
FlatList: 12,
|
||||
}
|
||||
|
||||
for k, v in items(_merge_order):
|
||||
_merge_order[k.__name__] = v
|
||||
|
||||
|
||||
def _merge_python_type(A, B):
|
||||
a = _merge_order[A]
|
||||
b = _merge_order[B]
|
||||
|
||||
if a >= b:
|
||||
output = A
|
||||
else:
|
||||
output = B
|
||||
|
||||
if isinstance(output, str):
|
||||
return output
|
||||
else:
|
||||
return output.__name__
|
|
@ -17,9 +17,9 @@ from jx_base import Container
|
|||
from jx_base.expressions import TRUE, Variable
|
||||
from jx_base.language import is_expression, is_op
|
||||
from jx_base.schema import Schema
|
||||
from jx_elasticsearch.meta_columns import get_schema_from_list
|
||||
from jx_python.expressions import jx_expression_to_function
|
||||
from jx_python.lists.aggs import is_aggs, list_aggs
|
||||
from jx_python.meta import get_schema_from_list
|
||||
from mo_collections import UniqueIndex
|
||||
from mo_dots import Data, Null, is_data, is_list, listwrap, unwrap, unwraplist, wrap
|
||||
from mo_future import first, sort_using_key
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
import itertools
|
||||
|
||||
from jx_base.domains import DefaultDomain, SimpleSetDomain
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
#
|
||||
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
|
||||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
from __future__ import absolute_import, division
|
||||
|
||||
import re
|
||||
|
||||
|
@ -15,12 +15,20 @@ from mo_dots import Data, coalesce, is_data, listwrap, wrap_leaves
|
|||
from mo_logs import Log, strings
|
||||
from mo_times.dates import Date
|
||||
|
||||
true = True
|
||||
false = False
|
||||
null = None
|
||||
EMPTY_DICT = {}
|
||||
|
||||
_keep_imports = [coalesce, listwrap, Date, Log, Data, re, wrap_leaves, is_data]
|
||||
GLOBALS = {
|
||||
"true": True,
|
||||
"false": False,
|
||||
"null": None,
|
||||
"EMPTY_DICT": {},
|
||||
"coalesce": coalesce,
|
||||
"listwrap": listwrap,
|
||||
"Date": Date,
|
||||
"Log": Log,
|
||||
"Data": Data,
|
||||
"re": re,
|
||||
"wrap_leaves": wrap_leaves,
|
||||
"is_data": is_data
|
||||
}
|
||||
|
||||
|
||||
def compile_expression(source):
|
||||
|
@ -33,17 +41,17 @@ def compile_expression(source):
|
|||
fake_locals = {}
|
||||
try:
|
||||
exec(
|
||||
"""
|
||||
def output(row, rownum=None, rows=None):
|
||||
_source = """ + strings.quote(source) + """
|
||||
try:
|
||||
return """ + source + """
|
||||
except Exception as e:
|
||||
Log.error("Problem with dynamic function {{func|quote}}", func=_source, cause=e)
|
||||
""",
|
||||
globals(),
|
||||
fake_locals
|
||||
(
|
||||
"def output(row, rownum=None, rows=None):\n" +
|
||||
" _source = " + strings.quote(source) + "\n" +
|
||||
" try:\n" +
|
||||
" return " + source + "\n" +
|
||||
" except Exception as e:\n" +
|
||||
" Log.error(u'Problem with dynamic function {{func|quote}}', func=_source, cause=e)\n"
|
||||
),
|
||||
GLOBALS,
|
||||
fake_locals,
|
||||
)
|
||||
except Exception as e:
|
||||
Log.error("Bad source: {{source}}", source=source, cause=e)
|
||||
return fake_locals['output']
|
||||
Log.error(u"Bad source: {{source}}", source=source, cause=e)
|
||||
return fake_locals["output"]
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
import functools
|
||||
|
||||
from mo_dots import Data, FlatList, coalesce, is_data, is_list, split_field, wrap
|
||||
|
|
|
@ -640,7 +640,7 @@ def drill_filter(esfilter, data):
|
|||
"""
|
||||
PARTIAL EVALUATE THE FILTER BASED ON DATA GIVEN
|
||||
|
||||
TODO: FIX THIS MONUMENALLY BAD IDEA
|
||||
TODO: FIX THIS MONUMENTALLY BAD IDEA
|
||||
"""
|
||||
esfilter = unwrap(esfilter)
|
||||
primary_nested = [] # track if nested, changes if not
|
||||
|
|
|
@ -1,28 +0,0 @@
|
|||
# encoding: utf-8
|
||||
#
|
||||
#
|
||||
# This Source Code Form is subject to the terms of the Mozilla Public
|
||||
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
# You can obtain one at http:# mozilla.org/MPL/2.0/.
|
||||
#
|
||||
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
|
||||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
import dataset
|
||||
from jx_python.containers.Table_usingDataset import Table_usingDataset
|
||||
|
||||
|
||||
class Dataset(object):
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.db = dataset.connect('sqlite:///:memory:')
|
||||
|
||||
|
||||
def get_or_create_table(self, name, uid):
|
||||
return Table_usingDataset(name, self.db, primary_id=uid)
|
||||
|
||||
|
||||
|
|
@ -1 +0,0 @@
|
|||
__author__ = 'kyle'
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
import itertools
|
||||
|
||||
from jx_base.domains import DefaultDomain, SimpleSetDomain
|
||||
|
@ -23,6 +22,7 @@ from mo_times.dates import Date
|
|||
|
||||
_ = Date
|
||||
|
||||
|
||||
def is_aggs(query):
|
||||
if query.edges or query.groupby or any(a != None and a != "none" for a in listwrap(query.select).aggregate):
|
||||
return True
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
from mo_dots import listwrap
|
||||
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
import jx_base
|
||||
from mo_dots import Data
|
||||
|
||||
|
|
|
@ -268,7 +268,7 @@ def _all_default(d, default, seen=None):
|
|||
_all_default(existing_value, default_value, seen)
|
||||
|
||||
|
||||
def _getdefault(obj, key):
|
||||
def _get_dict_default(obj, key):
|
||||
"""
|
||||
obj MUST BE A DICT
|
||||
key IS EXPECTED TO BE LITERAL (NO ESCAPING)
|
||||
|
@ -280,7 +280,28 @@ def _getdefault(obj, key):
|
|||
pass
|
||||
|
||||
try:
|
||||
return getattr(obj, key)
|
||||
if float(key) == round(float(key), 0):
|
||||
return obj[int(key)]
|
||||
except Exception as f:
|
||||
pass
|
||||
|
||||
return NullType(obj, key)
|
||||
|
||||
|
||||
def _getdefault(obj, key):
|
||||
"""
|
||||
obj ANY OBJECT
|
||||
key IS EXPECTED TO BE LITERAL (NO ESCAPING)
|
||||
TRY BOTH ATTRIBUTE AND ITEM ACCESS, OR RETURN Null
|
||||
"""
|
||||
try:
|
||||
return obj[key]
|
||||
except Exception as f:
|
||||
pass
|
||||
|
||||
try:
|
||||
if obj.__class__ is not dict:
|
||||
return getattr(obj, key)
|
||||
except Exception as f:
|
||||
pass
|
||||
|
||||
|
|
|
@ -147,7 +147,8 @@ class Data(MutableMapping):
|
|||
d[seq[-1]] = value
|
||||
return self
|
||||
except Exception as e:
|
||||
raise e
|
||||
from mo_logs import Log
|
||||
Log.error("can not set key={{key}}", key=key, cause=e)
|
||||
|
||||
def __getattr__(self, key):
|
||||
d = _get(self, SLOT)
|
||||
|
|
|
@ -13,6 +13,7 @@ from mo_dots import get_logger, is_data, wrap, zip as dict_zip
|
|||
from mo_future import get_function_arguments, get_function_defaults, get_function_name, text_type
|
||||
from mo_logs import Except
|
||||
|
||||
KWARGS = str("kwargs")
|
||||
|
||||
def override(func):
|
||||
"""
|
||||
|
@ -55,12 +56,12 @@ def override(func):
|
|||
)
|
||||
raise e
|
||||
|
||||
if "kwargs" not in params:
|
||||
if KWARGS not in params:
|
||||
# WE ASSUME WE ARE ONLY ADDING A kwargs PARAMETER TO SOME REGULAR METHOD
|
||||
def wo_kwargs(*args, **kwargs):
|
||||
settings = kwargs.get("kwargs")
|
||||
settings = kwargs.get(KWARGS)
|
||||
ordered_params = dict(zip(params, args))
|
||||
packed = params_pack(params, ordered_params, kwargs, settings, defaults)
|
||||
packed = params_pack(params, defaults, settings, kwargs, ordered_params)
|
||||
try:
|
||||
return func(**packed)
|
||||
except TypeError as e:
|
||||
|
@ -69,14 +70,14 @@ def override(func):
|
|||
|
||||
elif func_name in ("__init__", "__new__"):
|
||||
def w_constructor(*args, **kwargs):
|
||||
if "kwargs" in kwargs:
|
||||
packed = params_pack(params, dict_zip(params[1:], args[1:]), kwargs, kwargs["kwargs"], defaults)
|
||||
if KWARGS in kwargs:
|
||||
packed = params_pack(params, defaults, kwargs[KWARGS], kwargs, dict_zip(params[1:], args[1:]))
|
||||
elif len(args) == 2 and len(kwargs) == 0 and is_data(args[1]):
|
||||
# ASSUME SECOND UNNAMED PARAM IS kwargs
|
||||
packed = params_pack(params, args[1], defaults)
|
||||
packed = params_pack(params, defaults, args[1])
|
||||
else:
|
||||
# DO NOT INCLUDE self IN kwargs
|
||||
packed = params_pack(params, dict_zip(params[1:], args[1:]), kwargs, defaults)
|
||||
packed = params_pack(params, defaults, kwargs, dict_zip(params[1:], args[1:]))
|
||||
try:
|
||||
return func(args[0], **packed)
|
||||
except TypeError as e:
|
||||
|
@ -88,12 +89,12 @@ def override(func):
|
|||
def w_bound_method(*args, **kwargs):
|
||||
if len(args) == 2 and len(kwargs) == 0 and is_data(args[1]):
|
||||
# ASSUME SECOND UNNAMED PARAM IS kwargs
|
||||
packed = params_pack(params, args[1], defaults)
|
||||
elif "kwargs" in kwargs and is_data(kwargs["kwargs"]):
|
||||
packed = params_pack(params, defaults, args[1])
|
||||
elif KWARGS in kwargs and is_data(kwargs[KWARGS]):
|
||||
# PUT args INTO kwargs
|
||||
packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), kwargs["kwargs"], defaults)
|
||||
packed = params_pack(params, defaults, kwargs[KWARGS], dict_zip(params[1:], args[1:]), kwargs)
|
||||
else:
|
||||
packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), defaults)
|
||||
packed = params_pack(params, defaults, dict_zip(params[1:], args[1:]), kwargs)
|
||||
try:
|
||||
return func(args[0], **packed)
|
||||
except TypeError as e:
|
||||
|
@ -104,13 +105,13 @@ def override(func):
|
|||
def w_kwargs(*args, **kwargs):
|
||||
if len(args) == 1 and len(kwargs) == 0 and is_data(args[0]):
|
||||
# ASSUME SINGLE PARAMETER IS kwargs
|
||||
packed = params_pack(params, args[0], defaults)
|
||||
elif "kwargs" in kwargs and is_data(kwargs["kwargs"]):
|
||||
packed = params_pack(params, defaults, args[0])
|
||||
elif KWARGS in kwargs and is_data(kwargs[KWARGS]):
|
||||
# PUT args INTO kwargs
|
||||
packed = params_pack(params, kwargs, dict_zip(params, args), kwargs["kwargs"], defaults)
|
||||
packed = params_pack(params, defaults, kwargs[KWARGS], dict_zip(params, args), kwargs)
|
||||
else:
|
||||
# PULL kwargs OUT INTO PARAMS
|
||||
packed = params_pack(params, kwargs, dict_zip(params, args), defaults)
|
||||
packed = params_pack(params, defaults, dict_zip(params, args), kwargs)
|
||||
try:
|
||||
return func(**packed)
|
||||
except TypeError as e:
|
||||
|
@ -120,15 +121,13 @@ def override(func):
|
|||
|
||||
def params_pack(params, *args):
|
||||
settings = {}
|
||||
for a in reversed(args):
|
||||
if a == None:
|
||||
continue
|
||||
for a in args:
|
||||
for k, v in a.items():
|
||||
settings[str(k)] = None if v == None else v
|
||||
settings["kwargs"] = settings
|
||||
settings[str(k)] = v
|
||||
settings[KWARGS] = wrap(settings)
|
||||
|
||||
output = {
|
||||
str(k): settings[k] if k != "kwargs" else wrap(settings)
|
||||
k: settings[k]
|
||||
for k in params
|
||||
if k in settings
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ class StructuredLogger_usingElasticSearch(StructuredLogger):
|
|||
schema=schema,
|
||||
limit_replicas=True,
|
||||
typed=True,
|
||||
read_only=False,
|
||||
kwargs=kwargs,
|
||||
)
|
||||
self.batch_size = batch_size
|
||||
|
|
|
@ -12,9 +12,9 @@ from __future__ import absolute_import, division, unicode_literals
|
|||
from copy import deepcopy
|
||||
import re
|
||||
|
||||
from jx_base import Column
|
||||
from jx_python import jx
|
||||
from jx_python.meta import Column
|
||||
from mo_dots import Data, FlatList, Null, ROOT_PATH, SLOT, coalesce, concat_field, is_data, is_list, listwrap, literal_field, set_default, split_field, wrap
|
||||
from mo_dots import Data, FlatList, Null, ROOT_PATH, SLOT, coalesce, concat_field, is_data, is_list, listwrap, literal_field, set_default, split_field, wrap, unwrap
|
||||
from mo_files.url import URL
|
||||
from mo_future import binary_type, is_binary, is_text, items, text_type
|
||||
from mo_json import BOOLEAN, EXISTS, NESTED, NUMBER, OBJECT, STRING, json2value, value2json
|
||||
|
@ -98,7 +98,7 @@ class Index(Features):
|
|||
Log.error("not allowed")
|
||||
if type == None:
|
||||
# NO type PROVIDED, MAYBE THERE IS A SUITABLE DEFAULT?
|
||||
about = self.cluster.get_metadata().indices[self.settings.index]
|
||||
about = self.cluster.get_metadata().indices[literal_field(self.settings.index)]
|
||||
type = self.settings.type = _get_best_type_from_mapping(about.mappings)[0]
|
||||
if type == "_default_":
|
||||
Log.error("not allowed")
|
||||
|
@ -146,7 +146,7 @@ class Index(Features):
|
|||
def get_properties(self, retry=True):
|
||||
if self.settings.explore_metadata:
|
||||
metadata = self.cluster.get_metadata()
|
||||
index = metadata.indices[self.settings.index]
|
||||
index = metadata.indices[literal_field(self.settings.index)]
|
||||
|
||||
if index == None and retry:
|
||||
#TRY AGAIN, JUST IN CASE
|
||||
|
@ -204,7 +204,7 @@ class Index(Features):
|
|||
# WAIT FOR ALIAS TO APPEAR
|
||||
while True:
|
||||
metadata = self.cluster.get_metadata(force=True)
|
||||
if alias in metadata.indices[self.settings.index].aliases:
|
||||
if alias in metadata.indices[literal_field(self.settings.index)].aliases:
|
||||
return
|
||||
Log.note("Waiting for alias {{alias}} to appear", alias=alias)
|
||||
Till(seconds=1).wait()
|
||||
|
@ -294,6 +294,18 @@ class Index(Features):
|
|||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
def delete_id(self, id):
|
||||
result = self.cluster.delete(
|
||||
path = self.path + "/" + id,
|
||||
timeout=600,
|
||||
# params={"wait_for_active_shards": wait_for_active_shards}
|
||||
)
|
||||
if result.failures:
|
||||
Log.error("Failure to delete fom {{index}}:\n{{data|pretty}}", index=self.settings.index, data=result)
|
||||
|
||||
|
||||
|
||||
|
||||
def extend(self, records):
|
||||
"""
|
||||
records - MUST HAVE FORM OF
|
||||
|
@ -597,7 +609,7 @@ class Cluster(object):
|
|||
|
||||
index = kwargs.index
|
||||
meta = self.get_metadata()
|
||||
type, about = _get_best_type_from_mapping(meta.indices[index].mappings)
|
||||
type, about = _get_best_type_from_mapping(meta.indices[literal_field(index)].mappings)
|
||||
|
||||
if typed == None:
|
||||
typed = True
|
||||
|
@ -802,7 +814,7 @@ class Cluster(object):
|
|||
while not Till(seconds=30):
|
||||
try:
|
||||
metadata = self.get_metadata(force=True)
|
||||
if index in metadata.indices:
|
||||
if index in metadata.indices.keys():
|
||||
break
|
||||
Log.note("Waiting for index {{index}} to appear", index=index)
|
||||
except Exception as e:
|
||||
|
@ -864,7 +876,7 @@ class Cluster(object):
|
|||
with self.metadata_locker:
|
||||
self._metadata = wrap(response.metadata)
|
||||
for new_index_name, new_meta in self._metadata.indices.items():
|
||||
old_index = old_indices[new_index_name]
|
||||
old_index = old_indices[literal_field(new_index_name)]
|
||||
if not old_index:
|
||||
DEBUG_METADATA_UPDATE and Log.note("New index found {{index}} at {{time}}", index=new_index_name, time=now)
|
||||
self.index_last_updated[new_index_name] = now
|
||||
|
@ -876,7 +888,7 @@ class Cluster(object):
|
|||
DEBUG_METADATA_UPDATE and Log.note("More columns found in {{index}} at {{time}}", index=new_index_name, time=now)
|
||||
self.index_last_updated[new_index_name] = now
|
||||
for old_index_name, old_meta in old_indices.items():
|
||||
new_index = self._metadata.indices[old_index_name]
|
||||
new_index = self._metadata.indices[literal_field(old_index_name)]
|
||||
if not new_index:
|
||||
DEBUG_METADATA_UPDATE and Log.note("Old index lost: {{index}} at {{time}}", index=old_index_name, time=now)
|
||||
self.index_last_updated[old_index_name] = now
|
||||
|
@ -1018,6 +1030,9 @@ class Cluster(object):
|
|||
response = http.put(url, **kwargs)
|
||||
if response.status_code not in [200]:
|
||||
Log.error(response.reason + ": " + utf82unicode(response.content))
|
||||
if not response.content:
|
||||
return Null
|
||||
|
||||
self.debug and Log.note("response: {{response}}", response=utf82unicode(response.content)[0:300:])
|
||||
|
||||
details = json2value(utf82unicode(response.content))
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
from mo_future import is_text, is_binary
|
||||
from jx_python import jx
|
||||
from mo_dots import Data, ROOT_PATH, is_data, unwrap
|
||||
from mo_json import NESTED, OBJECT, json2value
|
||||
|
|
Загрузка…
Ссылка в новой задаче