This commit is contained in:
Kyle Lahnakoski 2018-09-28 07:25:01 -04:00
Родитель 71ae119ab2
Коммит c1b6a758c5
11 изменённых файлов: 215 добавлений и 80 удалений

12
vendor/jx_base/domains.py поставляемый
Просмотреть файл

@ -178,8 +178,16 @@ class DefaultDomain(Domain):
self.map[key] = canonical
return canonical
# def getIndexByKey(self, key):
# return self.map.get(key).dataIndex;
def getIndexByKey(self, key):
canonical = self.map.get(key)
if canonical:
return canonical.dataIndex
index = len(self.partitions)
canonical = Data(name=key, value=key, dataIndex=index)
self.partitions.append(canonical)
self.map[key] = canonical
return index
def getKey(self, part):
return part.value

6
vendor/jx_base/expressions.py поставляемый
Просмотреть файл

@ -921,7 +921,7 @@ class DivOp(Expression):
return DivOp("div", [self.lhs.map(map_), self.rhs.map(map_)], default=self.default.map(map_))
def missing(self):
AndOp("and", [
return AndOp("and", [
self.default.missing(),
OrOp("or", [self.lhs.missing(), self.rhs.missing(), EqOp("eq", [self.rhs, ZERO])])
]).partial_eval()
@ -2668,6 +2668,10 @@ class CaseOp(Expression):
def __data__(self):
return {"case": [w.__data__() for w in self.whens]}
def __eq__(self, other):
if isinstance(other, CaseOp):
return all(s==o for s, o in zip(self.whens, other.whens))
def vars(self):
output = set()
for w in self.whens:

11
vendor/jx_base/query.py поставляемый
Просмотреть файл

@ -484,7 +484,8 @@ def _normalize_edge(edge, dim_index, limit, schema=None):
name=edge,
value=jx_expression(edge, schema=schema),
allowNulls=True,
dim=dim_index
dim=dim_index,
domain=DefaultDomain()
)
]
else:
@ -609,13 +610,15 @@ def _normalize_window(window, schema=None):
try:
expr = jx_expression(v, schema=schema)
except Exception:
expr = ScriptOp("script", v)
if hasattr(v, "__call__"):
expr = v
else:
expr = ScriptOp("script", v)
return Data(
name=coalesce(window.name, window.value),
value=expr,
edges=[n for e in listwrap(window.edges) for n in _normalize_edge(e, schema)],
edges=[n for i, e in enumerate(listwrap(window.edges)) for n in _normalize_edge(e, i, limit=None, schema=schema)],
sort=_normalize_sort(window.sort),
aggregate=window.aggregate,
range=_normalize_range(window.range),

Просмотреть файл

@ -154,7 +154,7 @@ class ListContainer(Container, jx_base.Namespace, jx_base.Table):
return ListContainer("from "+self.name, filter(temp, self.data), self.schema)
def sort(self, sort):
return ListContainer("from "+self.name, jx.sort(self.data, sort, already_normalized=True), self.schema)
return ListContainer("sorted "+self.name, jx.sort(self.data, sort, already_normalized=True), self.schema)
def get(self, select):
"""
@ -199,7 +199,7 @@ class ListContainer(Container, jx_base.Namespace, jx_base.Table):
return ListContainer("from "+self.name, data=new_data, schema=new_schema)
def window(self, window):
_ = window
# _ = window
jx.window(self.data, window)
return self

19
vendor/jx_python/expressions.py поставляемый
Просмотреть файл

@ -25,7 +25,7 @@ from jx_base.expressions import Variable, DateOp, TupleOp, LeavesOp, BinaryOp, O
InequalityOp, extend, RowsOp, OffsetOp, GetOp, Literal, NullOp, TrueOp, FalseOp, DivOp, FloorOp, \
EqOp, NeOp, NotOp, LengthOp, NumberOp, StringOp, CountOp, MultiOp, RegExpOp, CoalesceOp, MissingOp, ExistsOp, \
PrefixOp, NotLeftOp, RightOp, NotRightOp, FindOp, BetweenOp, RangeOp, CaseOp, AndOp, \
ConcatOp, InOp, jx_expression, Expression, WhenOp, MaxOp, SplitOp, NULL, SelectOp, SuffixOp, LastOp
ConcatOp, InOp, jx_expression, Expression, WhenOp, MaxOp, SplitOp, NULL, SelectOp, SuffixOp, LastOp, IntegerOp, BasicEqOp
from jx_python.expression_compiler import compile_expression
from mo_times.dates import Date
@ -83,7 +83,7 @@ def to_python(self, not_null=False, boolean=False, many=False):
@extend(RowsOp)
def to_python(self, not_null=False, boolean=False, many=False):
agg = "rows[rownum+" + self.offset.to_python() + "]"
agg = "rows[rownum+" + IntegerOp("", self.offset).to_python() + "]"
path = split_field(json2value(self.var.json))
if not path:
return agg
@ -92,6 +92,9 @@ def to_python(self, not_null=False, boolean=False, many=False):
agg = agg + ".get(" + convert.value2quote(p) + ", EMPTY_DICT)"
return agg + ".get(" + convert.value2quote(path[-1]) + ")"
@extend(IntegerOp)
def to_python(self, not_null=False, boolean=False, many=False):
return "int(" + self.term.to_python() + ")"
@extend(GetOp)
def to_python(self, not_null=False, boolean=False, many=False):
@ -179,8 +182,10 @@ def to_python(self, not_null=False, boolean=False, many=False):
@extend(DivOp)
def to_python(self, not_null=False, boolean=False, many=False):
return "None if (" + self.missing().to_python() + ") else (" + self.lhs.to_python(
not_null=True) + ") / (" + self.rhs.to_python(not_null=True) + ")"
miss = self.missing().to_python()
lhs = self.lhs.to_python(not_null=True)
rhs = self.rhs.to_python(not_null=True)
return "None if (" + miss + ") else (" + lhs + ") / (" + rhs + ")"
@extend(FloorOp)
@ -193,6 +198,11 @@ def to_python(self, not_null=False, boolean=False, many=False):
return "(" + self.rhs.to_python() + ") in listwrap(" + self.lhs.to_python() + ")"
@extend(BasicEqOp)
def to_python(self, not_null=False, boolean=False, many=False):
return "(" + self.rhs.to_python() + ") == (" + self.lhs.to_python() + ")"
@extend(NeOp)
def to_python(self, not_null=False, boolean=False, many=False):
lhs = self.lhs.to_python()
@ -356,3 +366,4 @@ def to_python(self, not_null=False, boolean=False, many=False):
@extend(WhenOp)
def to_python(self, not_null=False, boolean=False, many=False):
return "(" + self.then.to_python() + ") if (" + self.when.to_python(boolean=True) + ") else (" + self.els_.to_python() + ")"

9
vendor/jx_python/jx.py поставляемый
Просмотреть файл

@ -82,6 +82,10 @@ def run(query, container=Null):
return cube_aggs(container, query_op)
elif isinstance(container, QueryOp):
container = run(container)
elif isinstance(container, Mapping):
query = container
container = query['from']
container = run(QueryOp.wrap(query, container, container.namespace), container)
else:
Log.error("Do not know how to handle {{type}}", type=container.__class__.__name__)
@ -996,7 +1000,10 @@ def window(data, param):
data = sort(data, sortColumns, already_normalized=True)
# SIMPLE CALCULATED VALUE
for rownum, r in enumerate(data):
r[name] = calc_value(r, rownum, data)
try:
r[name] = calc_value(r, rownum, data)
except Exception as e:
raise e
return
try:

4
vendor/jx_python/lists/aggs.py поставляемый
Просмотреть файл

@ -13,6 +13,8 @@ from __future__ import unicode_literals
import itertools
from jx_base.query import _normalize_domain
from jx_python import windows
from mo_dots import listwrap, wrap, coalesce
from mo_logs import Log
@ -105,7 +107,7 @@ def list_aggs(frum, query):
def make_accessor(e):
d = e.domain
# d = _normalize_domain(d)
if e.value:
accessor = jx_expression_to_function(e.value)
if e.allowNulls:

10
vendor/mo_collections/matrix.py поставляемый
Просмотреть файл

@ -11,7 +11,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from mo_future import text_type, xrange
from mo_future import text_type, xrange, transpose
from mo_dots import Null, Data, coalesce, get_module
from mo_kwargs import override
from mo_logs import Log
@ -341,7 +341,10 @@ def _getitem(c, i):
return (), c[select]
else:
select = i[0]
if select == None:
if isinstance(select, int):
return _getitem(c[select], i[1::])
elif select == None:
dims, cube = transpose(*[_getitem(cc, i[1::]) for cc in c])
return (len(cube),)+dims[0], cube
elif isinstance(select, slice):
@ -349,8 +352,7 @@ def _getitem(c, i):
dims, cube = transpose(*[_getitem(cc, i[1::]) for cc in sub])
return (len(cube),)+dims[0], cube
else:
with suppress_exception:
return _getitem(c[select], i[1::])
return _getitem(c[select], i[1::])
def _zero_dim(value):

177
vendor/mo_fabric/__init__.py поставляемый
Просмотреть файл

@ -1,57 +1,142 @@
from mo_logs.strings import quote
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from mo_json import value2json
import os
import sys
from datetime import datetime
from mo_threads.python import Python
from fabric2 import Config
from fabric2 import Connection as _Connection
from mo_dots import set_default, unwrap, wrap
from mo_files import File, TempFile
from mo_future import text_type
from mo_kwargs import override
from mo_logs import Log, exceptions, machine_metadata
class FabricProcess(object):
class Connection(object):
@override
def __init__(
self,
host,
user=None,
port=None,
config=None,
gateway=None,
forward_agent=None,
connect_timeout=None,
connect_kwargs=None,
inline_ssh_env=None,
key_filename=None, # part of connect_kwargs
kwargs=None
):
connect_kwargs = set_default({}, connect_kwargs, {"key_filename": File(key_filename).abspath})
config = Config(**unwrap(set_default({}, config, {"overrides": {"run": {
# "hide": True,
"err_stream": LogStream(host, "stderr"),
"out_stream": LogStream(host, "stdout")
}}})))
def __init__(self, name, fabric_settings):
"""
:param name:
:param fabric_settings:
"""
self.instance = Python(name, config={})
self.instance.import_module("fabric.context_managers", ["cd", "hide"])
self.instance.import_module("fabric.contrib", ["files"])
self.instance.import_module("fabric.operations", ["sudo", "run", "put", "get"])
self.instance.import_module("fabric.state", ["env"])
self.conn = _Connection(
host,
user,
port,
config,
gateway,
forward_agent,
connect_timeout,
connect_kwargs,
inline_ssh_env
)
result = self.conn.run("pwd")
self.cwd = result.stdout.split("\n")[0]
for k, v in fabric_settings:
self.instance.execute_script("env["+quote(k)+"]="+value2json(v))
def __getattr__(self, command):
def _add_private_file(self):
run('rm -f /home/ubuntu/private.json')
put('~/private_active_data_etl.json', '/home/ubuntu/private.json')
with cd("/home/ubuntu"):
run("chmod o-r private.json")
def cd(self, path):
return Context(self.instance, )
context = self.instance.cd(path)
exit = self.instance.execute_script(context, "__enter__", [])
class Context(object):
def __init__(self, proc, *args):
self.python=proc
proc.e
self.var = var
def exists(self, path):
with TempFile() as t:
try:
result = self.conn.get(path, t.abspath)
return t.exists
except IOError:
return False
def __enter__(self):
self.context = self.python.execute_script(self.var+".__enter__()")
return self
def __exit__(self):
self.python.execute_script(self.context+".__exit__()")
def __exit__(self, *exc):
self.conn.close()
def __getattr__(self, item):
return getattr(self.conn, item)
EMPTY = str("")
CR = str("\n")
class LogStream(object):
def __init__(self, name, type):
self.name = name
self.type = type
self.part_line = EMPTY
def write(self, value):
lines = value.split(CR)
if len(lines) == 1:
self.part_line += lines[0]
return
prefix = self.part_line
for line in lines[0:-1]:
note(u"{{name}} ({{type}}): {{line}}", name=self.name, type=self.type, line=prefix + line)
prefix = EMPTY
self.part_line = lines[-1]
def flush(self):
pass
def note(
template,
**params
):
if not isinstance(template, text_type):
Log.error("Log.note was expecting a unicode template")
if len(template) > 10000:
template = template[:10000]
log_params = wrap({
"template": template,
"params": params,
"timestamp": datetime.utcnow(),
"machine": machine_metadata,
"context": exceptions.NOTE
})
if not template.startswith("\n") and template.find("\n") > -1:
template = "\n" + template
if Log.trace:
log_template = "{{machine.name}} (pid {{machine.pid}}) - {{timestamp|datetime}} - {{thread.name}} - \"{{location.file}}:{{location.line}}\" ({{location.method}}) - " + template.replace("{{", "{{params.")
f = sys._getframe(1)
log_params.location = {
"line": f.f_lineno,
"file": text_type(f.f_code.co_filename.split(os.sep)[-1]),
"method": text_type(f.f_code.co_name)
}
else:
log_template = "{{timestamp|datetime}} - " + template.replace("{{", "{{params.")
Log.main_log.write(log_template, log_params)

36
vendor/mo_kwargs/__init__.py поставляемый
Просмотреть файл

@ -46,14 +46,18 @@ def override(func):
if err.startswith(func_name) and ("takes at least" in err or "required positional argument" in err):
missing = [p for p in params if str(p) not in packed]
given = [p for p in params if str(p) in packed]
get_logger().error(
"Problem calling {{func_name}}: Expecting parameter {{missing}}, given {{given}}",
func_name=func_name,
missing=missing,
given=given,
stack_depth=2
)
get_logger().error("Error dispatching call", e)
if not missing:
raise e
else:
get_logger().error(
"Problem calling {{func_name}}: Expecting parameter {{missing}}, given {{given}}",
func_name=func_name,
missing=missing,
given=given,
stack_depth=2,
cause=e
)
raise e
if "kwargs" not in params:
# WE ASSUME WE ARE ONLY ADDING A kwargs PARAMETER TO SOME REGULAR METHOD
@ -70,15 +74,15 @@ def override(func):
elif func_name in ("__init__", "__new__"):
def w_constructor(*args, **kwargs):
if "kwargs" in kwargs:
packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), kwargs["kwargs"], defaults)
packed = params_pack(params, kwargs, dict_zip(params, args), kwargs["kwargs"], defaults)
elif len(args) == 2 and len(kwargs) == 0 and isinstance(args[1], Mapping):
# ASSUME SECOND UNNAMED PARAM IS kwargs
packed = params_pack(params, args[1], defaults)
packed = params_pack(params, {"self": args[0]}, args[1], defaults)
else:
# DO NOT INCLUDE self IN kwargs
packed = params_pack(params, kwargs, dict_zip(params[1:], args[1:]), defaults)
packed = params_pack(params, kwargs, dict_zip(params, args), defaults)
try:
return func(args[0], **packed)
return func(**packed)
except TypeError as e:
raise_error(e, packed)
return w_constructor
@ -126,8 +130,12 @@ def params_pack(params, *args):
k = text_type(k)
if k in settings:
continue
settings[k] = v
settings[k] = v if v != None else None
settings["kwargs"] = settings
output = wrap({str(k): settings[k] for k in params if k in settings})
output = {
str(k): settings[k] if k != "kwargs" else wrap(settings)
for k in params
if k in settings
}
return output

7
vendor/mo_threads/threads.py поставляемый
Просмотреть файл

@ -356,8 +356,13 @@ class Thread(BaseThread):
output = ALL.get(ident)
if output is None:
thread = BaseThread(ident)
thread.cprofiler = CProfiler()
thread.cprofiler.__enter__()
with ALL_LOCK:
ALL[ident] = thread
Log.warning("this thread is not known. Register this thread at earliest known entry point.")
return BaseThread(get_ident())
return thread
return output