lib updates
This commit is contained in:
Parent: bcc57f4ff3
Commit: 8dd538d3df

@@ -77,8 +77,8 @@ class SelectOp(Expression):
     def __data__(self):
         return {
             "select": [
-                {"name": t.name.__data__(), "value": t.value.__data__()}
-                for t in self.terms
+                {"name": t.name, "value": t.value.__data__()}
+                for t in wrap(self.terms)
             ]
         }

@@ -27,6 +27,7 @@ from mo_json.typed_encoder import untype_path
 from mo_logs import Log
 from mo_math import AND, UNION, is_number
 
+BAD_SELECT = "Expecting `value` or `aggregate` in select clause not {{select}}"
 DEFAULT_LIMIT = 10
 MAX_LIMIT = 10000
 DEFAULT_SELECT = Data(name="count", value=jx_expression("."), aggregate="count", default=0)

@@ -35,6 +36,7 @@ _jx = None
 _Column = None
 
 
+
 def _late_import():
     global _jx
     global _Column

@@ -339,7 +341,10 @@ def _normalize_select(select, frum, schema=None):
         return frum._normalize_select(canonical)
 
     output = []
-    if not select.value or select.value == ".":
+
+    if len(select) and not select.value:
+        Log.error(BAD_SELECT, select=select)
+    elif not select.value or select.value == ".":
         output.extend([
             set_default(
                 {

@@ -397,6 +402,8 @@ def _normalize_select_no_context(select, schema=None):
         output.name = coalesce(select.name, select.aggregate)
         if output.name:
             output.value = jx_expression(".", schema=schema)
+        elif len(select):
+            Log.error(BAD_SELECT, select=select)
         else:
             return Null
     elif is_text(select.value):

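The two hunks above tighten select-clause validation: a select that carries properties but neither `value` nor `aggregate` now fails fast with BAD_SELECT instead of being silently normalized. A minimal sketch of the rule, using hypothetical clause literals (the exact query shapes are assumed, not taken from this commit):

# ASSUMED SHAPES, FOR ILLUSTRATION ONLY
ok = {"name": "count", "value": ".", "aggregate": "count"}   # HAS value AND aggregate
bad = {"name": "count"}   # NEITHER value NOR aggregate -> Log.error(BAD_SELECT, select=bad)
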
@@ -30,7 +30,7 @@ from mo_logs.exceptions import Except, suppress_exception
 from mo_math import is_integer, is_number
 from mo_math.randoms import Random
 from mo_threads import Lock, ThreadedQueue, Till, THREAD_STOP, Thread, MAIN_THREAD
-from mo_times import Date, Timer, HOUR, dates
+from mo_times import Date, Timer, HOUR, dates, Duration
 from mo_http import http
 
 DEBUG = True

@@ -386,7 +386,7 @@ class Index(object):
         if seconds <= 0:
             interval = -1
         else:
-            interval = text(seconds) + "s"
+            interval = text(int(seconds)) + "s"
 
         if self.cluster.version.startswith(("1.4.", "1.5.", "1.6.", "1.7.", "5.", "6.")):
             result = self.cluster.put(

@@ -563,11 +563,11 @@ class Cluster(object):
 
         def set_refresh(please_stop):
             try:
-                known_index.set_refresh_interval(seconds=int(dates.parse(kwargs.refresh_interval).seconds))
+                known_index.set_refresh_interval(seconds=Duration(kwargs.refresh_interval).seconds)
             except Exception as e:
                 Log.warning("could not set refresh interval for {{index}}", index=known_index.settings.index, cause=e)
         if kwargs.refresh_interval:
-            Thread.run("setting refresh interval", set_refresh, parent_thread=MAIN_THREAD)
+            Thread.run("setting refresh interval", set_refresh, parent_thread=MAIN_THREAD).release()
         else:
             pass
         return known_index

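Two patterns introduced here recur through the rest of the commit: refresh intervals are parsed with Duration(...).seconds instead of dates.parse(...), and background threads spawned with Thread.run(...) are immediately released, marking them fire-and-forget. A minimal sketch of the release pattern, assuming the mo_threads API exactly as it appears in this diff (release() itself is defined later in this commit):

from mo_threads import Thread

def set_refresh(please_stop):
    ...  # BEST-EFFORT WORK; FAILURES ARE LOGGED, NEVER RAISED TO A JOINER

# release() DECLARES THAT NO ONE WILL join(); THE THREAD SENDS ITS
# RESULT, OR FAILURE, TO THE LOGS WHEN IT FINISHES
Thread.run("setting refresh interval", set_refresh).release()
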
@@ -98,6 +98,8 @@ def es_bulkaggsop(esq, frum, query):
 
     if num_partitions > MAX_PARTITIONS:
         Log.error("Requesting more than {{num}} partitions", num=num_partitions)
+    if num_partitions == 0:
+        num_partitions = 1
 
     acc, decoders, es_query = build_es_query(selects, query_path, schema, query)
     guid = Random.base64(32, extra="-_")

@@ -119,7 +121,7 @@ def es_bulkaggsop(esq, frum, query):
             abs_limit,
             formatter,
             parent_thread=Null,
-        )
+        ).release()
 
     output = wrap(
         {

@@ -20,7 +20,7 @@ from mo_dots import Data, FlatList, coalesce, concat_field, is_list as is_list_,
     relative_field, set_default, split_field, startswith_field, unwrap, wrap
 from mo_future import zip_longest
 from mo_json import NESTED
-from mo_json.typed_encoder import untype_path
+from mo_json.typed_encoder import untype_path, untyped
 from mo_logs import Log
 from mo_threads import Thread
 from mo_times.timer import Timer

@@ -33,7 +33,8 @@ def is_deepop(es, query):
         return False
     if all(s.aggregate not in (None, "none") for s in listwrap(query.select)):
         return False
-    if len(split_field(query.frum.name)) > 1:
+    # THE schema.name SHOWS THE REAL NESTED DEPTH
+    if len(split_field(query.frum.schema.name)) > 1:
         return True
 
     # ASSUME IT IS NESTED IF WE ARE ASKING FOR NESTED COLUMNS

@@ -128,23 +129,24 @@ def es_deepop(es, query):
             })
         else:
             for n in net_columns:
-                pull = get_pull_function(n)
                 if n.nested_path[0] == ".":
                     if n.jx_type == NESTED:
                         continue
                     es_query.stored_fields += [n.es_column]
 
-                # WE MUST FIGURE OUT WHICH NAMESSPACE s.value.var IS USING SO WE CAN EXTRACT THE child
-                if len(n.nested_path[0]) > len(query_path):
-                    # SELECTING INNER PROPERTIES IS NOT ALLOWED
-                    continue
+                # WE MUST FIGURE OUT WHICH NAMESPACE s.value.var IS USING SO WE CAN EXTRACT THE child
                 for np in n.nested_path:
                     c_name = untype_path(relative_field(n.name, np))
                     if startswith_field(c_name, select.value.var):
                         child = relative_field(c_name, select.value.var)
                         break
                 else:
+                    continue
+                    # REMOVED BECAUSE SELECTING INNER PROPERTIES IS NOT ALLOWED
+                    # child = relative_field(untype_path(relative_field(n.name, n.nested_path[0])), s.value.var)
                     raise Log.error("Not expected")
 
+                pull = get_pull_function(n)
                 new_select.append({
                     "name": select.name,
                     "pull": pull,

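The loop above leans on Python's for/else: the else clause runs only when the loop finishes without a break, which here means "no nested path matched this variable". A self-contained illustration of that control flow (the names are invented for the demo):

def find_match(paths, var):
    for p in paths:
        if p.startswith(var):
            child = p[len(var):]
            break                  # A MATCH ESCAPES THE LOOP...
    else:
        return None                # ...OTHERWISE THE else CLAUSE RUNS
    return child

assert find_match(["a.b", "a.c"], "a.b") == ""
assert find_match(["a.b"], "x") is None
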
@@ -67,7 +67,7 @@ def es_bulksetop(esq, frum, query):
             es_query,
             formatter,
             parent_thread=Null,
-        )
+        ).release()
 
     output = wrap(
         {

@@ -102,6 +102,8 @@ def extractor(guid, abs_limit, esq, es_query, formatter, please_stop):
             hits = result.hits.hits
+            chunk_limit = abs_limit - total
+            hits = hits[:chunk_limit]
             if len(hits) == 0:
                 break
             formatter.add(hits)
             for b in formatter.bytes():
                 if b is DONE:

@@ -128,6 +130,8 @@ def extractor(guid, abs_limit, esq, es_query, formatter, please_stop):
                 result = esq.es.scroll(scroll_id)
+                continue
+            break
         if please_stop:
             Log.error("Bulk download stopped for shutdown")
         for b in formatter.footer():
             output.write(b)

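The extractor changes cap each scroll batch at the remaining budget (abs_limit - total), so a bulk download stops exactly at the requested limit instead of flushing the whole final batch. A generic sketch of the same trimming logic (names are illustrative):

def take_upto(batches, limit):
    # YIELD ROWS FROM batches, BUT NEVER MORE THAN limit IN TOTAL
    total = 0
    for batch in batches:
        batch = batch[:limit - total]   # TRIM THE LAST, PARTIAL BATCH
        if not batch:
            break
        total += len(batch)
        yield batch

assert list(take_upto([[1, 2], [3, 4], [5]], 3)) == [[1, 2], [3]]
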
@@ -41,7 +41,7 @@ from mo_dots import (
     wrap,
 )
 from mo_future import first, text
-from mo_json import NESTED
+from mo_json import NESTED, STRUCT
 from mo_json.typed_encoder import decode_property, unnest_path, untype_path, untyped
 from mo_logs import Log
 from mo_math import AND

@@ -364,8 +364,11 @@ def get_pull(column):
 
 
 def get_pull_function(column):
-    return jx_expression_to_function(get_pull(column))
-
+    func = jx_expression_to_function(get_pull(column))
+    if column.jx_type in STRUCT:
+        return lambda doc: untyped(func(doc))
+    else:
+        return func
 
 def get_pull_source(es_column):
     def output(row):

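get_pull_function now decorates the compiled pull with a closure that strips type annotations (untyped) when the column holds structured data, and leaves primitives alone. The shape of that decision, wrapping a function only when post-processing is needed (a generic sketch, not this library's code):

def maybe_wrap(func, needs_cleanup, cleanup):
    # RETURN func UNCHANGED, OR A CLOSURE THAT POST-PROCESSES ITS OUTPUT
    if needs_cleanup:
        return lambda doc: cleanup(func(doc))
    return func

get_a = maybe_wrap(lambda d: d["a"], True, str.upper)
assert get_a({"a": "x"}) == "X"
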
@@ -54,7 +54,7 @@ class ColumnList(Table, jx_base.Container):
         self._db_load()
         Thread.run(
             "update " + META_COLUMNS_NAME, self._update_from_es, parent_thread=MAIN_THREAD
-        )
+        ).release()
 
     def _query(self, query):
         result = Data()

@@ -13,6 +13,7 @@ import re
 from jx_elasticsearch import elasticsearch
 from jx_python import jx
 from mo_dots import Null, coalesce, wrap
+from mo_dots.lists import last
 from mo_future import items, sort_using_key
 from mo_json import CAN_NOT_DECODE_JSON, json2value, value2json
 from mo_kwargs import override

@@ -81,7 +82,7 @@ class RolloverIndex(object):
         with self.locker:
             queue = self.known_queues.get(rounded_timestamp.unix)
             if queue == None:
-                candidates = sort_using_key(
+                candidates = wrap(sort_using_key(
                     filter(
                         lambda r: re.match(
                             re.escape(self.settings.index) + r"\d\d\d\d\d\d\d\d_\d\d\d\d\d\d$",

@@ -90,15 +91,14 @@ class RolloverIndex(object):
                         self.cluster.get_aliases()
                     ),
                     key=lambda r: r['index']
-                )
+                ))
                 best = None
                 for c in candidates:
-                    c = wrap(c)
                     c.date = unicode2Date(c.index[-15:], elasticsearch.INDEX_DATE_FORMAT)
                     if timestamp > c.date:
                         best = c
                 if not best or rounded_timestamp > best.date:
-                    if rounded_timestamp < wrap(candidates[-1]).date:
+                    if rounded_timestamp < wrap(last(candidates)).date:
                         es = self.cluster.get_or_create_index(read_only=False, alias=best.alias, index=best.index, kwargs=self.settings)
                     else:
                         try:

@@ -114,11 +114,11 @@ class RolloverIndex(object):
 
                 def refresh(please_stop):
                     try:
-                        es.set_refresh_interval(seconds=60 * 10, timeout=5)
+                        es.set_refresh_interval(seconds=coalesce(Duration(self.settings.refresh_interval).seconds, 60 * 10), timeout=5)
                     except Exception:
                         Log.note("Could not set refresh interval for {{index}}", index=es.settings.index)
 
-                Thread.run("refresh", refresh)
+                Thread.run("refresh", refresh).release()
 
                 self._delete_old_indexes(candidates)
                 threaded_queue = es.threaded_queue(max_size=self.settings.queue_size, batch_size=self.settings.batch_size, silent=True)

@@ -9,8 +9,8 @@
 #
 from __future__ import absolute_import, division, unicode_literals
 
-from copy import copy
 import itertools
+from copy import copy
 
 import jx_base
 from jx_base import Container

@@ -18,15 +18,14 @@ from jx_base.expressions import TRUE, Variable
 from jx_base.language import is_expression, is_op
 from jx_base.meta_columns import get_schema_from_list
 from jx_base.schema import Schema
+from jx_python.convert import list2cube, list2table
 from jx_python.expressions import jx_expression_to_function
 from jx_python.lists.aggs import is_aggs, list_aggs
 from mo_collections import UniqueIndex
-from mo_dots import Data, Null, is_data, is_list, listwrap, unwrap, unwraplist, wrap, coalesce, relative_field, \
-    split_field
+from mo_dots import Data, Null, is_data, is_list, listwrap, unwrap, unwraplist, wrap, coalesce
 from mo_future import first, sort_using_key
 from mo_logs import Log
 from mo_threads import Lock
-from pyLibrary import convert
 
 
 class ListContainer(Container, jx_base.Namespace, jx_base.Table):

@@ -208,9 +207,9 @@ class ListContainer(Container, jx_base.Namespace, jx_base.Table):
 
     def format(self, format):
         if format == "table":
-            frum = convert.list2table(self.data, self._schema.lookup.keys())
+            frum = list2table(self.data, self._schema.lookup.keys())
         elif format == "cube":
-            frum = convert.list2cube(self.data, self.schema.lookup.keys())
+            frum = list2cube(self.data, self.schema.lookup.keys())
         else:
             frum = self.__data__()

@@ -34,18 +34,41 @@ def jx_expression_to_function(expr):
         return Null
 
     if is_expression(expr):
+        # ALREADY AN EXPRESSION OBJECT
         if is_op(expr, ScriptOp) and not is_text(expr.script):
            return expr.script
         else:
-            return compile_expression(Python[expr].to_python())
+            func = compile_expression(Python[expr].to_python())
+            return JXExpression(func, expr.__data__())
     if (
-        expr != None
-        and not is_data(expr)
+        not is_data(expr)
         and not is_list(expr)
         and hasattr(expr, "__call__")
     ):
         # THIS APPEARS TO BE A FUNCTION ALREADY
         return expr
-    return compile_expression(Python[jx_expression(expr)].to_python())
+
+    expr = jx_expression(expr)
+    func = compile_expression(Python[expr].to_python())
+    return JXExpression(func, expr)
+
+
+class JXExpression(object):
+    def __init__(self, func, expr):
+        self.func = func
+        self.expr = expr
+
+    def __call__(self, *args, **kwargs):
+        return self.func(*args)
+
+    def __str__(self):
+        return str(self.expr.__data__())
+
+    def __repr__(self):
+        return repr(self.expr.__data__())
+
+    def __data__(self):
+        return self.expr.__data__()
+
 
 @extend(NullOp)

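Compiled expressions are now returned as JXExpression objects: still callable, but carrying the originating expression so str(), repr(), and __data__() show the JSON expression rather than an anonymous compiled function. A hedged usage sketch; the expression literal is assumed JSON-expression syntax, not taken from this commit:

f = jx_expression_to_function({"add": ["a", "b"]})   # HYPOTHETICAL EXPRESSION
f({"a": 1, "b": 2})   # CALLS THE COMPILED FUNCTION
str(f)                # SHOWS THE ORIGINAL EXPRESSION, NOT "<function ...>"
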
@@ -17,6 +17,7 @@ from jx_base.query import QueryOp, _normalize_selects
 from jx_base.language import is_op, value_compare
 from jx_python import expressions as _expressions, flat_list, group_by
 from jx_python.containers.cube import Cube
+from jx_python.convert import list2table, list2cube
 from jx_python.cubes.aggs import cube_aggs
 from jx_python.expression_compiler import compile_expression
 from jx_python.expressions import jx_expression_to_function as get

@@ -30,7 +31,6 @@ from mo_future import is_text, sort_using_cmp
 from mo_logs import Log
 import mo_math
 from mo_math import MIN, UNION
-from pyLibrary import convert
 
 # A COLLECTION OF DATABASE OPERATORS (RELATIONAL ALGEBRA OPERATORS)
 # JSON QUERY EXPRESSION DOCUMENTATION: https://github.com/klahnakoski/jx/tree/master/docs

@@ -98,9 +98,9 @@ def run(query, container=Null):
 
     # AT THIS POINT frum IS IN LIST FORMAT, NOW PACKAGE RESULT
     if query_op.format == "cube":
-        container = convert.list2cube(container)
+        container = list2cube(container)
     elif query_op.format == "table":
-        container = convert.list2table(container)
+        container = list2table(container)
         container.meta.format = "table"
     else:
         container = wrap({"meta": {"format": "list"}, "data": container})

@@ -164,7 +164,9 @@ class FlatList(list):
         if not Log:
             _late_import()
         Log.warning(
-            "slicing is broken in Python 2.7: a[i:j] == a[i+len(a), j] sometimes. Use [start:stop:step] (see https://github.com/klahnakoski/pyLibrary/blob/master/pyLibrary/dot/README.md#the-slice-operator-in-python27-is-inconsistent)"
+            "slicing is broken in Python 2.7: a[i:j] == a[i+len(a), j] sometimes. Use [start:stop:step] (see "
+            "https://github.com/klahnakoski/mo-dots/tree/dev/docs#the-slice-operator-in-python27-is-inconsistent"
+            ")"
         )
         return self[i:j:]

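The trailing colon in return self[i:j:] is the whole trick: in Python 2, a[i:j] dispatches to the legacy __getslice__ protocol (which list defines, so it bypasses subclass overrides of __getitem__), while a[i:j:] dispatches to __getitem__ with a slice object. A small demonstration with a hypothetical subclass:

class Demo(list):
    def __getitem__(self, item):
        print("__getitem__", item)          # a[i:j:] ALWAYS LANDS HERE
        return list.__getitem__(self, item)

d = Demo(range(5))
d[1:3:]   # PY2 AND PY3: __getitem__(slice(1, 3, None))
d[1:3]    # PY2: INHERITED list.__getslice__(1, 3); PY3: __getitem__
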
@@ -452,7 +452,7 @@ class TempDirectory(File):
     def __exit__(self, exc_type, exc_val, exc_tb):
         from mo_threads import Thread
 
-        Thread.run("delete dir " + self.name, delete_daemon, file=self, caller_stack=get_stacktrace(1))
+        Thread.run("delete dir " + self.name, delete_daemon, file=self, caller_stack=get_stacktrace(1)).release()
 
 
 class TempFile(File):

@@ -476,8 +476,7 @@ class TempFile(File):
     def __exit__(self, exc_type, exc_val, exc_tb):
         from mo_threads import Thread
 
-        Thread.run("delete file " + self.name, delete_daemon, file=self, caller_stack=get_stacktrace(1))
-
+        Thread.run("delete file " + self.name, delete_daemon, file=self, caller_stack=get_stacktrace(1)).release()
 
 def _copy(from_, to_):
     if from_.is_directory():

@@ -152,7 +152,7 @@ class Log(object):
             from mo_logs.log_usingNothing import StructuredLogger
             return StructuredLogger()
 
-        Log.error("Log type of {{log_type|quote}} is not recognized", log_type=settings.log_type)
+        Log.error("Log type of {{config|json}} is not recognized", config=settings)
 
     @classmethod
     def _add_log(cls, log):

@@ -10,20 +10,31 @@
 
 from __future__ import absolute_import, division, unicode_literals
 
 from mo_future import is_text, is_binary
-from datetime import date, datetime
 import json as _json
+from datetime import date, datetime
+
+from mo_future import PY3
+
+if PY3:
+    from datetime import timezone
+    def utcfromtimestamp(u):
+        d = datetime.utcfromtimestamp(u)
+        d = d.replace(tzinfo=timezone.utc)
+        return d
+    MAX_TIME = datetime(2286, 11, 20, 17, 46, 39, 0, timezone.utc)
+else:
+    def utcfromtimestamp(u):
+        return datetime.utcfromtimestamp(u)
+    MAX_TIME = datetime(2286, 11, 20, 17, 46, 39)
 
 
 def unix2datetime(u):
     try:
         if u == None:
             return None
         if u == 9999999999:  # PYPY BUG https://bugs.pypy.org/issue1697
-            return datetime(2286, 11, 20, 17, 46, 39)
-        return datetime.utcfromtimestamp(u)
+            return MAX_TIME
+        return utcfromtimestamp(u)
     except Exception as e:
         from mo_logs import Log
         Log.error("Can not convert {{value}} to datetime", value=u, cause=e)

@@ -37,8 +48,7 @@ def milli2datetime(u):
 
 def datetime2string(value, format="%Y-%m-%d %H:%M:%S"):
     try:
-        utc_time = datetime.utcfromtimestamp(value.timestamp())
-        return utc_time.strftime(format)
+        return value.strftime(format)
     except Exception as e:
         from mo_logs import Log
         Log.error("Can not format {{value}} with {{format}}", value=value, format=format, cause=e)

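The unix2datetime rework makes UTC conversions timezone-aware on Python 3 and pins the PyPy overflow case to a single MAX_TIME constant. The PY3 branch is equivalent to the one-liner below, worth knowing because datetime.utcfromtimestamp returns a naive datetime:

from datetime import datetime, timezone

def utcfromtimestamp(u):
    # AWARE UTC DATETIME; SAME RESULT AS utcfromtimestamp(u).replace(tzinfo=timezone.utc)
    return datetime.fromtimestamp(u, tz=timezone.utc)

assert utcfromtimestamp(0).isoformat() == "1970-01-01T00:00:00+00:00"
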
@@ -43,6 +43,7 @@ class StructuredLogger_usingElasticSearch(StructuredLogger):
         type="log",
         queue_size=1000,
         batch_size=100,
+        refresh_interval="1second",
         kwargs=None,
     ):
         """

@@ -164,6 +164,8 @@ def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, d
     if test == expected:
         # shortcut
         return
+    if isinstance(expected, dates.Date):
+        return assertAlmostEqualValue(dates.Date(test).unix, expected.unix)
 
     if not is_number(expected):
         # SOME SPECIAL CASES, EXPECTING EMPTY CONTAINERS IS THE SAME AS EXPECTING NULL

@@ -220,4 +222,4 @@ def assertAlmostEqualValue(test, expected, digits=None, places=None, msg=None, d
 
 
 def is_null(v):
-    return v.__class__.__name__ == "NullOp"
\ No newline at end of file
+    return v.__class__.__name__ == "NullOp"

@@ -73,8 +73,9 @@ class Lock(object):
     def __exit__(self, a, b, c):
         if self.waiting:
             self.debug and _Log.note("signaling {{num}} waiters on {{name|quote}}", name=self.name, num=len(self.waiting))
-            waiter = self.waiting.pop()
-            waiter.go()
+            # TELL ANOTHER THAT THE LOCK IS READY SOON
+            other = self.waiting.pop()
+            other.go()
         self.lock.release()
         self.debug and _Log.note("released lock {{name|quote}}", name=self.name)

@@ -86,6 +87,9 @@ class Lock(object):
         """
         waiter = Signal()
         if self.waiting:
+            # TELL ANOTHER THAT THE LOCK IS READY SOON
+            other = self.waiting.pop()
+            other.go()
             self.debug and _Log.note("waiting with {{num}} others on {{name|quote}}", num=len(self.waiting), name=self.name, stack_depth=1)
             self.waiting.insert(0, waiter)
         else:

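Both Lock changes wake exactly one queued waiter instead of leaving it parked, the same handoff that threading.Condition.notify(1) performs in the standard library. A rough stdlib analogy, not the mo_threads implementation:

import threading

cond = threading.Condition()

def handoff():
    with cond:
        cond.notify(1)   # WAKE ONE WAITER, AS self.waiting.pop().go() DOES
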
@@ -23,7 +23,7 @@ from mo_threads.threads import THREAD_STOP, Thread
 from mo_threads.till import Till
 from mo_times import Timer
 
-DEBUG = False
+DEBUG = True
 
 
 class Process(object):

@@ -45,13 +45,14 @@ class Process(object):
         :param shell: true to run as command line
         :param bufsize: if you want to screw stuff up
         """
+        self.debug = debug or DEBUG
         self.process_id = Process.next_process_id
         Process.next_process_id += 1
         self.name = name + " (" + text(self.process_id) + ")"
         self.service_stopped = Signal("stopped signal for " + strings.quote(name))
-        self.stdin = Queue("stdin for process " + strings.quote(name), silent=True)
-        self.stdout = Queue("stdout for process " + strings.quote(name), silent=True)
-        self.stderr = Queue("stderr for process " + strings.quote(name), silent=True)
+        self.stdin = Queue("stdin for process " + strings.quote(name), silent=not self.debug)
+        self.stdout = Queue("stdout for process " + strings.quote(name), silent=not self.debug)
+        self.stderr = Queue("stderr for process " + strings.quote(name), silent=not self.debug)
 
         try:
             if cwd == None:

@@ -59,7 +60,8 @@ class Process(object):
             else:
                 cwd = str(cwd)
 
-            self.debug = debug or DEBUG
+            command = [str(p) for p in params]
+            self.debug and Log.note("command: {{command}}", command=command)
             self.service = service = subprocess.Popen(
                 [str(p) for p in params],
                 stdin=subprocess.PIPE,

@@ -11,8 +11,8 @@ from __future__ import absolute_import, division, unicode_literals
 
 import cProfile
 import pstats
-from datetime import datetime
 
 from mo_dots import wrap
 from mo_future import iteritems
 from mo_logs import Log
+

@@ -82,7 +82,6 @@ def write_profiles(main_thread_profile):
     if cprofiler_stats is None:
         return
 
-    from pyLibrary import convert
     from mo_files import File
 
     cprofiler_stats.add(pstats.Stats(main_thread_profile.cprofiler))

@@ -106,6 +105,23 @@ def write_profiles(main_thread_profile):
         }
         for f, d, in iteritems(acc.stats)
     ]
-    stats_file = File(FILENAME, suffix=convert.datetime2string(datetime.now(), "_%Y%m%d_%H%M%S"))
-    stats_file.write(convert.list2tab(stats))
+    from mo_times import Date
+
+    stats_file = File(FILENAME, suffix=Date.now().format("_%Y%m%d_%H%M%S"))
+    stats_file.write(list2tab(stats))
     Log.note("profile written to {{filename}}", filename=stats_file.abspath)
+
+
+def list2tab(rows):
+    from mo_json import value2json
+
+    columns = set()
+    for r in wrap(rows):
+        columns |= set(k for k, v in r.leaves())
+    keys = list(columns)
+
+    output = []
+    for r in wrap(rows):
+        output.append("\t".join(value2json(r[k]) for k in keys))
+
+    return "\t".join(keys) + "\n" + "\n".join(output)

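The profiler now writes its own tab-separated output with the local list2tab above instead of depending on pyLibrary.convert. A hedged sketch of what it produces; key order comes from a set, so the column order is not guaranteed:

rows = [{"a": 1, "b": "x"}, {"a": 2}]
print(list2tab(rows))
# POSSIBLE OUTPUT (COLUMN ORDER MAY DIFFER; MISSING CELLS SERIALIZE AS null):
# a	b
# 1	"x"
# 2	null
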
@@ -38,7 +38,7 @@ class Python(object):
             cwd=os.getcwd(),
             shell=shell
         )
-        self.process.stdin.add(value2json({"debug": {"trace": True}} | config))
+        self.process.stdin.add(value2json(set_default({}, config, {"debug": {"trace": True}})))
         status = self.process.stdout.pop()
         if status != '{"out":"ok"}':
             Log.error("could not start python\n{{error|indent}}", error=self.process.stderr.pop_all()+[status]+self.process.stdin.pop_all())

@@ -15,6 +15,8 @@ from mo_dots import listwrap, coalesce
 from mo_future import is_text, text
 from mo_json import json2value, value2json
 from mo_logs import Log, constants, Except
+from mo_logs.log_usingNothing import StructuredLogger
+
 from mo_threads import Signal
 from mo_threads.threads import STDOUT, STDIN

@@ -114,11 +116,18 @@ def temp_var():
         num_temps += 1
 
 
+class RawLogger(StructuredLogger):
+    def write(self, template, params):
+        STDOUT.write(value2json({"log": {"template": template, "params": params}}))
+
+
 def start():
     try:
-        config = json2value(STDIN.readline().decode("utf8"))
+        line = STDIN.readline().decode("utf8")
+        config = json2value(line)
         constants.set(config.constants)
-        Log.start(config.debug + {"logs": [{"type": "raw"}]})
+        Log.start(config.debug)
+        Log.set_logger(RawLogger())
         command_loop({"config": config})
     except Exception as e:
         Log.error("problem staring worker", cause=e)

@@ -419,10 +419,9 @@ class ThreadedQueue(Queue):
     ):
-        if period != None and not isinstance(period, (int, float, long)):
-            Log.error("Expecting a float for the period")
-
-        period = coalesce(period, 1)  # SECONDS
         batch_size = coalesce(batch_size, int(max_size / 2) if max_size else None, 900)
         max_size = coalesce(max_size, batch_size * 2)  # REASONABLE DEFAULT
+        period = coalesce(period, 1)  # SECONDS
 
         Queue.__init__(self, name=name, max=max_size, silent=silent)

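ThreadedQueue.__init__ drops the strict type check on period and reorders the defaults so each parameter can be derived from the others. The mutual-defaulting idea as plain Python, a sketch of the logic rather than the class itself:

def queue_defaults(max_size=None, batch_size=None, period=None):
    # EACH PARAMETER FALLS BACK TO A VALUE DERIVED FROM THE OTHERS
    batch_size = batch_size or (int(max_size / 2) if max_size else 900)
    max_size = max_size or batch_size * 2
    period = period or 1  # SECONDS
    return max_size, batch_size, period

assert queue_defaults() == (1800, 900, 1)
assert queue_defaults(max_size=100) == (100, 50, 1)
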
@@ -95,9 +95,9 @@ class AllThread(object):
 class BaseThread(object):
     __slots__ = ["id", "name", "children", "child_locker", "cprofiler", "trace_func"]
 
-    def __init__(self, ident):
+    def __init__(self, ident, name=None):
         self.id = ident
-        self.name = None
+        self.name = name
         if ident != -1:
             self.name = "Unknown Thread " + text(ident)
         self.child_locker = allocate_lock()

@@ -242,11 +242,9 @@ class Thread(BaseThread):
     num_threads = 0
 
     def __init__(self, name, target, *args, **kwargs):
-        BaseThread.__init__(self, -1)
-        self.name = coalesce(name, "thread_" + text(object.__hash__(self)))
+        BaseThread.__init__(self, -1, coalesce(name, "thread_" + text(object.__hash__(self))))
         self.target = target
         self.end_of_thread = Data()
-        self.synch_lock = Lock("response synch lock")
         self.args = args
 
         # ENSURE THERE IS A SHARED please_stop SIGNAL

@@ -258,7 +256,7 @@ class Thread(BaseThread):
         )
 
         self.thread = None
-        self.join_attempt = Signal("joining with " + self.name)
+        self.ready_to_stop = Signal("joining with " + self.name)
         self.stopped = Signal("stopped signal for " + self.name)
 
         if PARENT_THREAD in kwargs:

@@ -301,9 +299,6 @@ class Thread(BaseThread):
         DEBUG and Log.note("Thread {{name|quote}} got request to stop", name=self.name)
 
     def _run(self):
-        # if self.trace_func:
-        #     sys.settrace(self.trace_func)
-        #     self.trace_func = None
         self.id = get_ident()
         with RegisterThread(self):
             try:

@@ -312,8 +307,7 @@ class Thread(BaseThread):
                 self.end_of_thread.response = self.target(*a, **k)
             except Exception as e:
                 e = Except.wrap(e)
-                with self.synch_lock:
-                    self.end_of_thread.exception = e
+                self.end_of_thread.exception = e
                 with self.parent.child_locker:
                     emit_problem = self not in self.parent.children
                 if emit_problem:

@@ -361,11 +355,18 @@ class Thread(BaseThread):
                         "problem with thread {{name|quote}}", cause=e, name=self.name
                     )
             finally:
-                (Till(seconds=60) | self.join_attempt).wait()
-                self.stopped.go()
-                DEBUG and Log.note("thread {{name|quote}} is done, wait for join", name=self.name)
+                if not self.ready_to_stop:
+                    DEBUG and Log.note("thread {{name|quote}} is done, wait for join", name=self.name)
+                    # WHERE DO WE PUT THE THREAD RESULT?
+                    # IF NO THREAD JOINS WITH THIS, THEN WHAT DO WE DO WITH THE RESULT?
+                    # HOW LONG DO WE WAIT FOR ANOTHER TO ACCEPT THE RESULT?
+                    #
+                    # WAIT 60seconds, THEN SEND RESULT TO LOGGER
+                    (Till(seconds=60) | self.ready_to_stop).wait()
+
+                if not self.join_attempt:
+                    self.stopped.go()
+
+                if not self.ready_to_stop:
                     if self.end_of_thread.exception:
                         # THREAD FAILURES ARE A PROBLEM ONLY IF NO ONE WILL BE JOINING WITH IT
                         try:

@@ -389,6 +390,16 @@ class Thread(BaseThread):
     def is_alive(self):
         return not self.stopped
 
+    def release(self):
+        """
+        RELEASE THREAD TO FEND FOR ITSELF. THREAD CAN EXPECT TO NEVER
+        JOIN. WILL SEND RESULTS TO LOGS WHEN DONE.
+
+        PARENT THREAD WILL STILL ENSURE self HAS STOPPED PROPERLY
+        """
+        self.ready_to_stop.go()
+        return self
+
     def join(self, till=None):
         """
         RETURN THE RESULT {"response":r, "exception":e} OF THE THREAD EXECUTION (INCLUDING EXCEPTION, IF EXISTS)

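This is the release() that the many Thread.run(...).release() call sites in this commit rely on: it flips ready_to_stop so the worker does not hold its result for a joiner. The contrast in one sketch, assuming the mo_threads API as shown in this diff:

# FIRE-AND-FORGET: NO join(); FAILURES GO TO THE LOGS WHEN THE THREAD ENDS
Thread.run("cleanup", cleanup_task).release()

# JOINED: THE CALLER COLLECTS {"response": ..., "exception": ...}
result = Thread.run("compute", work).join()
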
@@ -406,7 +417,7 @@ class Thread(BaseThread):
                 parent=Thread.current().name,
                 child=self.name,
             )
-        self.join_attempt.go()
+        self.ready_to_stop.go()
         (self.stopped | till).wait()
         if self.stopped:
             self.parent.remove_child(self)

@@ -467,9 +478,8 @@ class RegisterThread(object):
 
     def __init__(self, thread=None, name=None):
         if thread is None:
-            thread = BaseThread(get_ident())
+            thread = BaseThread(get_ident(), name)
         self.thread = thread
-        thread.name = name
 
     def __enter__(self):
         with ALL_LOCK:

@@ -505,32 +515,10 @@ def register_thread(func):
     return output
 
 
-def stop_main_thread(signum=0, frame=None):
-    MAIN_THREAD.please_stop.go()
-    if signum == 0:
-        return
-    elif signum == _signal.SIGTERM:
-        raise SystemExit()
-    else:
-        raise KeyboardInterrupt()
-
-
-_signal.signal(_signal.SIGTERM, stop_main_thread)
-_signal.signal(_signal.SIGINT, stop_main_thread)
-
-
 def _wait_for_exit(please_stop):
     """
     /dev/null PIPED TO sys.stdin SPEWS INFINITE LINES, DO NOT POLL AS OFTEN
     """
-    try:
-        import msvcrt
-
-        _wait_for_exit_on_windows(please_stop)
-        return
-    except:
-        pass
-
     cr_count = 0  # COUNT NUMBER OF BLANK LINES
 
     try:

@@ -565,23 +553,6 @@ def _wait_for_exit(please_stop):
         Log.note("done waiting for exit")
 
 
-def _wait_for_exit_on_windows(please_stop):
-    import msvcrt
-
-    line = ""
-    while not please_stop:
-        if msvcrt.kbhit():
-            chr = msvcrt.getche()
-            if ord(chr) == 13:
-                if line == "exit":
-                    Log.alert("'exit' Detected! Stopping...")
-                    return
-            elif ord(chr) > 32:
-                line += chr
-        else:
-            sleep(1)
-
-
 def _wait_for_interrupt(please_stop):
     DEBUG and Log.note("wait for stop signal")
     try:

@@ -594,6 +565,14 @@ def _wait_for_interrupt(please_stop):
 
 MAIN_THREAD = MainThread()
 
+
+def stop_main_thread(signum=0, frame=None):
+    MAIN_THREAD.please_stop.go()
+
+
+_signal.signal(_signal.SIGTERM, stop_main_thread)
+_signal.signal(_signal.SIGINT, stop_main_thread)
+
 
 ALL_LOCK = allocate_lock()
 ALL = dict()
 ALL[get_ident()] = MAIN_THREAD

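stop_main_thread also lost its raise SystemExit/KeyboardInterrupt branches and moved below the MAIN_THREAD assignment, so the handler exists only after the object it references does; signal handlers now only request a cooperative stop. The same pattern with the standard library alone, a generic sketch:

import signal
import threading

please_stop = threading.Event()

def handler(signum, frame):
    please_stop.set()   # REQUEST A COOPERATIVE SHUTDOWN; NO EXCEPTION RAISED

signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
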
@@ -158,7 +158,7 @@ def capture_termination_signal(please_stop):
-            (Till(seconds=61) | please_stop).wait()
+            (Till(seconds=11) | please_stop).wait()
 
-    Thread.run("listen for termination", worker)
+    Thread.run("listen for termination", worker).release()
 
 
 def get_instance_metadata(timeout=None):

@@ -192,6 +192,6 @@ def _get_metadata_from_from_aws(please_stop):
     machine_metadata.aws_instance_type = ec2.instance_type
     machine_metadata.name = ec2.instance_id
 
-Thread.run("get aws machine metadata", _get_metadata_from_from_aws)
+Thread.run("get aws machine metadata", _get_metadata_from_from_aws).release()
 
 from . import s3

@@ -176,51 +176,6 @@ def list2tab(rows):
     return "\t".join(keys) + "\n" + "\n".join(output)
 
 
-def list2table(rows, column_names=None):
-    if column_names:
-        keys = list(set(column_names))
-    else:
-        columns = set()
-        for r in rows:
-            columns |= set(r.keys())
-        keys = list(columns)
-
-    output = [[unwraplist(r.get(k)) for k in keys] for r in rows]
-
-    return wrap({
-        "meta": {"format": "table"},
-        "header": keys,
-        "data": output
-    })
-
-
-def list2cube(rows, column_names=None):
-    if column_names:
-        keys = column_names
-    else:
-        columns = set()
-        for r in rows:
-            columns |= set(r.keys())
-        keys = list(columns)
-
-    data = {k: [] for k in keys}
-    output = wrap({
-        "meta": {"format": "cube"},
-        "edges": [
-            {
-                "name": "rownum",
-                "domain": {"type": "rownum", "min": 0, "max": len(rows), "interval": 1}
-            }
-        ],
-        "data": data
-    })
-
-    for r in rows:
-        for k in keys:
-            data[k].append(unwraplist(r[k]))
-
-    return output
-
-
 def value2string(value):
     # PROPER NULL HANDLING

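list2table and list2cube are deleted here because, per the import changes earlier in this commit, they now live in jx_python.convert. Their output shapes, reconstructed from the removed bodies above (column order depends on set iteration, so it is not guaranteed):

from jx_python.convert import list2table, list2cube  # NEW HOME PER THIS COMMIT

list2table([{"a": 1}])
# -> {"meta": {"format": "table"}, "header": ["a"], "data": [[1]]}
list2cube([{"a": 1}])
# -> {"meta": {"format": "cube"}, "edges": [<rownum edge>], "data": {"a": [1]}}
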
@@ -598,21 +553,6 @@ def json_schema_to_markdown(schema):
     return "\n".join(lines)
 
 
-def table2csv(table_data):
-    """
-    :param table_data: expecting a list of tuples
-    :return: text in nice formatted csv
-    """
-    text_data = [tuple(value2json(vals, pretty=True) for vals in rows) for rows in table_data]
-
-    col_widths = [max(len(t) for t in cols) for cols in zip(*text_data)]
-    template = ", ".join(
-        "{{" + text(i) + "|left_align(" + text(w) + ")}}"
-        for i, w in enumerate(col_widths)
-    )
-    output = "\n".join(expand_template(template, d) for d in text_data)
-    return output
-
-
 ZeroMoment2dict = mo_math.stats.ZeroMoment2dict