Bug 943465 - esFrontLine does not filter paths strict enough

This commit is contained in:
Kyle Lahnakoski 2013-11-26 20:47:31 -05:00
Родитель 204300da6f
Коммит 8902cdf3bb
29 изменённых файлов: 4987 добавлений и 5 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -5,6 +5,7 @@ src
settings.json
resources/logs
esFrontLine/util/.svn
tests/util/.svn
build
dist
*.egg-info

Просмотреть файл

@ -1,4 +1,4 @@
graft tests
prune tests
graft docs
prune .git
prune .svn

Просмотреть файл

@ -16,9 +16,10 @@ import random
from flask import Flask, json
import flask
import requests
import sys
from werkzeug.contrib.fixers import HeaderRewriterFix
from werkzeug.exceptions import abort
import sys
app = Flask(__name__)
@ -51,7 +52,9 @@ def catch_all(path):
es = random.choice(listwrap(settings["elasticsearch"]))
## SEND REQUEST
headers = {'content-type': 'application/json'}
headers = flask.request.headers
headers['content-type']='application/json'
response = requests.get(
es["host"] + ":" + str(es["port"]) + "/" + path,
data=data,
@ -98,7 +101,8 @@ def filter(path_string, query):
if path[-1] not in ["_mapping", "_search"]:
raise Exception("request path must end with _mapping or _search")
elif len(path) == 3:
pass #OK
if path[-1] not in ["_search"]:
raise Exception("request path must end with _mapping or _search")
else:
raise Exception('request must be of form: {index_name} "/" {type_name} "/_search" ')
@ -150,7 +154,6 @@ app.wsgi_app = WSGICopyBody(app.wsgi_app)
logger = None
if __name__ == '__main__':
try:
parser = argparse.ArgumentParser()
parser.add_argument(*["--settings", "--settings-file", "--settings_file"], **{
@ -191,3 +194,4 @@ if __name__ == '__main__':
app.run(**settings["flask"])
except Exception, e:
print(e.message)

0
tests/__init__.py Normal file
Просмотреть файл

Просмотреть файл

@ -0,0 +1,33 @@
{
"elasticsearch":[{
"host":"http://elasticsearch4.metrics.scl3.mozilla.com",
"port":9200
},{
"host":"http://elasticsearch5.metrics.scl3.mozilla.com",
"port":9200
},{
"host":"http://elasticsearch7.metrics.scl3.mozilla.com",
"port":9200
},{
"host":"http://elasticsearch8.metrics.scl3.mozilla.com",
"port":9200
}],
"flask":{
"host":"0.0.0.0",
"port":9292,
"debug":false,
"threaded":true,
"processes":1
},
"debug":{
"log":[{
"filename": "./tests/results/logs/app.log",
"maxBytes": 10000000,
"backupCount": 200,
"encoding": "utf8"
},{
"stream":"sys.stdout"
}]
}
}

65
tests/tests_by_bug_id.py Normal file
Просмотреть файл

@ -0,0 +1,65 @@
from _subprocess import CREATE_NEW_PROCESS_GROUP
import os
import subprocess
import requests
import signal
from util.logs import Log
from util.threads import Thread, Signal
url = "http://localhost:9292"
# url = "http://klahnakoski-es.corp.tor1.mozilla.com:9292"
def test_943465():
    """
    Regression test for bug 943465: the proxy must reject paths that are not
    of the {index}/{type}/_search (or _mapping) form.  Both cluster-admin
    URLs below must come back as HTTP 400.
    """
    response = request("GET", url + "/_cluster/nodes/_local")
    if response.status_code != 400:
        Log.error("should not allow")
    response = request("GET", url + "/_cluster/nodes/stats")
    if response.status_code != 400:
        Log.error("should not allow")
def request(type, url, **kwargs):
    """
    Thin wrapper around requests.request() that logs the outgoing call and
    the response status.  `type` is the HTTP verb (e.g. "GET"); kwargs are
    passed straight through to requests.
    """
    Log.note("CLIENT REQUEST: {{type}} {{url}} args={{args}}", {
        "type": type,
        "url": url,
        "args": kwargs
    })
    response = requests.request(type, url, **kwargs)
    # NOTE(review): template names {{response.status_code}} {{response_text}}
    # but only "response" is supplied — confirm the Log template engine
    # resolves dotted paths and tolerates the unused {{response_text}} key.
    Log.note("CLIENT RESPONSE: {{response.status_code}} {{response_text}}", {"response": response})
    return response
# Raised by run_app() once the Flask dev server prints its "Running on" banner.
server_is_ready = Signal()


def run_app(please_stop):
    """
    Launch esFrontLine/app.py as a child process, pump its stdout into the
    log, and signal server_is_ready when the server banner appears.  Runs
    until please_stop goes, then sends CTRL_C to the child.
    NOTE(review): Windows-only — _subprocess.CREATE_NEW_PROCESS_GROUP,
    signal.CTRL_C_EVENT and the backslash path will not work on POSIX.
    """
    proc = subprocess.Popen(
        ["python", "esFrontLine\\app.py", "--settings", "tests/resources/test_settings.json"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=-1,
        # new process group so CTRL_C_EVENT reaches only the child
        creationflags=CREATE_NEW_PROCESS_GROUP
    )
    while not please_stop:
        line = proc.stdout.readline()
        if not line:
            continue
        if line.find(" * Running on") >= 0:
            server_is_ready.go()
        Log.note("SERVER: {{line}}", {"line": line.strip()})
    proc.send_signal(signal.CTRL_C_EVENT)


# Test driver: start the server thread, wait until it is ready, run the
# test, and always ask the server thread to stop on the way out.
thread = Thread.run("run app", run_app)
try:
    server_is_ready.wait_for_go()
    test_943465()
    Log.note("ALL TESTS PASS")
finally:
    thread.please_stop.go()

0
tests/util/__init__.py Normal file
Просмотреть файл

221
tests/util/cnv.py Normal file
Просмотреть файл

@ -0,0 +1,221 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import StringIO
import datetime
import re
import time
from .multiset import Multiset
from .jsons import json_decoder, json_encoder
from .logs import Log
import struct
from .strings import expand_template
from .struct import StructList, Null
class CNV:
    """
    DUE TO MY POOR MEMORY, THIS IS A LIST OF ALL CONVERSION ROUTINES

    Every method is a static value-to-value converter named
    <source>2<destination>.  Conversion failures are routed through
    Log.error rather than raised directly.
    """

    @staticmethod
    def object2JSON(obj, pretty=False):
        # Serialize obj to a JSON string using the project json_encoder.
        try:
            return json_encoder.encode(obj, pretty=pretty)
        except Exception, e:
            Log.error("Can not encode into JSON: {{value}}", {"value":repr(obj)}, e)

    @staticmethod
    def JSON2object(json_string, params=None, flexible=False):
        # Parse JSON into Struct/StructList; flexible=True first strips
        # """comments""", //comments, #comments and newlines; params are
        # quoted and expanded into the template before decoding.
        try:
            #REMOVE """COMMENTS""", #COMMENTS, //COMMENTS, AND \n \r
            if flexible: json_string=re.sub(r"\"\"\".*?\"\"\"|\s+//.*\n|#.*?\n|\n|\r", r" ", json_string) #DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py#L58
            if params:
                params=dict([(k,CNV.value2quote(v)) for k,v in params.items()])
                json_string=expand_template(json_string, params)
            obj=json_decoder.decode(json_string)
            if isinstance(obj, list): return StructList(obj)
            return struct.wrap(obj)
        except Exception, e:
            Log.error("Can not decode JSON:\n\t"+json_string, e)

    @staticmethod
    def string2datetime(value, format):
        ## http://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
        try:
            return datetime.datetime.strptime(value, format)
        except Exception, e:
            Log.error("Can not format {{value}} with {{format}}", {"value":value, "format":format}, e)

    @staticmethod
    def datetime2string(value, format):
        # Inverse of string2datetime (strftime).
        try:
            return value.strftime(format)
        except Exception, e:
            Log.error("Can not format {{value}} with {{format}}", {"value":value, "format":format}, e)

    @staticmethod
    def datetime2unix(d):
        # Seconds since epoch, in *local* time (time.mktime); None passes through.
        if d == None:
            return None
        return long(time.mktime(d.timetuple()))

    @staticmethod
    def datetime2milli(d):
        # Milliseconds since 1970-01-01, computed from a naive datetime.
        try:
            epoch = datetime.datetime(1970, 1, 1)
            diff = d-epoch
            return (diff.days * 86400000) + \
                   (diff.seconds * 1000) + \
                   (diff.microseconds / 1000)  # 86400000=24*3600*1000
        except Exception, e:
            Log.error("Can not convert {{value}}", {"value": d})

    @staticmethod
    def unix2datetime(u):
        return datetime.datetime.utcfromtimestamp(u)

    @staticmethod
    def milli2datetime(u):
        return datetime.datetime.utcfromtimestamp(u/1000)

    @staticmethod
    def dict2Multiset(dic):
        # Wrap a {key: count} dict in a Multiset; None passes through.
        if dic == None:
            return None
        output = Multiset()
        output.dic = struct.unwrap(dic).copy()
        return output

    @staticmethod
    def multiset2dict(value):
        """
        CONVERT MULTISET TO dict THAT MAPS KEYS TO MAPS KEYS TO KEY-COUNT
        """
        if value == None:
            return None
        return dict(value.dic)

    @staticmethod
    def table2list(
        column_names, #tuple of columns names
        rows          #list of tuples
    ):
        # Zip column names with each row to make a list of dicts.
        return StructList([dict(zip(column_names, r)) for r in rows])

    #PROPER NULL HANDLING
    @staticmethod
    def value2string(value):
        if value == None:
            return None
        return unicode(value)

    #RETURN PRETTY PYTHON CODE FOR THE SAME
    @staticmethod
    def value2quote(value):
        if isinstance(value, basestring):
            return CNV.string2quote(value)
        else:
            return repr(value)

    @staticmethod
    def string2quote(value):
        # Double-quote a string, escaping backslashes and quotes.
        # return repr(value)
        return "\""+value.replace("\\", "\\\\").replace("\"", "\\\"")+"\""

    #RETURN PYTHON CODE FOR THE SAME
    @staticmethod
    def value2code(value):
        return repr(value)

    @staticmethod
    def DataFrame2string(df, columns=None):
        # Render a pandas DataFrame as tab-separated text.
        # NOTE(review): to_csv(cols=...) is the old pandas spelling — verify
        # against the pinned pandas version (newer releases use columns=).
        output = StringIO.StringIO()
        try:
            df.to_csv(output, sep="\t", header=True, cols=columns, engine='python')
            return output.getvalue()
        finally:
            output.close()

    @staticmethod
    def ascii2char(ascii):
        return chr(ascii)

    @staticmethod
    def char2ascii(char):
        return ord(char)

    @staticmethod
    def latin12hex(value):
        return value.encode("hex")

    @staticmethod
    def int2hex(value, size):
        # Zero-padded hex string of exactly `size` characters (no 0x prefix).
        return (("0"*size)+hex(value)[2:])[-size:]

    @staticmethod
    def value2intlist(value):
        # None -> None; iterable -> ints (empty strings dropped);
        # blank string -> None; scalar -> single-element list.
        if value == None:
            return None
        elif hasattr(value, '__iter__'):
            output=[int(d) for d in value if d!=""]
            return output
        elif value.strip()=="":
            return None
        else:
            return [int(value)]

    @staticmethod
    def value2int(value):
        if value == None:
            return None
        else:
            return int(value)

    @staticmethod
    def value2number(v):
        # Prefer int when the value is integral; fall back to float.
        try:
            if isinstance(v, float) and round(v,0)!=v:
                return v
            #IF LOOKS LIKE AN INT, RETURN AN INT
            return int(v)
        except Exception:
            try:
                return float(v)
            except Exception, e:
                Log.error("Not a number ({{value}})", {"value":v}, e)

    @staticmethod
    def utf82unicode(value):
        return unicode(value.decode('utf8'))

    @staticmethod
    def latin12unicode(value):
        return unicode(value.decode('iso-8859-1'))

683
tests/util/db.py Normal file
Просмотреть файл

@ -0,0 +1,683 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime
import subprocess
from pymysql import connect
from . import struct
from .maths import Math
from .strings import expand_template
from .struct import nvl
from .cnv import CNV
from .logs import Log, Except
from .queries import Q
from .strings import indent
from .strings import outdent
from .files import File
DEBUG = False
MAX_BATCH_SIZE=100
all_db=[]
class DB(object):
    """
    Thin wrapper over a pymysql connection with explicit nested-transaction
    accounting (transaction_level), a write backlog that is flushed in
    batches, and SQL-template expansion via expand_template().
    All failures are routed through Log.error.
    """

    def __init__(self, settings, schema=None, preamble=None):
        """
        OVERRIDE THE settings.schema WITH THE schema PARAMETER
        preamble WILL BE USED TO ADD COMMENTS TO THE BEGINNING OF ALL SQL
        THE INTENT IS TO HELP ADMINISTRATORS ID THE SQL RUNNING ON THE DATABASE
        """
        # NOTE(review): settings==None returns a half-constructed instance
        # (no connection, no attributes) — confirm callers rely on this.
        if settings == None:
            return
        all_db.append(self)
        if isinstance(settings, DB):
            settings=settings.settings
        self.settings=settings.copy()
        self.settings.schema=nvl(schema, self.settings.schema)
        preamble=nvl(preamble, self.settings.preamble)
        if preamble == None:
            self.preamble=""
        else:
            # preamble becomes a leading SQL comment on every statement
            self.preamble=indent(preamble, "# ").strip()+"\n"
        self.debug=nvl(self.settings.debug, DEBUG)
        self._open()

    def _open(self):
        """ DO NOT USE THIS UNLESS YOU close() FIRST"""
        try:
            self.db=connect(
                host=self.settings.host,
                port=self.settings.port,
                user=nvl(self.settings.username, self.settings.user),
                passwd=nvl(self.settings.password, self.settings.passwd),
                db=nvl(self.settings.schema, self.settings.db),
                charset=u"utf8",
                use_unicode=True
            )
        except Exception, e:
            Log.error(u"Failure to connect", e)
        self.cursor = None
        self.partial_rollback=False
        self.transaction_level=0
        self.backlog=[]  #accumulate the write commands so they are sent at once

    def __enter__(self):
        # `with DB(...)` opens a transaction on the connection itself
        self.begin()
        return self

    def __exit__(self, type, value, traceback):
        # Rollback on exception, commit otherwise; always close.
        if isinstance(value, BaseException):
            try:
                if self.cursor: self.cursor.close()
                self.cursor = None
                self.rollback()
            except Exception, e:
                Log.warning(u"can not rollback()", e)
            finally:
                self.close()
            return

        try:
            self.commit()
        except Exception, e:
            Log.warning(u"can not commit()", e)
        finally:
            self.close()

    def transaction(self):
        """
        return not-started transaction (for with statement)
        """
        return Transaction(self)

    def begin(self):
        # First begin() creates the shared cursor; nested calls only bump the level.
        if self.transaction_level==0: self.cursor=self.db.cursor()
        self.transaction_level+=1
        self.execute("SET TIME_ZONE='+00:00'")

    def close(self):
        if self.transaction_level>0:
            Log.error(u"expecting commit() or rollback() before close")
        self.cursor = None  #NOT NEEDED
        try:
            self.db.close()
        except Exception, e:
            # double-close is tolerated
            if e.message.find("Already closed")>=0:
                return
            Log.warning(u"can not close()", e)
        finally:
            all_db.remove(self)

    def commit(self):
        # Flush the backlog first; a failure there forces a rollback.
        try:
            self._execute_backlog()
        except Exception, e:
            try:
                self.rollback()
            except Exception:
                pass
            Log.error(u"Error while processing backlog", e)

        if self.transaction_level==0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level==1:
            if self.partial_rollback:
                # a nested rollback poisons the whole transaction
                try:
                    self.rollback()
                except Exception:
                    pass
                Log.error(u"Commit after nested rollback is not allowed")
            else:
                if self.cursor: self.cursor.close()
                self.cursor = None
                self.db.commit()

        self.transaction_level-=1

    def flush(self):
        # commit-and-reopen: push pending work while keeping a transaction open
        try:
            self.commit()
        except Exception, e:
            Log.error(u"Can not flush", e)

        try:
            self.begin()
        except Exception, e:
            Log.error(u"Can not flush", e)

    def rollback(self):
        self.backlog=[]  #YAY! FREE!
        if self.transaction_level==0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level==1:
            self.transaction_level-=1
            if self.cursor!=None:
                self.cursor.close()
                self.cursor = None
            self.db.rollback()
        else:
            # MySQL can not roll back only the inner transaction; record it
            self.transaction_level-=1
            self.partial_rollback=True
            Log.warning(u"Can not perform partial rollback!")

    def call(self, proc_name, params):
        # Call a stored procedure; cursor is recycled afterward.
        self._execute_backlog()
        try:
            self.cursor.callproc(proc_name, params)
            self.cursor.close()
            self.cursor=self.db.cursor()
        except Exception, e:
            Log.error(u"Problem calling procedure "+proc_name, e)

    def query(self, sql, param=None):
        """
        Run a SELECT and return the rows as a StructList of dicts
        (column name -> unicode value).  Works outside a transaction by
        opening a temporary cursor.
        """
        self._execute_backlog()
        try:
            old_cursor=self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor=self.db.cursor()

            if param: sql=expand_template(sql, self.quote_param(param))
            sql = self.preamble + outdent(sql)
            if self.debug:
                Log.note(u"Execute SQL:\n{{sql}}", {u"sql": indent(sql)})

            self.cursor.execute(sql)

            columns = [utf8_to_unicode(d[0]) for d in nvl(self.cursor.description, [])]
            fixed=[[utf8_to_unicode(c) for c in row] for row in self.cursor]
            result=CNV.table2list(columns, fixed)

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

            return result
        except Exception, e:
            if e.message.find("InterfaceError") >= 0:
                Log.error(u"Did you close the db connection?", e)
            Log.error(u"Problem executing SQL:\n"+indent(sql.strip()), e, offset=1)

    # EXECUTE GIVEN METHOD FOR ALL ROWS RETURNED
    def forall(self, sql, param=None, _execute=None):
        # Streams rows to _execute(row) instead of materializing them;
        # returns the number of rows processed.
        assert _execute
        num=0

        self._execute_backlog()
        try:
            old_cursor=self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor=self.db.cursor()

            if param: sql=expand_template(sql,self.quote_param(param))
            sql = self.preamble + outdent(sql)
            if self.debug:
                Log.note(u"Execute SQL:\n{{sql}}", {u"sql":indent(sql)})
            self.cursor.execute(sql)

            columns = tuple( [utf8_to_unicode(d[0]) for d in self.cursor.description] )
            for r in self.cursor:
                num+=1
                _execute(struct.wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

        except Exception, e:
            Log.error(u"Problem executing SQL:\n"+indent(sql.strip()), e, offset=1)

        return num

    def execute(self, sql, param=None):
        # Queue a write; actual execution is deferred to _execute_backlog().
        if self.transaction_level == 0:
            Log.error(u"Expecting transaction to be started before issuing queries")

        if param:
            sql = expand_template(sql, self.quote_param(param))
        sql = outdent(sql)
        self.backlog.append(sql)
        if len(self.backlog) >= MAX_BATCH_SIZE:
            self._execute_backlog()

    def execute_file(self, filename, param=None):
        # NOTE(review): this instance method is shadowed by the
        # @staticmethod execute_file defined below (same name, later in the
        # class body) — confirm which one callers expect.
        content=File(filename).read()
        self.execute(content, param)

    @staticmethod
    def execute_sql(settings, sql, param=None):
        """EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?"""
        if param:
            with DB(settings) as temp:
                sql=expand_template(sql, temp.quote_param(param))

        # We have no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        args = [
            u"mysql",
            u"-h{0}".format(settings.host),
            u"-u{0}".format(settings.username),
            u"-p{0}".format(settings.password),
            u"{0}".format(settings.schema)
        ]
        proc = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1
        )
        (output, _) = proc.communicate(sql)

        if proc.returncode:
            if len(sql)>10000:
                sql=u"<"+unicode(len(sql))+u" bytes of sql>"
            Log.error(u"Unable to execute sql: return code {{return_code}}, {{output}}:\n {{sql}}\n", {
                u"sql":indent(sql),
                u"return_code":proc.returncode,
                u"output":output
            })

    @staticmethod
    def execute_file(settings, filename, param=None):
        # MySQLdb provides no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        sql=File(filename).read()
        DB.execute_sql(settings, sql, param)

    def _execute_backlog(self):
        # Flush the queued writes.  pymysql can not batch multiple
        # statements, so each is sent separately; other drivers get
        # MAX_BATCH_SIZE statements joined with ";".
        if not self.backlog: return

        (backlog, self.backlog)=(self.backlog, [])
        if self.db.__module__.startswith(u"pymysql"):
            # BUG IN PYMYSQL: CAN NOT HANDLE MULTIPLE STATEMENTS
            # https://github.com/PyMySQL/PyMySQL/issues/157
            for b in backlog:
                sql = self.preamble+b
                try:
                    if self.debug:
                        Log.note(u"Execute SQL:\n{{sql|indent}}", {u"sql":sql})
                    # NOTE(review): executes `b`, not `sql` — the preamble
                    # comment is logged but never sent to the server; confirm
                    # this is intentional.
                    self.cursor.execute(b)
                except Exception, e:
                    Log.error(u"Can not execute sql:\n{{sql}}", {u"sql":sql}, e)

            self.cursor.close()
            self.cursor = self.db.cursor()
        else:
            for i, g in Q.groupby(backlog, size=MAX_BATCH_SIZE):
                sql=self.preamble+u";\n".join(g)
                try:
                    if self.debug:
                        Log.note(u"Execute block of SQL:\n{{sql|indent}}", {u"sql":sql})
                    self.cursor.execute(sql)
                    self.cursor.close()
                    self.cursor = self.db.cursor()
                except Exception, e:
                    Log.error(u"Problem executing SQL:\n{{sql}}", {u"sql":indent(sql.strip())}, e, offset=1)

    ## Insert dictionary of values into table
    def insert(self, table_name, record):
        keys = record.keys()

        try:
            command = u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                      u",".join([self.quote_column(k) for k in keys]) + \
                      u") VALUES (" + \
                      u",".join([self.quote_value(record[k]) for k in keys]) + \
                      u")"

            self.execute(command)
        except Exception, e:
            Log.error(u"problem with record: {{record}}", {u"record": record}, e)

    # candidate_key IS LIST OF COLUMNS THAT CAN BE USED AS UID (USUALLY PRIMARY KEY)
    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_new(self, table_name, candidate_key, new_record):
        candidate_key=struct.listwrap(candidate_key)

        condition=u" AND\n".join([self.quote_column(k)+u"="+self.quote_value(new_record[k]) if new_record[k] != None else self.quote_column(k)+u" IS Null" for k in candidate_key])
        command=u"INSERT INTO "+self.quote_column(table_name)+u" ("+\
                u",".join([self.quote_column(k) for k in new_record.keys()])+\
                u")\n"+\
                u"SELECT a.* FROM (SELECT "+u",".join([self.quote_value(v)+u" "+self.quote_column(k) for k,v in new_record.items()])+u" FROM DUAL) a\n"+\
                u"LEFT JOIN "+\
                u"(SELECT 'dummy' exist FROM "+self.quote_column(table_name)+u" WHERE "+condition+u" LIMIT 1) b ON 1=1 WHERE exist IS Null"
        self.execute(command, {})

    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_newlist(self, table_name, candidate_key, new_records):
        for r in new_records:
            self.insert_new(table_name, candidate_key, r)

    def insert_list(self, table_name, records):
        # Multi-row INSERT; the column set is the union of all record keys.
        if not records:
            return

        keys = set()
        for r in records:
            keys |= set(r.keys())
        keys = Q.sort(keys)

        try:
            command = \
                u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                u",".join([self.quote_column(k) for k in keys]) + \
                u") VALUES " + ",".join([
                    "(" + u",".join([self.quote_value(r[k]) for k in keys]) + u")"
                    for r in records
                ])
            self.execute(command)
        except Exception, e:
            Log.error(u"problem with record: {{record}}", {u"record": records}, e)

    def update(self, table_name, where_slice, new_values):
        """
        where_slice IS A Struct WHICH WILL BE USED TO MATCH ALL IN table
        """
        new_values = self.quote_param(new_values)

        where_clause = u" AND\n".join([
            self.quote_column(k) + u"=" + self.quote_value(v) if v != None else self.quote_column(k) + " IS NULL"
            for k, v in where_slice.items()]
        )

        command=u"UPDATE "+self.quote_column(table_name)+u"\n"+\
                u"SET "+\
                u",\n".join([self.quote_column(k)+u"="+v for k,v in new_values.items()])+u"\n"+\
                u"WHERE "+\
                where_clause
        self.execute(command, {})

    def quote_param(self, param):
        # Quote every value in a template-parameter dict.
        return {k:self.quote_value(v) for k, v in param.items()}

    def quote_value(self, value):
        """
        convert values to mysql code for the same
        mostly delegate directly to the mysql lib, but some exceptions exist
        """
        try:
            if value == None:
                return "NULL"
            elif isinstance(value, SQL):
                if not value.param:
                    #value.template CAN BE MORE THAN A TEMPLATE STRING
                    return self.quote_sql(value.template)
                param = {k: self.quote_sql(v) for k, v in value.param.items()}
                return expand_template(value.template, param)
            elif isinstance(value, basestring):
                return self.db.literal(value)
            elif isinstance(value, datetime):
                return u"str_to_date('"+value.strftime(u"%Y%m%d%H%M%S")+u"', '%Y%m%d%H%i%s')"
            elif hasattr(value, '__iter__'):
                # NOTE(review): dicts also have __iter__, so this branch
                # captures them and the isinstance(value, dict) branch below
                # is unreachable — confirm intended.
                return self.db.literal(CNV.object2JSON(value))
            elif isinstance(value, dict):
                return self.db.literal(CNV.object2JSON(value))
            elif Math.is_number(value):
                return unicode(value)
            else:
                return self.db.literal(value)
        except Exception, e:
            Log.error(u"problem quoting SQL", e)

    def quote_sql(self, value, param=None):
        """
        USED TO EXPAND THE PARAMETERS TO THE SQL() OBJECT
        """
        try:
            if isinstance(value, SQL):
                if not param:
                    return value
                param = {k: self.quote_sql(v) for k, v in param.items()}
                return expand_template(value, param)
            elif isinstance(value, basestring):
                return value
            elif isinstance(value, dict):
                return self.db.literal(CNV.object2JSON(value))
            elif hasattr(value, '__iter__'):
                return u"(" + u",".join([self.quote_sql(vv) for vv in value]) + u")"
            else:
                return unicode(value)
        except Exception, e:
            Log.error(u"problem quoting SQL", e)

    def quote_column(self, column_name, table=None):
        # Backtick-quote a column name, a list of names, or a
        # {"name":..., "value":...} expression form.
        if isinstance(column_name, basestring):
            if table:
                column_name = table + "." + column_name
            return SQL(u"`" + column_name.replace(u".", u"`.`") + u"`")  #MY SQL QUOTE OF COLUMN NAMES
        elif isinstance(column_name, list):
            if table:
                return SQL(u", ".join([self.quote_column(table + "." + c) for c in column_name]))
            return SQL(u", ".join([self.quote_column(c) for c in column_name]))
        else:
            #ASSUME {u"name":name, u"value":value} FORM
            return SQL(column_name.value + u" AS " + self.quote_column(column_name.name))

    def sort2sqlorderby(self, sort):
        sort = Q.normalize_sort(sort)
        return u",\n".join([self.quote_column(s.field) + (" DESC" if s.sort == -1 else " ASC") for s in sort])

    def esfilter2sqlwhere(self, esfilter):
        return SQL(self._filter2where(esfilter))

    def isolate(self, separator, list):
        # Parenthesize multi-term expressions so operator precedence is explicit.
        if len(list) > 1:
            return u"(\n" + indent((" "+separator+"\n").join(list)) + u"\n)"
        else:
            return list[0]

    def _filter2where(self, esfilter):
        # Translate an ElasticSearch-style filter (and/or/not/term/terms/
        # script/range/exists) into a SQL WHERE expression.
        esfilter=struct.wrap(esfilter)

        if esfilter[u"and"]:
            return self.isolate("AND", [self._filter2where(a) for a in esfilter[u"and"]])
        elif esfilter[u"or"]:
            return self.isolate("OR", [self._filter2where(a) for a in esfilter[u"or"]])
        elif esfilter[u"not"]:
            return u"NOT ("+self._filter2where(esfilter[u"not"])+u")"
        elif esfilter.term:
            return self.isolate("AND", [self.quote_column(col)+u"="+self.quote_value(val) for col, val in esfilter.term.items()])
        elif esfilter.terms:
            for col, v in esfilter.terms.items():
                try:
                    # pack dense int lists into BETWEEN ranges when possible
                    int_list=CNV.value2intlist(v)
                    filter = int_list_packer(col, int_list)
                    return self._filter2where(filter)
                except Exception, e:
                    if not hasattr(e, "contains") or not e.contains("no packing possible"):
                        Log.warning("WARNING: Not an int-list: {{list}}", {"list":v}, e)
                    return self.quote_column(col)+u" in ("+", ".join([self.quote_value(val) for val in v])+")"
        elif esfilter.script:
            return u"("+esfilter.script+u")"
        elif esfilter.range:
            name2sign={
                u"gt": u">",
                u"gte": u">=",
                u"lte": u"<=",
                u"lt": u"<"
            }

            def single(col, r):
                min=nvl(r["gte"], r[">="])
                max=nvl(r["lte"], r["<="])
                if min and max:
                    #SPECIAL CASE (BETWEEN)
                    return self.quote_column(col)+u" BETWEEN "+self.quote_value(min)+u" AND "+self.quote_value(max)
                else:
                    return " AND ".join(
                        self.quote_column(col) + name2sign[sign] + self.quote_value(value)
                        for sign, value in r.items()
                    )

            output = self.isolate("AND", [single(col, ranges) for col, ranges in esfilter.range.items()])
            return output
        elif esfilter.exists:
            if isinstance(esfilter.exists, basestring):
                return u"("+self.quote_column(esfilter.exists)+u" IS NOT Null)"
            else:
                return u"("+self.quote_column(esfilter.exists.field)+u" IS NOT Null)"
        else:
            Log.error(u"Can not convert esfilter to SQL: {{esfilter}}", {u"esfilter":esfilter})
def utf8_to_unicode(v):
try:
if isinstance(v, str):
return v.decode(u"utf8")
else:
return v
except Exception, e:
Log.error(u"not expected", e)
#ACTUAL SQL, DO NOT QUOTE THIS STRING
class SQL(unicode):
    """
    Marker subclass of unicode: quote_value()/quote_sql() pass SQL instances
    through verbatim instead of escaping them.  `template` may hold
    {{name}}-style placeholders filled from `param`.
    """
    def __init__(self, template='', param=None):
        # the unicode value itself comes from __new__; only metadata here
        unicode.__init__(self)
        self.template=template
        self.param=param

    def __str__(self):
        # force callers to go through the quoting machinery, never str()
        Log.error(u"do not do this")
def int_list_packer(term, values):
    """
    return singletons, ranges and exclusions

    Compress an integer list into an ES-style filter over `term`: contiguous
    (or nearly contiguous) runs become {"range": {gte, lte}} clauses, holes
    inside a kept range go to a "not terms" exclusion, and everything else
    stays a plain "terms" list.  Raises Except("no packing possible") when
    no range was worth keeping.
    """
    DENSITY = 10   #a range can have holes, this is inverse of the hole density
    MIN_RANGE = 20 #min members before a range is allowed to be used

    singletons = set()
    ranges = []
    exclude = set()

    sorted = Q.sort(values)

    last = sorted[0]
    curr_start = last
    curr_excl = set()

    for v in sorted[1:]:
        if v <= last + 1:
            # still contiguous with the current run
            pass
        elif v - last > 3:
            #big step, how do we deal with it?
            if last == curr_start:
                #not a range yet, so just add as singleton
                singletons.add(last)
            elif last - curr_start - len(curr_excl) < MIN_RANGE or ((last - curr_start) < len(curr_excl) * DENSITY):
                #small ranges are singletons, sparse ranges are singletons
                singletons |= set(range(curr_start, last + 1))
                singletons -= curr_excl
            else:
                #big enough, and dense enough range
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
            curr_start = v
            curr_excl = set()
        else:
            # small gap (2-3): maybe absorb it as an exclusion
            if 1 + last - curr_start >= len(curr_excl) * DENSITY:
                #high density, keep track of excluded and continue
                add_me = set(range(last + 1, v))
                curr_excl |= add_me
            elif 1 + last - curr_start - len(curr_excl) < MIN_RANGE:
                #not big enough, convert range to singletons
                new_singles = set(range(curr_start, last + 1)) - curr_excl
                singletons = singletons | new_singles

                curr_start = v
                curr_excl = set()
            else:
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
                curr_start = v
                curr_excl = set()
        last = v

    # flush the final run using the same rules as the loop body
    if last == curr_start:
        #not a range yet, so just add as singleton
        singletons.add(last)
    elif last - curr_start - len(curr_excl) < MIN_RANGE or ((last - curr_start) < len(curr_excl) * DENSITY):
        #small ranges are singletons, sparse ranges are singletons
        singletons |= set(range(curr_start, last + 1))
        singletons -= curr_excl
    else:
        #big enough, and dense enough range
        ranges.append({"gte": curr_start, "lte": last})
        exclude |= curr_excl

    if ranges:
        r = {"or": [{"range": {term: r}} for r in ranges]}
        if exclude:
            r = {"and": [r, {"not": {"terms": {term: Q.sort(exclude)}}}]}
        if singletons:
            return {"or": [
                {"terms": {term: Q.sort(singletons)}},
                r
            ]}
        else:
            return r
    else:
        raise Except("no packing possible")
class Transaction(object):
    """
    Context manager pairing DB.begin() on entry with DB.commit() on a clean
    exit, or DB.rollback() when an exception escaped the body.
    """

    def __init__(self, db):
        self.db = db

    def __enter__(self):
        self.db.begin()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # choose the terminal operation, then run it
        finish = self.db.rollback if isinstance(exc_val, Exception) else self.db.commit
        finish()

344
tests/util/elasticsearch.py Normal file
Просмотреть файл

@ -0,0 +1,344 @@
# encoding: utf-8
#
from datetime import datetime
import re
import sha
import time
import requests
from .threads import ThreadedQueue
import struct
from .maths import Math
from .queries import Q
from .cnv import CNV
from .logs import Log
from .struct import nvl, Null
from .struct import Struct, StructList
DEBUG=False
class ElasticSearch(object):
def __init__(self, settings):
assert settings.host
assert settings.index
assert settings.type
self.metadata = None
if not settings.port: settings.port=9200
self.debug=nvl(settings.debug, DEBUG)
globals()["DEBUG"]=DEBUG or self.debug
self.settings=settings
self.path=settings.host+":"+unicode(settings.port)+"/"+settings.index+"/"+settings.type
@staticmethod
def create_index(settings, schema):
if isinstance(schema, basestring):
schema=CNV.JSON2object(schema)
ElasticSearch.post(
settings.host+":"+unicode(settings.port)+"/"+settings.index,
data=CNV.object2JSON(schema),
headers={"Content-Type":"application/json"}
)
time.sleep(2)
es=ElasticSearch(settings)
return es
@staticmethod
def delete_index(settings, index=None):
index=nvl(index, settings.index)
ElasticSearch.delete(
settings.host+":"+unicode(settings.port)+"/"+index,
)
def get_aliases(self):
"""
RETURN LIST OF {"alias":a, "index":i} PAIRS
ALL INDEXES INCLUDED, EVEN IF NO ALIAS {"alias":Null}
"""
data=self.get_metadata().indices
output=[]
for index, desc in data.items():
if not desc["aliases"]:
output.append({"index":index, "alias":None})
else:
for a in desc["aliases"]:
output.append({"index":index, "alias":a})
return struct.wrap(output)
def get_metadata(self):
if not self.metadata:
response=self.get(self.settings.host+":"+unicode(self.settings.port)+"/_cluster/state")
self.metadata=response.metadata
return self.metadata
def get_schema(self):
return self.get_metadata().indicies[self.settings.index]
#DELETE ALL INDEXES WITH GIVEN PREFIX, EXCEPT name
def delete_all_but(self, prefix, name):
if prefix == name:
Log.note("{{index_name}} will not be deleted", {"index_name": prefix})
for a in self.get_aliases():
# MATCH <prefix>YYMMDD_HHMMSS FORMAT
if re.match(re.escape(prefix) + "\\d{8}_\\d{6}", a.index) and a.index != name:
ElasticSearch.delete_index(self.settings, a.index)
@staticmethod
def proto_name(prefix, timestamp=None):
if not timestamp:
timestamp = datetime.utcnow()
return prefix + CNV.datetime2string(timestamp, "%Y%m%d_%H%M%S")
def add_alias(self, alias):
self.metadata = None
requests.post(
self.settings.host+":"+unicode(self.settings.port)+"/_aliases",
CNV.object2JSON({
"actions":[
{"add":{"index":self.settings.index, "alias":alias}}
]
})
)
def get_proto(self, alias):
"""
RETURN ALL INDEXES THAT ARE INTENDED TO BE GIVEN alias, BUT HAVE NO
ALIAS YET BECAUSE INCOMPLETE
"""
output=Q.sort([
a.index
for a in self.get_aliases()
if re.match(re.escape(alias)+"\\d{8}_\\d{6}", a.index) and not a.alias
])
return output
def get_index(self, alias):
"""
RETURN THE INDEX USED BY THIS alias
"""
output = Q.sort([
a.index
for a in self.get_aliases()
if a.alias == alias
])
if len(output) > 1:
Log.error("only one index with given alias==\"{{alias}}\" expected", {"alias": alias})
if not output:
return Null
return output.last()
def is_proto(self, index):
"""
RETURN True IF THIS INDEX HAS NOT BEEN ASSIGNED IT'S ALIAS
"""
for a in self.get_aliases():
if a.index==index and a.alias:
return False
return True
def delete_record(self, query):
if isinstance(query, dict):
ElasticSearch.delete(
self.path+"/_query",
data=CNV.object2JSON(query)
)
else:
ElasticSearch.delete(
self.path+"/"+query
)
def extend(self, records):
# ADD LINE WITH COMMAND
lines=[]
for r in records:
id=r["id"]
if "json" in r:
json=r["json"]
elif "value" in r:
json=CNV.object2JSON(r["value"])
else:
Log.error("Expecting every record given to have \"value\" or \"json\" property")
if id == None:
id = sha.new(json).hexdigest()
lines.append(u'{"index":{"_id":'+CNV.object2JSON(id)+'}}')
lines.append(json)
if not lines: return
response=ElasticSearch.post(
self.path+"/_bulk",
data="\n".join(lines).encode("utf8")+"\n",
headers={"Content-Type":"text"}
)
items=response["items"]
for i, item in enumerate(items):
if not item.index.ok:
Log.error("{{error}} while loading line:\n{{line}}", {
"error":item.index.error,
"line":lines[i*2+1]
})
if self.debug:
Log.note("{{num}} items added", {"num":len(lines)/2})
# RECORDS MUST HAVE id AND json AS A STRING OR
# HAVE id AND value AS AN OBJECT
def add(self, record):
if isinstance(record, list):
Log.error("add() has changed to only accept one record, no lists")
self.extend([record])
# -1 FOR NO REFRESH
def set_refresh_interval(self, seconds):
if seconds <= 0:
interval = "-1"
else:
interval = unicode(seconds) + "s"
response=ElasticSearch.put(
self.settings.host + ":" + unicode(
self.settings.port) + "/" + self.settings.index + "/_settings",
data="{\"index.refresh_interval\":\"" + interval + "\"}"
)
if response.content != '{"ok":true}':
Log.error("Can not set refresh interval ({{error}})", {
"error": response.content
})
def search(self, query):
try:
return ElasticSearch.post(self.path+"/_search", data=CNV.object2JSON(query))
except Exception, e:
Log.error("Problem with search", e)
def threaded_queue(self, size):
return ThreadedQueue(self, size)
@staticmethod
def post(*list, **args):
try:
response=requests.post(*list, **args)
if DEBUG: Log.note(response.content[:130])
details=CNV.JSON2object(response.content)
if details.error:
Log.error(details.error)
return details
except Exception, e:
Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def get(*list, **args):
try:
response=requests.get(*list, **args)
if DEBUG: Log.note(response.content[:130])
details=CNV.JSON2object(response.content)
if details.error:
Log.error(details.error)
return details
except Exception, e:
Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def put(*list, **args):
try:
response=requests.put(*list, **args)
if DEBUG: Log.note(response.content)
return response
except Exception, e:
Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def delete(*list, **args):
try:
response=requests.delete(*list, **args)
if DEBUG: Log.note(response.content)
return response
except Exception, e:
Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def scrub(r):
    """
    REMOVE KEYS OF DEGENERATE VALUES (EMPTY STRINGS, EMPTY LISTS, AND NULLS)
    TO LOWER CASE
    CONVERT STRINGS OF NUMBERS TO NUMBERS
    RETURNS **COPY**, DOES NOT CHANGE ORIGINAL

    The recursive work happens in the module-level _scrub(); the result is
    wrapped in a struct for attribute-style access.
    """
    return struct.wrap(_scrub(r))
def _scrub(r):
    # Recursive worker for ElasticSearch.scrub(): returns a cleaned COPY of r,
    # or None when the value is degenerate (None, "", empty dict/list).
    try:
        if r == None:
            return None
        elif isinstance(r, basestring):
            if r == "":
                return None
            return r
        elif Math.is_number(r):
            # numeric strings become real numbers
            return CNV.value2number(r)
        elif isinstance(r, dict):
            if isinstance(r, Struct):
                r = r.dict
            output = {}
            for k, v in r.items():
                v = _scrub(v)
                if v != None:
                    output[k.lower()] = v  # keys are lower-cased
            if len(output) == 0:
                return None
            return output
        elif hasattr(r, '__iter__'):
            if isinstance(r, StructList):
                r = r.list
            output = []
            for v in r:
                v = _scrub(v)
                if v != None:
                    output.append(v)
            if not output:
                return None
            try:
                # sorted output gives stable, comparable documents
                return Q.sort(output)
            except Exception:
                # unsortable (mixed-type) lists are returned as-is
                return output
        else:
            return r
    except Exception, e:
        # NOTE(review): failure path logs a warning and implicitly returns None
        Log.warning("Can not scrub: {{json}}", {"json": r})

101
tests/util/emailer.py Normal file
Просмотреть файл

@ -0,0 +1,101 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import sys
from .struct import nvl
class Emailer:
    # Minimal SMTP mailer driven by a settings struct
    # (host, port, username, password, use_ssl, from_address).
    def __init__(self, settings):
        self.settings=settings

    def send_email(self,
        from_address = None,
        to_addrs = None,
        subject='No Subject',
        text_data = None,
        html_data = None
    ):
        """Sends an email.

        from_addr is an email address; to_addrs is a list of email adresses.
        Addresses can be plain (e.g. "jsmith@example.com") or with real names
        (e.g. "John Smith <jsmith@example.com>").

        text_data and html_data are both strings.  You can specify one or both.
        If you specify both, the email will be sent as a MIME multipart
        alternative, i.e., the recipient will see the HTML content if his
        viewer supports it; otherwise he'll see the text content.
        """
        settings=self.settings

        from_address=nvl(from_address, settings.from_address)
        if not from_address or not to_addrs:
            raise Exception("Both from_addr and to_addrs must be specified")
        if not text_data and not html_data:
            raise Exception("Must specify either text_data or html_data")

        if settings.use_ssl:
            server = smtplib.SMTP_SSL(settings.host, settings.port)
        else:
            server = smtplib.SMTP(settings.host, settings.port)
        if settings.username and settings.password:
            server.login(settings.username, settings.password)

        # pick message structure based on which bodies were supplied
        if not html_data:
            msg = MIMEText(text_data)
        elif not text_data:
            msg = MIMEMultipart()
            msg.preamble = subject
            msg.attach(MIMEText(html_data, 'html'))
        else:
            msg = MIMEMultipart('alternative')
            msg.attach(MIMEText(text_data, 'plain'))
            msg.attach(MIMEText(html_data, 'html'))

        msg['Subject'] = subject
        msg['From'] = from_address
        msg['To'] = ', '.join(to_addrs)

        # NOTE(review): if sendmail (or login above) raises, quit() is never
        # called and the connection leaks; a try/finally would fix that
        server.sendmail(from_address, to_addrs, msg.as_string())
        server.quit()
# Monkey-patch for old interpreters only; runs once at import time.
if sys.hexversion < 0x020603f0:
    # versions earlier than 2.6.3 have a bug in smtplib when sending over SSL:
    # http://bugs.python.org/issue4066
    # Unfortunately the stock version of Python in Snow Leopard is 2.6.1, so
    # we patch it here to avoid having to install an updated Python version.
    import socket
    import ssl

    def _get_socket_fixed(self, host, port, timeout):
        # replacement for smtplib.SMTP_SSL._get_socket: wrap the TCP socket
        # in SSL before handing it to smtplib
        if self.debuglevel > 0: print>> sys.stderr, 'connect:', (host, port)
        new_socket = socket.create_connection((host, port), timeout)
        new_socket = ssl.wrap_socket(new_socket, self.keyfile, self.certfile)
        self.file = smtplib.SSLFakeFile(new_socket)
        return new_socket

    smtplib.SMTP_SSL._get_socket = _get_socket_fixed

142
tests/util/files.py Normal file
Просмотреть файл

@ -0,0 +1,142 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import codecs
from datetime import datetime
import io
import os
import shutil
from .struct import listwrap, nvl
from .cnv import CNV
class File(object):
    # Convenience wrapper around a filesystem path; the path is stored with
    # "/" separators regardless of platform.
    def __init__(self, filename, buffering=2 ** 14):
        if filename == None:
            from .logs import Log
            Log.error("File must be given a filename")
        #USE UNIX STANDARD
        self._filename = "/".join(filename.split(os.sep))
        self.buffering = buffering  # NOTE(review): stored but never used below

    @property
    def filename(self):
        # path with OS-native separators
        return self._filename.replace("/", os.sep)

    @property
    def abspath(self):
        return os.path.abspath(self._filename)

    def backup_name(self, timestamp=None):
        """
        RETURN A FILENAME THAT CAN SERVE AS A BACKUP FOR THIS FILE
        (timestamp suffix is inserted before the extension, if any)
        """
        suffix = CNV.datetime2string(nvl(timestamp, datetime.now()), "%Y%m%d_%H%M%S")
        parts = self._filename.split(".")
        if len(parts) == 1:
            output = self._filename + "." + suffix
        elif len(parts) > 1 and parts[-2][-1] == "/":
            # the "." belongs to a directory name, not an extension
            output = self._filename + "." + suffix
        else:
            parts.insert(-1, suffix)
            output = ".".join(parts)
        return output

    def read(self, encoding="utf-8"):
        with codecs.open(self._filename, "r", encoding=encoding) as file:
            return file.read()

    def read_ascii(self):
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "r") as file:
            return file.read()

    def write_ascii(self, content):
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            file.write(content)

    def write(self, data):
        # overwrite file with one or many string fragments
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            for d in listwrap(data):
                file.write(d)

    def __iter__(self):
        #NOT SURE HOW TO MAXIMIZE FILE READ SPEED
        #http://stackoverflow.com/questions/8009882/how-to-read-large-file-line-by-line-in-python
        def output():
            with io.open(self._filename, "rb") as f:
                for line in f:
                    yield line.decode("utf-8")
        return output()

    def append(self, content):
        if not self.parent.exists:
            self.parent.create()
        with open(self._filename, "a") as output_file:
            output_file.write(content)

    def add(self, content):
        # alias for append()
        return self.append(content)

    def extend(self, content):
        # append many fragments in one open()
        if not self.parent.exists:
            self.parent.create()
        with open(self._filename, "a") as output_file:
            for c in content:
                output_file.write(c)

    def delete(self):
        try:
            if os.path.isdir(self._filename):
                shutil.rmtree(self._filename)
            elif os.path.isfile(self._filename):
                os.remove(self._filename)
            return self
        except Exception, e:
            # Windows reports missing paths with this message; treat as done
            if e.strerror=="The system cannot find the path specified":
                return
            from .logs import Log
            Log.error("Could not remove file", e)

    def backup(self):
        # NOTE(review): looks incomplete — a backup File object is built but
        # nothing is ever copied to it, and "%H%i%s" contains invalid
        # strftime directives (likely meant "%H%M%S").  Confirm intent.
        names=self._filename.split("/")[-1].split(".")
        if len(names)==1:
            backup=File(self._filename+".backup "+datetime.utcnow().strftime("%Y%m%d %H%i%s"))

    def create(self):
        # make this path as a DIRECTORY
        try:
            os.makedirs(self._filename)
        except Exception, e:
            from .logs import Log
            Log.error("Could not make directory {{dir_name}}", {"dir_name":self._filename}, e)

    @property
    def parent(self):
        return File("/".join(self._filename.split("/")[:-1]))

    @property
    def exists(self):
        # "" and "." denote the current directory, which always exists
        if self._filename in ["", "."]: return True
        try:
            return os.path.exists(self._filename)
        except Exception, e:
            return False

184
tests/util/jsons.py Normal file
Просмотреть файл

@ -0,0 +1,184 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime
import time
from decimal import Decimal
import json
import re
# Use pypy's native StringBuilder when available; otherwise emulate it
# with a plain list so the rest of the module is interpreter-agnostic.
try:
    # StringBuilder IS ABOUT 2x FASTER THAN list()
    from __pypy__.builders import StringBuilder

    use_pypy = True
except Exception, e:
    use_pypy = False

    class StringBuilder(list):
        def __init__(self, length=None):
            # `length` is only a pre-size hint on pypy; ignored here
            list.__init__(self)

        def build(self):
            return u"".join(self)

# unbound method, used as append(buffer, text) throughout this module
append = StringBuilder.append
class PyPyJSONEncoder(object):
    """
    pypy DOES NOT OPTIMIZE GENERATOR CODE WELL
    (so encoding is done with explicit buffer-append helpers)
    """
    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        # pretty output goes through the stdlib for indentation/sorted keys
        if pretty:
            return unicode(json.dumps(json_scrub(value), indent=4, sort_keys=True, separators=(',', ': ')))

        _buffer = StringBuilder(1024)
        _value2json(value, _buffer)
        output = _buffer.build()
        return output
class cPythonJSONEncoder(object):
    """JSON encoder for CPython: scrub, then delegate to the stdlib json module."""

    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        # remove non-serializable values before handing off to json.dumps
        cleaned = json_scrub(value)
        if pretty:
            return unicode(json.dumps(cleaned, indent=4, sort_keys=True, separators=(',', ': ')))
        return unicode(json.dumps(cleaned))
# OH HUM, cPython with uJSON, OR pypy WITH BUILTIN JSON?
# http://liangnuren.wordpress.com/2012/08/13/python-json-performance/
# http://morepypy.blogspot.ca/2011/10/speeding-up-json-encoding-in-pypy.html
# pick the encoder matching the running interpreter; decoding always uses stdlib
if use_pypy:
    json_encoder = PyPyJSONEncoder()
    json_decoder = json._default_decoder
else:
    json_encoder = cPythonJSONEncoder()
    json_decoder = json._default_decoder
def _value2json(value, _buffer):
    # Dispatch on type and append the JSON encoding of `value` to `_buffer`.
    # NOTE: the `is True` / `is False` checks must stay BEFORE the int check,
    # because bool is a subclass of int and would otherwise print as 1/0.
    if isinstance(value, basestring):
        _string2json(value, _buffer)
    elif value == None:
        append(_buffer, "null")
    elif value is True:
        append(_buffer, 'true')
    elif value is False:
        append(_buffer, 'false')
    elif isinstance(value, (int, long, Decimal)):
        append(_buffer, str(value))
    elif isinstance(value, float):
        append(_buffer, repr(value))
    elif isinstance(value, datetime):
        # datetimes serialize as unix epoch milliseconds
        append(_buffer, unicode(long(time.mktime(value.timetuple())*1000)))
    elif isinstance(value, dict):
        _dict2json(value, _buffer)
    elif hasattr(value, '__iter__'):
        _list2json(value, _buffer)
    else:
        raise Exception(repr(value)+" is not JSON serializable")
def _list2json(value, _buffer):
    """Append the JSON array encoding of `value` to `_buffer`."""
    append(_buffer, "[")
    is_first = True
    for element in value:
        # comma-separate every element after the first
        if not is_first:
            append(_buffer, ", ")
        is_first = False
        _value2json(element, _buffer)
    append(_buffer, "]")
def _dict2json(value, _buffer):
    """
    Append the JSON object encoding of `value` to `_buffer`.
    Keys are coerced to unicode; values are encoded recursively.

    (Removed dead statement `items = value.iteritems()` — the iterator was
    created and never used; the loop below builds its own.)
    """
    append(_buffer, "{")
    first = True
    for k, v in value.iteritems():
        if first:
            first = False
        else:
            append(_buffer, ", ")
        _string2json(unicode(k), _buffer)
        append(_buffer, ": ")
        _value2json(v, _buffer)
    append(_buffer, "}")
# NOTE(review): special_find and replacement look like leftovers from an
# earlier escaping strategy — nothing visible in this module uses them.
special_find = u"\\\"\t\n\r".find
replacement = [u"\\\\", u"\\\"", u"\\t", u"\\n", u"\\r"]

# regex matching every character that must be escaped inside a JSON string
ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')
ESCAPE_DCT = {
    '\\': '\\\\',
    '"': '\\"',
    '\b': '\\b',
    '\f': '\\f',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
}
# map remaining control characters to \u00XX escapes
for i in range(0x20):
    ESCAPE_DCT.setdefault(chr(i), '\\u{0:04x}'.format(i))
def _string2json(value, _buffer):
    """Append `value` as a quoted, escaped JSON string to `_buffer`."""
    def _escape_char(match):
        # look up the escape sequence for the matched character
        return ESCAPE_DCT[match.group(0)]

    append(_buffer, "\"")
    append(_buffer, ESCAPE.sub(_escape_char, value))
    append(_buffer, "\"")
#REMOVE VALUES THAT CAN NOT BE JSON-IZED
def json_scrub(value):
    """Public wrapper: return a copy of `value` with unserializable parts converted."""
    return _scrub(value)
def _scrub(value):
if value == None:
return None
elif isinstance(value, datetime):
return long(time.mktime(value.timetuple())*1000)
elif isinstance(value, dict):
output = {}
for k, v in value.iteritems():
v = _scrub(v)
output[k] = v
return output
elif hasattr(value, '__iter__'):
output = []
for v in value:
v = _scrub(v)
output.append(v)
return output
elif isinstance(value, Decimal):
return float(value)
else:
return value

480
tests/util/logs.py Normal file
Просмотреть файл

@ -0,0 +1,480 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import traceback
import logging
import sys
from .struct import listwrap, nvl
import struct, threads
from .strings import indent, expand_template
from .threads import Thread
# when True, the logging machinery narrates its own shutdown to stdout
DEBUG_LOGGING = False

# severity labels carried by Except instances
ERROR="ERROR"
WARNING="WARNING"
NOTE="NOTE"

# set by Log.start(); a plain-stdout fallback is installed at module bottom
main_log = None
logging_multi = None
class Log(object):
    """
    FOR STRUCTURED LOGGING AND EXCEPTION CHAINING

    Static facade over the module-level `main_log` sink.  Log.start()
    installs the configured sinks; Log.error() raises chained Except
    instances; Log.note()/warning() write formatted lines.
    """
    @classmethod
    def new_instance(cls, settings):
        # build a concrete log sink from a settings struct
        settings=struct.wrap(settings)
        if settings["class"]:
            if not settings["class"].startswith("logging.handlers."):
                return make_log_from_settings(settings)
            # elif settings["class"]=="sys.stdout":
            #CAN BE SUPER SLOW
            else:
                return Log_usingLogger(settings)
        if settings.file:
            # BUGFIX: was Log_usingFile(file), which passed the Python 2
            # builtin `file` type instead of the configured file path
            return Log_usingFile(settings.file)
        if settings.filename: return Log_usingFile(settings.filename)
        if settings.stream: return Log_usingStream(settings.stream)

    @classmethod
    def add_log(cls, log):
        logging_multi.add_log(log)

    @staticmethod
    def debug(template=None, params=None):
        """
        USE THIS FOR DEBUGGING (AND EVENTUAL REMOVAL)
        """
        Log.note(nvl(template, ""), params)

    @staticmethod
    def println(template, params=None):
        # alias for note()
        Log.note(template, params)

    @staticmethod
    def note(template, params=None):
        # prefix every line with a timestamp; params are copied so the
        # caller's dict is never mutated
        template="{{log_timestamp}} - "+template
        params = nvl(params, {}).copy()

        #NICE TO GATHER MANY MORE ITEMS FOR LOGGING (LIKE STACK TRACES AND LINE NUMBERS)
        params["log_timestamp"]=datetime.utcnow().strftime("%H:%M:%S")

        main_log.write(template, params)

    @staticmethod
    def warning(template, params=None, cause=None):
        # allow warning(template, exception) shorthand
        if isinstance(params, BaseException):
            cause=params
            params = None

        if cause and not isinstance(cause, Except):
            cause=Except(WARNING, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), 0))

        e = Except(WARNING, template, params, cause, format_trace(traceback.extract_stack(), 1))
        Log.note(unicode(e))

    #raise an exception with a trace for the cause too
    @staticmethod
    def error(
        template, #human readable template
        params=None, #parameters for template
        cause=None, #pausible cause
        offset=0 #stack trace offset (==1 if you do not want to report self)
    ):
        # allow error(template, exception) shorthand
        if params and isinstance(struct.listwrap(params)[0], BaseException):
            cause=params
            params = None

        # normalize `cause` into a list of Except instances
        if cause == None:
            cause = []
        elif isinstance(cause, list):
            pass
        elif isinstance(cause, Except):
            cause = [cause]
        else:
            cause = [Except(ERROR, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), offset))]

        trace=format_trace(traceback.extract_stack(), 1+offset)
        e=Except(ERROR, template, params, cause, trace)
        raise e

    #RUN ME FIRST TO SETUP THE THREADED LOGGING
    @staticmethod
    def start(settings=None):
        ##http://victorlin.me/2012/08/good-logging-practice-in-python/
        if not settings: return
        if not settings.log: return

        # replace module-level sinks with the configured multi-logger
        globals()["logging_multi"]=Log_usingMulti()
        globals()["main_log"] = Log_usingThread(logging_multi)

        for log in listwrap(settings.log):
            Log.add_log(Log.new_instance(log))

    @staticmethod
    def stop():
        main_log.stop()

    def write(self):
        Log.error("not implemented")
def format_trace(tbs, trim=0):
    """
    Render a traceback.extract_tb()/extract_stack() frame list as
    'at File "...", line N, in name' lines, most-recent frame first.

    NOTE: reverses `tbs` in place (callers pass throwaway lists).
    `trim` drops that many frames from the start after reversal.
    (Local renamed from `list`, which shadowed the builtin.)
    """
    tbs.reverse()
    lines = []
    for filename, lineno, name, line in tbs[trim:]:
        # normalize Windows path separators for consistent log output
        lines.append('at File "%s", line %d, in %s\n' % (filename.replace("\\", "/"), lineno, name))
    return "".join(lines)
#def format_trace(tb, trim=0):
# list = []
# for filename, lineno, name, line in traceback.extract_tb(tb)[0:-trim]:
# item = 'File "%s", line %d, in %s\n' % (filename,lineno,name)
# if line:
# item = item + '\t%s\n' % line.strip()
# list.append(item)
# return "".join(list)
class Except(Exception):
    # Exception carrying a templated message, a list of causal Excepts, and a
    # preformatted stack trace, so whole failure chains can be printed.
    def __init__(self, type=ERROR, template=None, params=None, cause=None, trace=None):
        super(Exception, self).__init__(self)
        self.type=type          # severity label: ERROR / WARNING / NOTE
        self.template=template  # moustache-style message template
        self.params=params      # values substituted into the template
        self.cause=cause        # upstream Except instance(s), or None
        self.trace=trace        # preformatted stack trace string

    @property
    def message(self):
        return unicode(self)

    def contains(self, value):
        # True when this exception, or anything in its causal chain, has
        # the given type.
        # NOTE(review): iterating self.cause raises TypeError when cause is
        # None and type does not match — confirm callers always set a list.
        if self.type==value:
            return True
        for c in self.cause:
            if c.contains(value):
                return True
        return False

    def __str__(self):
        output=self.type+": "+self.template
        if self.params: output=expand_template(output, self.params)

        if self.trace:
            output+="\n"+indent(self.trace)

        if self.cause:
            output+="\ncaused by\n\t"+"\nand caused by\n\t".join([c.__str__() for c in self.cause])

        return output+"\n"
class BaseLog(object):
    """Do-nothing log sink; concrete sinks override write() and stop()."""

    def write(self, template, params):
        """Accept one log line; the base implementation discards it."""
        return None

    def stop(self):
        """Shut the sink down; nothing to release here."""
        return None
class Log_usingFile(BaseLog):
    """Log sink that appends each expanded log line to a file."""

    def __init__(self, file):
        assert file
        from files import File

        self.file=File(file)
        # start each run fresh, keeping the previous content as a backup
        if self.file.exists:
            self.file.backup()
            self.file.delete()

        self.file_lock=threads.Lock()  # serialize appends across threads

    def write(self, template, params):
        # BUGFIX: was File(self.filename) — no `filename` attribute is ever
        # set (__init__ stores the File object in self.file), so every write
        # raised AttributeError.  Append through the stored File instead.
        with self.file_lock:
            self.file.append(expand_template(template, params))
#WRAP PYTHON CLASSIC logger OBJECTS
class Log_usingLogger(BaseLog):
    def __init__(self, settings):
        self.logger=logging.Logger("unique name", level=logging.INFO)
        self.logger.addHandler(make_log_from_settings(settings))

        # TURNS OUT LOGGERS ARE REALLY SLOW TOO
        # lines are queued and flushed to the logger by a helper thread
        # every 0.3 seconds
        self.queue = threads.Queue()
        self.thread = Thread("log to logger", time_delta_pusher, appender=self.logger.info, queue=self.queue, interval=timedelta(seconds=0.3))
        self.thread.start()

    def write(self, template, params):
        # http://docs.python.org/2/library/logging.html#logging.LogRecord
        self.queue.add({"template": template, "params": params})

    def stop(self):
        # flush, stop the pusher thread, then close the queue; failures
        # during shutdown are deliberately ignored
        try:
            if DEBUG_LOGGING:
                sys.stdout.write("Log_usingLogger sees stop, adding stop to queue\n")
            self.queue.add(Thread.STOP) #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
            if DEBUG_LOGGING:
                sys.stdout.write("Log_usingLogger done\n")
        except Exception, e:
            pass

        try:
            self.queue.close()
        except Exception, f:
            pass
def make_log_from_settings(settings):
    # Instantiate the handler class named by settings["class"] (dotted path),
    # passing every remaining settings entry as a constructor keyword.
    assert settings["class"]

    # IMPORT MODULE FOR HANDLER
    path=settings["class"].split(".")
    class_name=path[-1]
    path=".".join(path[:-1])
    temp=__import__(path, globals(), locals(), [class_name], -1)
    constructor=object.__getattribute__(temp, class_name)

    #IF WE NEED A FILE, MAKE SURE DIRECTORY EXISTS
    if settings.filename:
        from files import File

        f = File(settings.filename)
        if not f.parent.exists:
            f.parent.create()

    params = settings.dict
    del params['class']  # consumed above; the constructor must not see it
    return constructor(**params)
def time_delta_pusher(please_stop, appender, queue, interval):
    """
    appender - THE FUNCTION THAT ACCEPTS A STRING
    queue - FILLED WITH LINES TO WRITE
    interval - timedelta

    USE IN A THREAD TO BATCH LOGS BY TIME INTERVAL
    """
    if not isinstance(interval, timedelta):
        Log.error("Expecting interval to be a timedelta")

    next_run = datetime.utcnow() + interval

    while not please_stop:
        Thread.sleep(till=next_run)
        next_run = datetime.utcnow() + interval
        logs = queue.pop_all()
        if logs:
            lines = []
            for log in logs:
                try:
                    if log == Thread.STOP:
                        # STOP marker: signal shutdown and flush immediately
                        please_stop.go()
                        next_run = datetime.utcnow()
                    else:
                        lines.append(expand_template(log.get("template", None), log.get("params", None)))
                except Exception, e:
                    if DEBUG_LOGGING:
                        sys.stdout.write("Trouble formatting logs: "+e.message)
                    raise e
            try:
                if DEBUG_LOGGING and please_stop:
                    sys.stdout.write("Last call to appender with "+str(len(lines))+" lines\n")
                # one appender call per batch keeps slow sinks usable
                appender(u"\n".join(lines)+u"\n")
                if DEBUG_LOGGING and please_stop:
                    sys.stdout.write("Done call to appender with "+str(len(lines))+" lines\n")
            except Exception, e:
                if DEBUG_LOGGING:
                    sys.stdout.write("Trouble with appender: "+e.message)
                raise e
class Log_usingStream(BaseLog):
    #stream CAN BE AN OBJECT WITH write() METHOD, OR A STRING
    #WHICH WILL eval() TO ONE
    def __init__(self, stream):
        assert stream

        use_UTF8 = False

        if isinstance(stream, basestring):
            if stream.startswith("sys."):
                use_UTF8 = True #sys.* ARE OLD AND CAN NOT HANDLE unicode
            # SECURITY NOTE(review): eval() of the configured stream name —
            # acceptable only while settings come from a trusted local file
            self.stream = eval(stream)
            name = stream
        else:
            self.stream = stream
            name = "stream"

        #WRITE TO STREAMS CAN BE *REALLY* SLOW, WE WILL USE A THREAD
        from threads import Queue

        if use_UTF8:
            def utf8_appender(value):
                # old sys.* streams need encoded bytes, not unicode
                if isinstance(value, unicode):
                    value = value.encode('utf-8')
                self.stream.write(value)

            appender = utf8_appender
        else:
            appender = self.stream.write

        self.queue = Queue()
        self.thread = Thread("log to " + name, time_delta_pusher, appender=appender, queue=self.queue, interval=timedelta(seconds=0.3))
        self.thread.start()

    def write(self, template, params):
        try:
            self.queue.add({"template": template, "params": params})
            return self
        except Exception, e:
            raise e #OH NO!

    def stop(self):
        # flush remaining lines, stop the pusher thread, close the queue
        try:
            if DEBUG_LOGGING:
                sys.stdout.write("Log_usingStream sees stop, adding stop to queue\n")
            self.queue.add(Thread.STOP) #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
            if DEBUG_LOGGING:
                sys.stdout.write("Log_usingStream done\n")
        except Exception, e:
            if DEBUG_LOGGING:
                raise e

        try:
            self.queue.close()
        except Exception, f:
            if DEBUG_LOGGING:
                raise f
class Log_usingThread(BaseLog):
    # Wraps another logger so callers never block: write() only queues,
    # and a dedicated worker thread drains the queue into the wrapped
    # logger about once per second.
    def __init__(self, logger):
        #DELAYED LOAD FOR THREADS MODULE
        from threads import Queue

        self.queue=Queue()
        self.logger=logger

        def worker(please_stop):
            while not please_stop:
                Thread.sleep(1)
                logs = self.queue.pop_all()
                for log in logs:
                    if log==Thread.STOP:
                        if DEBUG_LOGGING:
                            sys.stdout.write("Log_usingThread.worker() sees stop, filling rest of queue\n")
                        please_stop.go()
                    else:
                        self.logger.write(**log)

        self.thread=Thread("log thread", worker)
        self.thread.start()

    def write(self, template, params):
        try:
            self.queue.add({"template":template, "params":params})
            return self
        except Exception, e:
            sys.stdout.write("IF YOU SEE THIS, IT IS LIKELY YOU FORGOT TO RUN Log.start() FIRST\n")
            raise e #OH NO!

    def stop(self):
        # drain the queue, stop the worker, then stop the wrapped logger
        try:
            if DEBUG_LOGGING:
                sys.stdout.write("injecting stop into queue\n")
            self.queue.add(Thread.STOP) #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
            if DEBUG_LOGGING:
                sys.stdout.write("Log_usingThread telling logger to stop\n")
            self.logger.stop()
        except Exception, e:
            if DEBUG_LOGGING:
                raise e

        try:
            self.queue.close()
        except Exception, f:
            if DEBUG_LOGGING:
                raise f
class Log_usingMulti(BaseLog):
    # Fan-out sink: forwards every line to all registered loggers.
    def __init__(self):
        self.many=[]

    def write(self, template, params):
        for m in self.many:
            try:
                m.write(template, params)
            except Exception, e:
                # best-effort: one failing sink must not break the others
                pass
        return self

    def add_log(self, logger):
        self.many.append(logger)
        return self

    def remove_log(self, logger):
        self.many.remove(logger)
        return self

    def clear_log(self):
        self.many=[]

    def stop(self):
        for m in self.many:
            try:
                m.stop()
            except Exception, e:
                pass
# fallback: if Log.start() never installed a sink, log straight to stdout
if not main_log:
    main_log = Log_usingStream("sys.stdout")

115
tests/util/maths.py Normal file
Просмотреть файл

@ -0,0 +1,115 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import math
from . import struct
from .struct import Null, nvl
from .logs import Log
from .strings import find_first
class Math(object):
    """Assorted numeric helpers that tolerate None/Null inputs."""

    @staticmethod
    def bayesian_add(a, b):
        """Combine two probabilities; both must lie strictly between 0 and 1."""
        if a >= 1 or b >= 1 or a <= 0 or b <= 0:
            Log.error("Only allowed values *between* zero and one")
        return a * b / (a * b + (1 - a) * (1 - b))

    # FOR GOODNESS SAKE - IF YOU PROVIDE A METHOD abs(), PLEASE PROVIDE IT'S COMPLEMENT
    # x = abs(x)*sign(x)
    # FOUND IN numpy, BUT WE USUALLY DO NOT NEED TO BRING IN A BIG LIB FOR A SIMPLE DECISION
    @staticmethod
    def sign(v):
        """Return -1, 0 or +1 according to the sign of v."""
        if v < 0:
            return -1
        if v > 0:
            return +1
        return 0

    @staticmethod
    def is_number(s):
        """True when s can be coerced to float."""
        try:
            float(s)
            return True
        except Exception:
            return False

    @staticmethod
    def is_integer(s):
        """True when s represents a whole number."""
        try:
            return float(s) == round(float(s), 0)
        except Exception:
            return False

    @staticmethod
    def round_sci(value, decimal=None, digits=None):
        """Round to `decimal` places; when `digits` is given, round at a scaled magnitude instead."""
        if digits != None:
            scale = pow(10, math.floor(math.log10(digits)))
            return round(value / scale, digits) * scale
        return round(value, decimal)

    @staticmethod
    def floor(value, mod=None):
        """
        Floor to the nearest multiple of mod (default 1), so that
        x == floor(x, a) + mod(x, a) FOR ALL a
        """
        mod = nvl(mod, 1)
        whole = int(math.floor(value))
        return whole - (whole % mod)

    #RETURN A VALUE CLOSE TO value, BUT WITH SHORTER len(unicode(value))<len(unicode(value)):
    @staticmethod
    def approx_str(value):
        """Tidy values like 0.99999/1.00001 into a shorter nearby number."""
        text = unicode(value)
        point = text.find(".")
        if point == -1:
            return value
        run = find_first(text, ["9999", "0000"], point)
        if run == -1:
            return value
        return Math.round_sci(value, decimal=run - point - 1)

    @staticmethod
    def min(values):
        """Minimum of values, skipping None and NaN; Null when nothing qualifies."""
        best = Null
        for v in values:
            if v == None or math.isnan(v):
                continue
            if best == None:
                best = v
            else:
                best = min(best, v)
        return best

    @staticmethod
    def max(values):
        """Maximum of values, skipping None and NaN; Null when nothing qualifies."""
        best = Null
        for v in values:
            if v == None or math.isnan(v):
                continue
            if best == None:
                best = v
            else:
                best = max(best, v)
        return best

105
tests/util/multiprocess.py Normal file
Просмотреть файл

@ -0,0 +1,105 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from multiprocessing.queues import Queue
from .logs import Log
class worker(object):
    # NOTE(review): appears to be an unfinished stub — the first parameter
    # occupies the `self` slot but is named `func`, and `logger` is built
    # but never stored or used.  Multiprocess below calls this constructor
    # with arguments that do not match this signature.  Confirm intent.
    def __init__(func, inbound, outbound, logging):
        logger = Log_usingInterProcessQueue(logging)
class Log_usingInterProcessQueue(Log):
    """Log sink that forwards log lines over a multiprocessing Queue to the parent process."""

    def __init__(self, outbound):
        self.outbound = outbound  # Queue shared with the parent process

    def write(self, template, params):
        # BUGFIX: the payload key was "param"; every other sink in this
        # package (threads, stream, logger) sends "params", so the key is
        # normalized here for a consistent consumer contract.
        self.outbound.put({"template": template, "params": params})
class Multiprocess(object):
    # THE COMPLICATION HERE IS CONNECTING THE DISPARATE LOGGING TO
    # A CENTRAL POINT
    def __init__(self, functions):
        self.outbound = Queue()
        self.inbound = Queue()
        # NOTE(review): duplicate assignment — inbound is created twice;
        # possibly a typo for a third queue.  Confirm.
        self.inbound = Queue()

        #MAKE
        #MAKE THREADS
        self.threads = []
        for t, f in enumerate(functions):
            # NOTE(review): worker.__init__ (above) does not accept this
            # argument list; this call would fail if executed.
            thread = worker(
                "worker " + unicode(t),
                f,
                self.inbound,
                self.outbound,
            )
            self.threads.append(thread)

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, a, b, c):
        try:
            self.inbound.close() # SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        except Exception, e:
            Log.warning("Problem adding to inbound", e)

        self.join()

    #IF YOU SENT A stop(), OR STOP, YOU MAY WAIT FOR SHUTDOWN
    def join(self):
        try:
            #WAIT FOR FINISH
            for t in self.threads:
                t.join()
        except (KeyboardInterrupt, SystemExit):
            Log.note("Shutdow Started, please be patient")
        except Exception, e:
            Log.error("Unusual shutdown!", e)
        finally:
            # make sure every worker is told to quit, then close the queues
            for t in self.threads:
                t.keep_running = False
            for t in self.threads:
                t.join()
            self.inbound.close()
            self.outbound.close()

    #RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
    def execute(self, parameters):
        #FILL QUEUE WITH WORK
        self.inbound.extend(parameters)
        num = len(parameters)

        def output():
            for i in xrange(num):
                result = self.outbound.pop()
                yield result
        return output()

    #EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
    def stop(self):
        self.inbound.close() #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        for t in self.threads:
            t.keep_running = False

202
tests/util/multiset.py Normal file
Просмотреть файл

@ -0,0 +1,202 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
def Multiset(list=None, key_field=None, count_field=None, allow_negative=False):
    """Factory for a multiset; allow_negative selects the variant whose counts may go below zero."""
    variant = _NegMultiset if allow_negative else _Multiset
    return variant(list, key_field, count_field)
class _Multiset(object):
def __init__(self, list=None, key_field=None, count_field=None):
if not key_field and not count_field:
self.dic = dict()
if list:
for i in list:
self.add(i)
return
else:
self.dic={i[key_field]:i[count_field] for i in list}
def __iter__(self):
for k, m in self.dic.items():
for i in range(m):
yield k
def items(self):
return self.dic.items()
def keys(self):
return self.dic.keys()
def add(self, value):
if value in self.dic:
self.dic[value]+=1
else:
self.dic[value]=1
return self
def extend(self, values):
for v in values:
self.add(v)
def remove(self, value):
if value not in self.dic:
from .logs import Log
Log.error("{{value}} is not in multiset", {"value":value})
self._remove(value)
def copy(self):
output = _Multiset()
output.dic=self.dic.copy()
return output
def _remove(self, value):
count=self.dic.get(value, None)
if count == None:
return
count-=1
if count==0:
del(self.dic[value])
else:
self.dic[value]=count
def __sub__(self, other):
output=self.copy()
for o in other:
output._remove(o)
return output
def __add__(self, other):
output=self.copy()
for o in other:
output.add(o)
return output
def __set__(self, other):
return set(self.dic.keys())
def __len__(self):
return sum(self.dic.values())
def __nonzero__(self):
if self.dic:
return True
return False
def count(self, value):
if value in self.dic:
return self.dic[value]
else:
return 0
class _NegMultiset(object):
def __init__(self, list=None, key_field=None, count_field=None):
if not key_field and not count_field:
self.dic = dict()
if list:
for i in list:
self.add(i)
return
else:
self.dic={i[key_field]:i[count_field] for i in list}
# def __iter__(self):
# for k, m in self.dic.items():
# for i in range(m):
# yield k
def items(self):
return self.dic.items()
def keys(self):
return self.dic.keys()
def add(self, value, amount=None):
count = self.dic.get(value, None)
if amount == None:
amount = 1
elif amount == 0:
return self
if not count:
self.dic[value] = amount
elif count == -amount:
del (self.dic[value])
else:
self.dic[value] = count + amount
return self
def extend(self, values):
for v in values:
self.add(v)
def remove(self, value):
return self.add(value, -1)
def copy(self):
output = _NegMultiset()
output.dic=self.dic.copy()
return output
def __add__(self, other):
output=self.copy()
if isinstance(other, _NegMultiset):
for k, c in other.dic.items():
output.add(k, c)
else:
for o in other:
output.add(o)
return output
def __sub__(self, other):
if not other:
return self
output=self.copy()
for o in other:
output.remove(o)
return output
def __set__(self, other):
return set(self.dic.keys())
def __len__(self):
return sum(self.dic.values())
def __nonzero__(self):
if self.dic:
return True
return False
def count(self, value):
if value in self.dic:
return self.dic[value]
else:
return 0

160
tests/util/multithread.py Normal file
Просмотреть файл

@ -0,0 +1,160 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import threading
from .struct import nvl
from .logs import Log
from .threads import Queue, Thread
DEBUG = True
class worker_thread(threading.Thread):
    #in_queue MUST CONTAIN HASH OF PARAMETERS FOR load()
    def __init__(self, name, in_queue, out_queue, function):
        threading.Thread.__init__(self)
        self.name=name
        self.in_queue=in_queue
        self.out_queue=out_queue    # may be None when results are not collected
        self.function=function      # invoked as function(**request)
        self.keep_running=True
        self.num_runs=0
        self.start()                # thread starts immediately on construction

    #REQUIRED TO DETECT KEYBOARD, AND OTHER, INTERRUPTS
    def join(self, timeout=None):
        # poll with short joins so Ctrl-C is not swallowed by a blocking join
        while self.isAlive():
            Log.note("Waiting on thread {{thread}}", {"thread":self.name})
            threading.Thread.join(self, nvl(timeout, 0.5))

    def run(self):
        # pop requests until a STOP marker arrives or keep_running is cleared
        got_stop=False
        while self.keep_running:
            request = self.in_queue.pop()
            if request == Thread.STOP:
                got_stop=True
                if self.in_queue.queue:
                    # STOP should be the last item in the queue
                    Log.warning("programmer error")
                break
            if not self.keep_running:
                break
            try:
                if DEBUG and hasattr(self.function, "func_name"):
                    Log.note("run {{function}}", {"function": self.function.func_name})
                result = self.function(**request)
                if self.out_queue != None:
                    self.out_queue.add({"response": result})
            except Exception, e:
                Log.warning("Can not execute with params={{params}}", {"params": request}, e)
                if self.out_queue != None:
                    # forward the failure so Multithread.execute() can re-raise it
                    self.out_queue.add({"exception": e})
            finally:
                self.num_runs += 1

        self.keep_running = False
        if self.num_runs==0:
            Log.warning("{{name}} thread did no work", {"name":self.name})
        if DEBUG and self.num_runs!=1:
            Log.note("{{name}} thread did {{num}} units of work", {
                "name":self.name,
                "num":self.num_runs
            })
        if got_stop and self.in_queue.queue:
            Log.warning("multithread programmer error")
        if DEBUG:
            Log.note("{{thread}} DONE", {"thread":self.name})

    def stop(self):
        self.keep_running=False
#PASS A SET OF FUNCTIONS TO BE EXECUTED (ONE PER THREAD)
#PASS AN (ITERATOR/LIST) OF PARAMETERS TO BE ISSUED TO NEXT AVAILABLE THREAD
class Multithread(object):
    def __init__(self, functions):
        self.outbound=Queue()
        self.inbound=Queue()

        #MAKE THREADS
        self.threads=[]
        for t, f in enumerate(functions):
            thread=worker_thread("worker "+unicode(t), self.inbound, self.outbound, f)
            self.threads.append(thread)

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, type, value, traceback):
        try:
            # on error, close the work queue and wake the workers with STOP
            if isinstance(value, Exception):
                self.inbound.close()
                self.inbound.add(Thread.STOP)
            self.join()
        except Exception, e:
            Log.warning("Problem sending stops", e)

    #IF YOU SENT A stop(), OR Thread.STOP, YOU MAY WAIT FOR SHUTDOWN
    def join(self):
        try:
            #WAIT FOR FINISH
            for t in self.threads:
                t.join()
        except (KeyboardInterrupt, SystemExit):
            Log.note("Shutdow Started, please be patient")
        except Exception, e:
            Log.error("Unusual shutdown!", e)
        finally:
            # make sure every worker is told to quit, then close the queues
            for t in self.threads:
                t.keep_running=False
            self.inbound.close()
            self.outbound.close()
            for t in self.threads:
                t.join()

    #RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
    def execute(self, request):
        #FILL QUEUE WITH WORK
        self.inbound.extend(request)
        num=len(request)

        def output():
            for i in xrange(num):
                result=self.outbound.pop()
                if "exception" in result:
                    # re-raise worker failures in the caller's thread
                    raise result["exception"]
                else:
                    yield result["response"]
        return output()

    #EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
    def stop(self):
        self.inbound.close() #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        for t in self.threads:
            t.keep_running=False

634
tests/util/queries/Q.py Normal file
Просмотреть файл

@ -0,0 +1,634 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import sys
import __builtin__
from ..logs import Log
from ..struct import nvl, listwrap
from .. import struct
from ..strings import indent, expand_template
from ..struct import StructList, Struct, Null
from ..multiset import Multiset
# A COLLECTION OF DATABASE OPERATORS (RELATIONAL ALGEBRA OPERATORS)
def run(query):
    """Execute a (possibly nested) query: resolve the source, then apply
    window, where, sort and select clauses in that order."""
    query = struct.wrap(query)

    source = query["from"]
    # a list is literal data; anything else is a sub-query to evaluate first
    data = source if isinstance(source, list) else run(source)

    if query.edges != None:
        Log.error("not implemented yet")
    if query.filter != None:
        Log.error("not implemented yet")

    for w_param in listwrap(query.window):
        window(data, w_param)

    if query.where != None:
        data = filter(data, query.where)
    if query.sort != None:
        data = sort(data, query.sort)
    if query.select != None:
        data = select(data, query.select)

    return data
def groupby(data, keys=None, size=None, min_size=None, max_size=None):
#return list of (keys, values) pairs where
#group by the set of set of keys
#values IS LIST OF ALL data that has those keys
if size != None or min_size != None or max_size != None:
if size != None: max_size = size
return groupby_min_max_size(data, min_size=min_size, max_size=max_size)
try:
def keys2string(x):
#REACH INTO dict TO GET PROPERTY VALUE
return "|".join([unicode(x[k]) for k in keys])
def get_keys(d):
return struct.wrap({k: d[k] for k in keys})
agg = {}
for d in data:
key = keys2string(d)
if key in agg:
pair = agg[key]
else:
pair = (get_keys(d), StructList())
agg[key] = pair
pair[1].append(d)
return agg.values()
except Exception, e:
Log.error("Problem grouping", e)
def index(data, keys=None):
    """
    Return a dict-of-dicts keyed by the given field names; the leaves are
    lists of all rows that share that key combination.
    """
    keys = struct.unwrap(listwrap(keys))
    output = dict()
    for d in data:
        o = output
        for k in keys[:-1]:
            v = d[k]
            # BUGFIX: was `o = o.get(v, dict())`, which never stored the new
            # nested dict back into its parent, so rows were appended to
            # orphan containers and the returned index stayed empty
            o = o.setdefault(v, dict())
        v = d[keys[-1]]
        # same orphan-container bug at the leaf level
        o.setdefault(v, list()).append(d)
    return output
def unique_index(data, keys=None):
"""
RETURN dict THAT USES KEYS TO INDEX DATA
ONLY ONE VALUE ALLOWED PER UNIQUE KEY
"""
o = Index(listwrap(keys))
for d in data:
try:
o.add(d)
except Exception, e:
Log.error("index {{index}} is not unique {{key}} maps to both {{value1}} and {{value2}}", {
"index": keys,
"key": select([d], keys)[0],
"value1": o[d],
"value2": d
}, e)
return o
def map(data, relation):
"""
EXPECTING A dict THAT MAPS VALUES TO lists
THE LISTS ARE EXPECTED TO POINT TO MEMBERS OF A SET
A set() IS RETURNED
"""
if data == None:
return Null
if isinstance(relation, Struct):
Log.error("Does not accept a Struct")
if isinstance(relation, dict):
try:
#relation[d] is expected to be a list
# return set(cod for d in data for cod in relation[d])
output=set()
for d in data:
for cod in relation.get(d, []):
output.add(cod)
return output
except Exception, e:
Log.error("Expecting a dict with lists in codomain", e)
else:
try:
#relation[d] is expected to be a list
# return set(cod for d in data for cod in relation[d])
output=set()
for d in data:
cod=relation(d)
if cod == None:
continue
output.add(cod)
return output
except Exception, e:
Log.error("Expecting a dict with lists in codomain", e)
return Null
def select(data, field_name):
    """Project rows to a single field (list of values) or to a list of
    trimmed dicts when given several field names."""
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    if isinstance(field_name, basestring):
        # single field: plain list of values
        return [row[field_name] for row in data]

    # several fields: keep only the named keys of each row
    return [{k: v for k, v in row.items() if k in field_name} for row in data]
def get_columns(data):
    """Scan all rows and return [{"name": k}] for every key seen in any row."""
    seen = {}
    for row in data:
        for k, v in row.items():
            if k not in seen:
                # IT WOULD BE NICE TO ADD DOMAIN ANALYSIS HERE
                seen[k] = {"name": k, "domain": Null}
    return [{"name": n} for n in seen]
def stack(data, name=None, value_column=None, columns=None):
    """
    STACK ALL CUBE DATA TO A SINGLE COLUMN, WITH ONE COLUMN PER DIMENSION
    >>> s
         a   b
    one  1   2
    two  3   4

    >>> stack(s)
    one a    1
    one b    2
    two a    3
    two b    4

    STACK LIST OF HASHES, OR 'MERGE' SEPARATE CUBES
    data - expected to be a list of dicts
    name - give a name to the new column
    value_column - Name given to the new, single value column
    columns - explicitly list the value columns (USE SELECT INSTEAD)
    """
    # NOTE(review): this function accumulates `output` and builds `edge`, but
    # returns neither — it appears unfinished; confirm the intended result
    assert value_column != None
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    if columns == None:
        columns = data.get_columns()
    data = data.select(columns)

    name = nvl(name, data.name)
    output = []

    parts = set()
    for r in data:
        for c in columns:
            v = r[c]
            parts.add(c)
            output.append({"name": c, "value": v})

    edge = struct.wrap({"domain": {"type": "set", "partitions": parts}})
#UNSTACKING CUBES WILL BE SIMPLER BECAUSE THE keys ARE IMPLIED (edges-column)
def unstack(data, keys=None, column=None, value=None):
    """Inverse of stack(): pivot (column, value) pairs back into columns,
    one output row per unique key combination."""
    assert keys != None
    assert column != None
    assert value != None
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    pivoted = []
    for key, rows in groupby(data, keys):
        for row in rows:
            # each stacked row contributes one column to the key record
            key[row[column]] = row[value]
        pivoted.append(key)

    return StructList(pivoted)
def normalize_sort(fieldnames):
    """
    CONVERT SORT PARAMETERS TO A NORMAL FORM SO EASIER TO USE
    """
    if fieldnames == None:
        return []

    output = []
    for field in listwrap(fieldnames):
        # a bare field name means ascending sort
        if isinstance(field, basestring):
            output.append({"field": field, "sort": 1})
        else:
            output.append(field)
    return output
def sort(data, fieldnames=None):
    """
    PASS A FIELD NAME, OR LIST OF FIELD NAMES, OR LIST OF STRUCTS WITH {"field":field_name, "sort":direction}
    RETURNS A NEW (WRAPPED) SORTED LIST; None ROWS ARE TREATED AS EMPTY Structs
    """
    try:
        if data == None:
            return Null

        if fieldnames == None:
            return sorted(data)

        if not isinstance(fieldnames, list):
            #SPECIAL CASE, ONLY ONE FIELD TO SORT BY
            if isinstance(fieldnames, basestring):
                def comparer(left, right):
                    # nvl(..., Struct()) guards against None rows
                    return cmp(nvl(left, Struct())[fieldnames], nvl(right, Struct())[fieldnames])
                return sorted(data, cmp=comparer)
            else:
                #EXPECTING {"field":f, "sort":i} FORMAT
                def comparer(left, right):
                    # "sort" is +1/-1, flipping the comparison direction
                    return fieldnames["sort"] * cmp(nvl(left, Struct())[fieldnames["field"]],
                                                    nvl(right, Struct())[fieldnames["field"]])
                return sorted(data, cmp=comparer)

        formal = normalize_sort(fieldnames)

        def comparer(left, right):
            # multi-key compare: first non-equal key decides
            left = nvl(left, Struct())
            right = nvl(right, Struct())
            for f in formal:
                try:
                    result = f["sort"] * cmp(left[f["field"]], right[f["field"]])
                    if result != 0: return result
                except Exception, e:
                    Log.error("problem with compare", e)
            return 0

        if isinstance(data, list):
            output = struct.wrap(sorted(data, cmp=comparer))
        elif hasattr(data, "__iter__"):
            output = struct.wrap(sorted(list(data), cmp=comparer))
        else:
            Log.error("Do not know how to handle")

        return output
    except Exception, e:
        Log.error("Problem sorting\n{{data}}", {"data": data}, e)
def add(*values):
    """Sum the given values, ignoring None/Null; all-missing input yields Null."""
    total = Null
    for value in values:
        if total == None:
            total = value
        elif value != None:
            total += value
    return total
def filter(data, where):
    """
    where - a function accepting (record, rownum, rows) and returning a boolean
    """
    predicate = wrap_function(where)
    return [row for rownum, row in enumerate(data) if predicate(row, rownum, data)]
def wrap_function(func):
    """
    RETURN A THREE-PARAMETER WINDOW FUNCTION TO MATCH
    Extra (row, rownum, rows) arguments are dropped to fit func's arity.
    """
    numarg = func.__code__.co_argcount
    if numarg == 3:
        # already the right shape; hand it back untouched
        return func
    if numarg == 0:
        return lambda row, rownum, rows: func()
    if numarg == 1:
        return lambda row, rownum, rows: func(row)
    if numarg == 2:
        return lambda row, rownum, rows: func(row, rownum)
def window(data, param):
    """
    MAYBE WE CAN DO THIS WITH NUMPY??
    data - list of records
    Applies param.aggregate over a sliding [min, max) window within each
    edges-group (after sorting) and stores the result in column param.name.
    """
    name = param.name  # column to assign window function result
    edges = param.edges  # columns to gourp by
    sort = param.sort  # columns to sort by
    value = wrap_function(param.value)  # function that takes a record and returns a value (for aggregation)
    aggregate = param.aggregate  # WindowFunction to apply
    _range = param.range  # of form {"min":-10, "max":0} to specify the size and relative position of window

    if aggregate == None and sort == None and edges == None:
        #SIMPLE CALCULATED VALUE
        for rownum, r in enumerate(data):
            r[name] = value(r, rownum, data)
        return

    # precompute the per-row value once, stashed in a temp column
    for rownum, r in enumerate(data):
        r["__temp__"] = value(r, rownum, data)

    for keys, values in groupby(data, edges):
        if not values:
            continue  # CAN DO NOTHING WITH THIS ZERO-SAMPLE

        # NOTE(review): the local `sort` (param.sort, assigned above) shadows
        # the module-level sort() function, so this call applies param.sort as
        # a callable — almost certainly unintended; confirm
        sequence = struct.wrap(sort(values, sort))

        head = nvl(_range.max, _range.stop)
        tail = nvl(_range.min, _range.start)

        #PRELOAD total
        total = aggregate()
        for i in range(head):
            total += sequence[i].__temp__

        #WINDOW FUNCTION APPLICATION
        for i, r in enumerate(sequence):
            r[name] = total.end()
            # out-of-range indexes rely on Null-safe add/sub in the aggregate
            total.add(sequence[i + head].__temp__)
            total.sub(sequence[i + tail].__temp__)

    for r in data:
        r["__temp__"] = Null  #CLEANUP
def groupby_size(data, size):
    """Lazily yield (group_index, rows) chunks of at most `size` rows each."""
    if hasattr(data, "next"):
        # already an iterator (Python 2 protocol)
        iterator = data
    elif hasattr(data, "__iter__"):
        iterator = data.__iter__()
    else:
        Log.error("do not know how to handle this type")

    done = []  # mutable flag shared with the closure below

    def more():
        # pull up to `size` items; flag exhaustion via `done`
        output = []
        for i in range(size):
            try:
                # NOTE: iterator.next() is the Python 2 protocol (next() in Py3)
                output.append(iterator.next())
            except StopIteration:
                done.append(True)
                break
        return output

    #THIS IS LAZY
    i = 0
    while True:
        output = more()
        # NOTE(review): when len(data) is an exact multiple of size, a final
        # empty chunk is yielded — confirm callers tolerate this
        yield (i, output)
        if len(done) > 0: break
        i += 1
def groupby_Multiset(data, min_size, max_size):
    # GROUP multiset BASED ON POPULATION OF EACH KEY, TRYING TO STAY IN min/max LIMITS
    if min_size == None: min_size = 0

    total = 0      # population of the group being assembled
    i = 0          # group index
    g = list()     # keys in the group being assembled
    for k, c in data.items():
        if total < min_size or total + c < max_size:
            # still room: add this key's whole population to the current group
            total += c
            g.append(k)
        elif total < max_size:
            # current group is full enough; emit it and start a new one with k
            yield (i, g)
            i += 1
            total = c
            g = [k]

        if total >= max_size:
            # a single key's population exceeds the allowed window
            Log.error("({{min}}, {{max}}) range is too strict given step of {{increment}}", {
                "min": min_size, "max": max_size, "increment": c
            })

    if g:
        # emit the final, possibly undersized, group
        yield (i, g)
def groupby_min_max_size(data, min_size=0, max_size=None):
    """Partition data into (group_index, rows) chunks of at most max_size
    rows (min_size only matters for Multiset input)."""
    if max_size == None:
        max_size = sys.maxint

    if hasattr(data, "__iter__"):
        # lazily emit fixed-size chunks
        def _chunks():
            group = 0
            buffered = []
            for i, item in enumerate(data):
                buffered.append(item)
                if (i + 1) % max_size == 0:
                    yield group, buffered
                    group += 1
                    buffered = []
            if buffered:
                yield group, buffered
        return _chunks()
    elif not isinstance(data, Multiset):
        return groupby_size(data, max_size)
    else:
        return groupby_Multiset(data, min_size, max_size)
class Cube():
    """Primitive datacube: rows plus edge (dimension) metadata.  Only the
    trivial row-index edge is generated; mostly a placeholder."""
    def __init__(self, data=None, edges=None, name=None):
        if isinstance(data, Cube): Log.error("do not know how to handle cubes yet")

        columns = get_columns(data)

        if edges == None:
            # no edges given: synthesize a numeric "index" edge over the rows
            self.edges = [{"name": "index", "domain": {"type": "numeric", "min": 0, "max": len(data), "interval": 1}}]
            self.data = data
            self.select = columns
            return

        self.name = name
        self.edges = edges
        self.select = Null

    def get_columns(self):
        # NOTE(review): self.columns is never assigned anywhere in this class
        # (only self.select is), so this likely raises AttributeError — confirm
        return self.columns
class Domain():
    """Interface for mapping domain partitions to keys, labels and values;
    concrete domains are expected to override these no-op methods."""
    def __init__(self):
        pass

    def part2key(self, part):
        # override: unique key for the partition
        pass

    def part2label(self, part):
        # override: human-readable label for the partition
        pass

    def part2value(self, part):
        # override: representative value for the partition
        pass
# SIMPLE TUPLE-OF-STRINGS LOOKUP TO LIST
class Index(object):
def __init__(self, keys):
self._data = {}
self._keys = keys
self.count = 0
#THIS ONLY DEPENDS ON THE len(keys), SO WE COULD SHARED lookup
#BETWEEN ALL n-key INDEXES. FOR NOW, JUST MAKE lookup()
code = "def lookup(d0):\n"
for i, k in enumerate(self._keys):
code = code + indent(expand_template(
"for k{{next}}, d{{next}} in d{{curr}}.items():\n", {
"next": i + 1,
"curr": i
}), prefix=" ", indent=i + 1)
i = len(self._keys)
code = code + indent(expand_template(
"yield d{{curr}}", {"curr": i}), prefix=" ", indent=i + 1)
exec code
self.lookup = lookup
def __getitem__(self, key):
try:
if not isinstance(key, dict):
#WE WILL BE FORGIVING IF THE KEY IS NOT IN A LIST
if len(self._keys) > 1:
Log.error("Must be given an array of keys")
key = {self._keys[0]: key}
d = self._data
for k in self._keys:
v = key[k]
if v == None:
Log.error("can not handle when {{key}} == None", {"key": k})
if v not in d:
return Null
d = d[v]
if len(key) != len(self._keys):
#NOT A COMPLETE INDEXING, SO RETURN THE PARTIAL INDEX
output = Index(self._keys[-len(key):])
output._data = d
return output
except Exception, e:
Log.error("something went wrong", e)
def __setitem__(self, key, value):
Log.error("Not implemented")
def add(self, val):
if not isinstance(val, dict): val = {(self._keys[0], val)}
d = self._data
for k in self._keys[0:-1]:
v = val[k]
if v == None:
Log.error("can not handle when {{key}} == None", {"key": k})
if v not in d:
e = {}
d[v] = e
d = d[v]
v = val[self._keys[-1]]
if v in d:
Log.error("key already filled")
d[v] = val
self.count += 1
def __contains__(self, key):
return self[key] != None
def __iter__(self):
return self.lookup(self._data)
def __sub__(self, other):
output = Index(self._keys)
for v in self:
if v not in other: output.add(v)
return output
def __and__(self, other):
output = Index(self._keys)
for v in self:
if v in other: output.add(v)
return output
def __or__(self, other):
output = Index(self._keys)
for v in self: output.add(v)
for v in other: output.add(v)
return output
def __len__(self):
return self.count
def subtract(self, other):
return self.__sub__(other)
def intersect(self, other):
return self.__and__(other)
def range(_min, _max=None, size=1):
    """
    RETURN (min, max) PAIRS OF GIVEN SIZE, WHICH COVER THE _min, _max RANGE
    THE LAST PAIR BE SMALLER
    """
    if _max == None:
        # single-argument form: cover [0, _min)
        _min, _max = 0, _min

    return ((lo, min(lo + size, _max)) for lo in __builtin__.range(_min, _max, size))

Просмотреть файл

@ -0,0 +1,4 @@
# encoding: utf-8
#
__author__ = 'klahnakoski'

Просмотреть файл

@ -0,0 +1,162 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from ..logs import Log
from ..maths import Math
from ..multiset import Multiset
from ..stats import Z_moment, stats2z_moment, z_moment2stats
class AggregationFunction(object):
    """Abstract streaming aggregate; subclasses must override all methods."""
    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")


    def add(self, value):
        """
        ADD value TO AGGREGATE
        """
        Log.error("not implemented yet")

    def merge(self, agg):
        """
        ADD TWO AGGREGATES TOGETHER
        """
        Log.error("not implemented yet")

    def end(self):
        """
        RETURN AGGREGATE
        """
        # intentionally returns None in the abstract base
class WindowFunction(AggregationFunction):
    """Aggregate that also supports removal of values, so it can be slid
    across a window without recomputation."""
    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")


    def sub(self, value):
        """
        REMOVE value FROM AGGREGATE
        """
        Log.error("not implemented yet")
class Stats(WindowFunction):
    """Windowed summary statistics, kept as additive zero-centered moments."""
    def __init__(self):
        object.__init__(self)
        self.total = Z_moment(0, 0, 0)

    def add(self, value):
        if value != None:
            self.total += stats2z_moment(value)

    def sub(self, value):
        if value != None:
            self.total -= stats2z_moment(value)

    def merge(self, agg):
        self.total += agg.total

    def end(self):
        return z_moment2stats(self.total)
class Min(WindowFunction):
    """Windowed minimum; a Multiset tracks duplicates so sub() stays exact."""
    def __init__(self):
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        if value != None:
            self.total.add(value)

    def sub(self, value):
        if value != None:
            self.total.remove(value)

    def end(self):
        return Math.min(self.total)
class Max(WindowFunction):
    """Windowed maximum; a Multiset tracks duplicates so sub() stays exact."""
    def __init__(self):
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        if value != None:
            self.total.add(value)

    def sub(self, value):
        if value != None:
            self.total.remove(value)

    def end(self):
        return Math.max(self.total)
class Count(WindowFunction):
    """Windowed count of non-None values."""
    def __init__(self):
        object.__init__(self)
        self.total = 0

    def add(self, value):
        if value != None:
            self.total += 1

    def sub(self, value):
        if value != None:
            self.total -= 1

    def end(self):
        return self.total
class Sum(WindowFunction):
    """Windowed sum of non-None values."""
    def __init__(self):
        object.__init__(self)
        self.total = 0

    def add(self, value):
        if value != None:
            self.total += value

    def sub(self, value):
        if value != None:
            self.total -= value

    def end(self):
        return self.total

31
tests/util/randoms.py Normal file
Просмотреть файл

@ -0,0 +1,31 @@
# encoding: utf-8
#
import random
import string
SIMPLE_ALPHABET=string.ascii_letters + string.digits
SEED=random.Random()


class Random(object):
    """Convenience generators for random strings, hex strings, ints and samples."""

    @staticmethod
    def string(length, alphabet=SIMPLE_ALPHABET):
        # draw `length` independent characters from the alphabet
        return ''.join(SEED.choice(alphabet) for _ in range(length))

    @staticmethod
    def hex(length):
        return Random.string(length, string.digits + 'ABCDEF')

    @staticmethod
    def int(*args):
        # note: uses the module-level generator, not SEED
        return random.randrange(*args)

    @staticmethod
    def sample(data, count):
        # sample WITH replacement
        size = len(data)
        return [data[Random.int(size)] for _ in range(count)]

75
tests/util/sql.py Normal file
Просмотреть файл

@ -0,0 +1,75 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from dzAlerts.util import struct
def find_holes(db, table_name, column_name, filter, _range):
    """
    FIND HOLES IN A DENSE COLUMN OF INTEGERS
    RETURNS A LIST OF {"min":min, "max":max} OBJECTS
    (each pair is a half-open hole [min, max) of values absent from the column)
    """
    _range = struct.wrap(_range)
    params = {
        "min": _range.min,
        "max": _range.max - 1,
        "column_name": db.quote_column(column_name),
        "table_name": db.quote_column(table_name),
        "filter": db.esfilter2sqlwhere(filter)
    }

    # observed extremes of the column within the requested range
    min_max=db.query("""
		SELECT
			min({{column_name}}) `min`,
			max({{column_name}})+1 `max`
		FROM
			{{table_name}} a
		WHERE
			a.{{column_name}} BETWEEN {{min}} AND {{max}} AND
			{{filter}}
	""", params)[0]

    db.execute("SET @last={{min}}-1", {"min": _range.min})
    # @last tracks the previous value while scanning in order; any step with
    # diff > 1 marks the boundaries of a hole
    ranges = db.query("""
		SELECT
			prev_rev+1 `min`,
			curr_rev `max`
		FROM (
			SELECT
				a.{{column_name}}-@last diff,
				@last prev_rev,
				@last:=a.{{column_name}} curr_rev
			FROM
				{{table_name}} a
			WHERE
				a.{{column_name}} BETWEEN {{min}} AND {{max}} AND
				{{filter}}
			ORDER BY
				a.{{column_name}}
		) a
		WHERE
			diff>1
	""", params)

    if ranges:
        # add the tail hole after the last observed value
        ranges.append({"min": min_max.max, "max": _range.max})
    else:
        if min_max.min:
            # no interior holes: head and tail gaps are the only holes
            ranges.append({"min": _range.min, "max": min_max.min})
            ranges.append({"min": min_max.max, "max": _range.max})
        else:
            # column empty in this range: the whole range is one hole
            ranges.append(_range)

    return ranges

77
tests/util/startup.py Normal file
Просмотреть файл

@ -0,0 +1,77 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import argparse
import struct
from .struct import listwrap
from .cnv import CNV
from .logs import Log
from .files import File
#PARAMETERS MATCH argparse.ArgumentParser.add_argument()
#http://docs.python.org/dev/library/argparse.html#the-add-argument-method
#name or flags - Either a name or a list of option strings, e.g. foo or -f, --foo.
#action - The basic type of action to be taken when this argument is encountered at the command line.
#nargs - The number of command-line arguments that should be consumed.
#const - A constant value required by some action and nargs selections.
#default - The value produced if the argument is absent from the command line.
#type - The type to which the command-line argument should be converted.
#choices - A container of the allowable values for the argument.
#required - Whether or not the command-line option may be omitted (optionals only).
#help - A brief description of what the argument does.
#metavar - A name for the argument in usage messages.
#dest - The name of the attribute to be added to the object returned by parse_args().
def _argparse(defs):
    """Build an argparse parser from a list of Struct argument definitions
    and return the parsed namespace wrapped as a Struct."""
    parser = argparse.ArgumentParser()
    for d in listwrap(defs):
        args = d.copy()
        name = args.name
        args.name = None
        # the name(s) become positional flags; the rest are keyword options
        parser.add_argument(*listwrap(name).list, **args.dict)
    namespace=parser.parse_args()
    output={k: getattr(namespace, k) for k in vars(namespace)}
    # NOTE(review): `import struct` at the top of this file binds the STDLIB
    # struct module, which has no wrap(); this probably should import the
    # project's struct module — confirm
    return struct.wrap(output)
def read_settings(filename=None, defs=None):
    """
    Load a JSON settings file — given explicitly, or located via the
    --settings command-line option — and attach parsed args at settings.args.
    """
    # READ SETTINGS
    if filename:
        settings_file = File(filename)
        if not settings_file.exists:
            # BUGFIX: the template placeholder was corrupted; use the
            # {{filename}} parameter actually supplied below ("find", not "file")
            Log.error("Can not find settings file {{filename}}", {
                "filename": settings_file.abspath
            })
        json = settings_file.read()
        settings = CNV.JSON2object(json, flexible=True)
        if defs:
            settings.args = _argparse(defs)
        return settings
    else:
        # no filename given: add a --settings option and parse the command line
        defs = listwrap(defs)
        defs.append({
            "name": ["--settings", "--settings-file", "--settings_file"],
            "help": "path to JSON file with settings",
            "type": str,
            "dest": "filename",
            "default": "./settings.json",
            "required": False
        })
        args = _argparse(defs)
        settings_file = File(args.filename)
        if not settings_file.exists:
            Log.error("Can not find settings file {{filename}}", {
                "filename": settings_file.abspath
            })
        json = settings_file.read()
        settings = CNV.JSON2object(json, flexible=True)
        settings.args = args
        return settings

170
tests/util/stats.py Normal file
Просмотреть файл

@ -0,0 +1,170 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from math import sqrt
from .cnv import CNV
from .struct import nvl
from .logs import Log
DEBUG=True
EPSILON=0.000001
def stats2z_moment(stats):
    # MODIFIED FROM http://statsmodels.sourceforge.net/devel/_modules/statsmodels/stats/moment_helpers.html
    # ADDED count
    # FIXED ERROR IN COEFFICIENTS
    # Converts central-moment statistics into zero-centered (raw) moment sums,
    # which are additive and therefore mergeable across samples.
    mc0, mc1, mc2, skew, kurt = (stats.count, stats.mean, stats.variance, stats.skew, stats.kurtosis)

    mz0 = mc0
    mz1 = mc1 * mc0
    mz2 = (mc2 + mc1*mc1)*mc0
    mc3 = skew*(mc2**1.5) # 3rd central moment
    mz3 = (mc3 + 3*mc1*mc2 - mc1**3)*mc0 # 3rd non-central moment
    mc4 = (kurt+3.0)*(mc2**2.0) # 4th central moment
    mz4 = (mc4 + 4*mc1*mc3 + 6*mc1*mc1*mc2 + mc1**4) * mc0

    m=Z_moment(stats.count, mz1, mz2, mz3, mz4)
    if DEBUG:
        # round-trip sanity check: moments must convert back to the same stats
        v = z_moment2stats(m, unbiased=False)
        if not closeEnough(v.count, stats.count): Log.error("convertion error")
        if not closeEnough(v.mean, stats.mean): Log.error("convertion error")
        if not closeEnough(v.variance, stats.variance):
            Log.error("convertion error")

    return m
def closeEnough(a, b):
    """True when a and b differ by no more than EPSILON, scaled to magnitude."""
    return abs(a - b) <= EPSILON * (abs(a) + abs(b) + 1)
def z_moment2stats(z_moment, unbiased=True):
    # convert zero-centered moment sums back to count/mean/variance;
    # `unbiased` applies Bessel's correction (divide by N-1)
    free=0
    if unbiased: free=1

    N=z_moment.S[0]
    if N==0: return Stats()

    return Stats(
        count=N,
        mean=z_moment.S[1] / N if N > 0 else float('nan'),
        variance=(z_moment.S[2] - (z_moment.S[1] ** 2) / N) / (N - free) if N - free > 0 else float('nan'),
        # NOTE(review): skew/kurtosis sums (S[3], S[4]) are never converted
        # back, even though stats2z_moment produces them — confirm intended
        unbiased=unbiased
    )
class Stats(object):
    """Summary statistics: count, mean, variance, skew, kurtosis.

    Construction is cumulative: each statistic is honoured only when every
    preceding one was also supplied; anything after the first missing
    statistic defaults to zero.  Accepts std in place of variance.
    """
    def __init__(self, **args):
        # defaults; overridden below as far as the supplied args allow
        self.count = 0
        self.mean = 0
        self.variance = 0
        self.skew = 0
        self.kurtosis = 0

        if "count" in args:
            self.count = args["count"]
            if "mean" in args:
                self.mean = args["mean"]
                if "variance" in args or "std" in args:
                    self.variance = args["variance"] if "variance" in args else args["std"] ** 2
                    if "skew" in args:
                        self.skew = args["skew"]
                        if "kurtosis" in args:
                            self.kurtosis = args["kurtosis"]

        # unbiased wins over biased; default is biased (False)
        if "unbiased" in args:
            self.unbiased = args["unbiased"]
        elif "biased" in args:
            self.unbiased = not args["biased"]
        else:
            self.unbiased = False

    @property
    def std(self):
        return sqrt(self.variance)
class Z_moment(object):
    """
    ZERO-CENTERED MOMENTS
    Holds S = (count, sum(x), sum(x^2), sum(x^3), sum(x^4)); these sums are
    additive, so moments of a union are element-wise sums.
    """
    def __init__(self, *args):
        self.S=tuple(args)

    def __add__(self, other):
        # element-wise, None-safe (module-level add() treats None as 0)
        return Z_moment(*map(add, self.S, other.S))

    def __sub__(self, other):
        return Z_moment(*map(sub, self.S, other.S))

    @property
    def tuple(self):
        #RETURN AS ORDERED TUPLE
        return self.S

    @property
    def dict(self):
        #RETURN HASH OF SUMS
        return {"s"+unicode(i): m for i, m in enumerate(self.S)}


    @staticmethod
    def new_instance(values=None):
        # build moment sums directly from raw values, ignoring Nones
        if values == None: return Z_moment()

        values=[float(v) for v in values if v != None]
        return Z_moment(
            len(values),
            sum([n for n in values]),
            sum([pow(n, 2) for n in values]),
            sum([pow(n, 3) for n in values]),
            sum([pow(n, 4) for n in values])
        )
def add(a, b):
    # None-safe addition: None counts as 0
    return nvl(a, 0) + nvl(b, 0)
def sub(a, b):
    # None-safe subtraction: None counts as 0
    return nvl(a, 0) - nvl(b, 0)
def z_moment2dict(z):
    #RETURN HASH OF SUMS
    return {"s" + unicode(i): m for i, m in enumerate(z.S)}

# expose as a converter on CNV, alongside the other CNV.*2* helpers
setattr(CNV, "z_moment2dict", staticmethod(z_moment2dict))

129
tests/util/strings.py Normal file
Просмотреть файл

@ -0,0 +1,129 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import re
from .jsons import json_encoder
import struct
from .struct import Struct
def indent(value, prefix=u"\t", indent=None):
    """Prefix every line of value; `indent` repeats the prefix that many
    times.  Trailing whitespace after the last line is preserved."""
    if indent != None:
        prefix = prefix * indent
    try:
        content = value.rstrip()
        suffix = value[len(content):]
        lines = content.splitlines()
        return prefix + (u"\n" + prefix).join(lines) + suffix
    # BUGFIX/compat: `except Exception, e` is Python-2-only syntax; the
    # `as` form is valid on Python 2.6+ and Python 3
    except Exception as e:
        raise Exception(u"Problem with indent of value ("+e.message+u")\n"+unicode(value))
def outdent(value):
    """Remove the common leading whitespace (up to 100 chars) from every
    non-blank line of value."""
    try:
        num = 100
        lines = value.splitlines()
        for l in lines:
            trim = len(l.lstrip())
            # blank lines do not constrain the common indent
            if trim > 0: num = min(num, len(l) - len(l.lstrip()))
        return u"\n".join([l[num:] for l in lines])
    # BUGFIX/compat: `except Exception, e` is Python-2-only syntax; the
    # `as` form is valid on Python 2.6+ and Python 3
    except Exception as e:
        from .logs import Log
        Log.error("can not outdent value", e)
def between(value, prefix, suffix):
    """Return the substring of value found between prefix and suffix,
    or None when either marker is absent."""
    start = value.find(prefix)
    if start == -1: return None
    start += len(prefix)

    end = value.find(suffix, start)
    if end == -1:
        return None

    #WE KNOW THIS EXISTS, BUT THERE MAY BE A RIGHT-MORE ONE
    start = value.rfind(prefix, 0, end) + len(prefix)
    return value[start:end]
def right(value, len):
    """Return the last `len` characters of value (empty string when len <= 0)."""
    # NOTE: parameter deliberately keeps its original name `len` (shadows the
    # builtin) because callers may pass it by keyword
    return u"" if len <= 0 else value[-len:]
def find_first(value, find_arr, start=0):
    """Return the smallest index (>= start) where any needle in find_arr
    occurs in value, or -1 when none occur."""
    best = len(value)
    for needle in find_arr:
        pos = value.find(needle, start)
        if pos == -1: continue
        best = min(best, pos)
    return -1 if best == len(value) else best
# matches {{variable}} or {{variable|filter1|filter2}}
pattern=re.compile(r"\{\{([\w_\.]+(\|[\w_]+)*)\}\}")


def expand_template(template, values):
    # replace each {{name|filters}} placeholder with values[name], passed
    # through the named filter functions left-to-right
    values=struct.wrap(values)

    def replacer(found):
        seq=found.group(1).split("|")

        var=seq[0]
        try:
            val=values[var]
            val=toString(val)
            for filter in seq[1:]:
                # NOTE(review): filter names are resolved via eval(); only
                # safe while templates come from trusted code
                val=eval(filter+"(val)")
            return val
        except Exception, e:
            try:
                # some values fail JSON encoding on the first attempt;
                # retry the plain string conversion
                if e.message.find(u"is not JSON serializable"):
                    #WORK HARDER
                    val=toString(val)
                    return val
            except Exception:
                raise Exception(u"Can not expand "+"|".join(seq)+u" in template:\n"+indent(template), e)

    return pattern.sub(replacer, template)
def toString(val):
    """Render val as text; Structs and plain containers become pretty JSON."""
    if isinstance(val, Struct):
        return json_encoder.encode(val.dict, pretty=True)
    if isinstance(val, (dict, list, set)):
        return json_encoder.encode(val, pretty=True)
    return unicode(val)
def edit_distance(s1, s2):
    """
    Normalized Levenshtein distance: edit count divided by the length of the
    longer string, in [0, 1].  (Note: two empty strings return 1.0, not 0.0 —
    preserved from the original implementation.)
    FROM http://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
    LICENCE http://creativecommons.org/licenses/by-sa/3.0/
    """
    if len(s1) < len(s2):
        return edit_distance(s2, s1)

    # len(s1) >= len(s2)
    if len(s2) == 0:
        return 1.0

    # BUGFIX/compat: xrange does not exist in Python 3; range behaves
    # identically here (fixed length, indexed) on both versions
    previous_row = range(len(s2) + 1)
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1 # j+1 instead of j since previous_row and current_row are one character longer
            deletions = current_row[j] + 1       # than s2
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return float(previous_row[-1])/len(s1)

430
tests/util/struct.py Normal file
Просмотреть файл

@ -0,0 +1,430 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
SPECIAL = ["keys", "values", "items", "iteritems", "dict", "copy"]
class Struct(dict):
"""
Struct is an anonymous class with some properties good for manipulating JSON
0) a.b==a["b"]
1) the IDE does tab completion, so my spelling mistakes get found at "compile time"
2) it deals with missing keys gracefully, so I can put it into set operations (database operations) without choking
2b) missing keys is important when dealing with JSON, which is often almost anything
3) also, which I hardly use, is storing JSON paths in a variable, so : a["b.c"]==a.b.c
MORE ON MISSING VALUES: http://www.numpy.org/NA-overview.html
IT ONLY CONSIDERS THE LEGITIMATE-FIELD-WITH-MISSING-VALUE (Statistical Null)
AND DOES NOT LOOK AT FIELD-DOES-NOT-EXIST-IN-THIS-CONTEXT (Database Null)
This is a common pattern in many frameworks (I am still working on this list)
jinja2.environment.Environment.getattr()
argparse.Environment() - code performs setattr(e, name, value) on instances of Environment
"""
def __init__(self, **map):
"""
THIS WILL MAKE A COPY, WHICH IS UNLIKELY TO BE USEFUL
USE struct.wrap() INSTEAD
"""
dict.__init__(self)
object.__setattr__(self, "__dict__", map) #map IS A COPY OF THE PARAMETERS
def __bool__(self):
return True
def __nonzero__(self):
return True
def __str__(self):
return dict.__str__(object.__getattribute__(self, "__dict__"))
def __getitem__(self, key):
if not isinstance(key, str):
key = key.encode("utf-8")
d = object.__getattribute__(self, "__dict__")
if key.find(".") >= 0:
key = key.replace("\.", "\a")
seq = [k.replace("\a", ".") for k in key.split(".")]
for n in seq:
d = getdefault(d, n)
return wrap(d)
return wrap(getdefault(d, key))
def __setattr__(self, key, value):
Struct.__setitem__(self, key, value)
def __setitem__(self, key, value):
if not isinstance(key, str):
key = key.encode("utf-8")
try:
d = object.__getattribute__(self, "__dict__")
value = unwrap(value)
if key.find(".") == -1:
if value is None:
d.pop(key, None)
else:
d[key] = value
return self
key = key.replace("\.", "\a")
seq = [k.replace("\a", ".") for k in key.split(".")]
for k in seq[:-1]:
d = getdefault(d, k)
if value == None:
d.pop(seq[-1], None)
else:
d[seq[-1]] = value
return self
except Exception, e:
raise e
def __getattribute__(self, key):
if not isinstance(key, str):
key = key.encode("utf-8")
d = object.__getattribute__(self, "__dict__")
if key not in SPECIAL:
return wrap(getdefault(d, key))
#SOME dict FUNCTIONS
if key == "items":
def temp():
_is = dict.__getattribute__(d, "items")
return [(k, wrap(v)) for k, v in _is()]
return temp
if key == "iteritems":
#LOW LEVEL ITERATION
return d.iteritems
if key == "keys":
def temp():
k = dict.__getattribute__(d, "keys")
return set(k())
return temp
if key == "values":
def temp():
vs = dict.__getattribute__(d, "values")
return [wrap(v) for v in vs()]
return temp
if key == "dict":
return d
if key == "copy":
o = wrap({k: v for k, v in d.items()})
def output():
return o
return output
def __delitem__(self, key):
if not isinstance(key, str):
key = key.encode("utf-8")
d = object.__getattribute__(self, "__dict__")
if key.find(".") == -1:
d.pop(key, None)
key = key.replace("\.", "\a")
seq = [k.replace("\a", ".") for k in key.split(".")]
for k in seq[:-1]:
d = d[k]
d.pop(seq[-1], None)
def keys(self):
    """Return the top-level key collection of the backing storage dict."""
    # go straight to the storage dict so the overridden __getattribute__
    # on this class is bypassed
    return object.__getattribute__(self, "__dict__").keys()
# KEEP TRACK OF WHAT ATTRIBUTES ARE REQUESTED, MAYBE SOME (BUILTIN) ARE STILL USEFUL
# NOTE(review): nothing in the visible code writes to this set -- it appears
# to be leftover instrumentation for auditing attribute usage; confirm before
# removing.
requested = set()
def setdefault(obj, key, value):
    """
    DO NOT USE __dict__.setdefault(obj, key, value), IT DOES NOT CHECK FOR obj[key] == None

    Like dict.setdefault(), except an existing value that compares equal to
    None (plain None, or Null) is treated as missing and overwritten.
    Returns the value now stored under key.
    """
    existing = obj.get(key, None)
    if existing != None:  # == None (not "is"): Null compares equal to None too
        return existing
    obj[key] = value
    return value
def getdefault(obj, key):
    """
    Fetch key from obj, unwrapped.  A missing (or None/Null) value yields a
    NullStruct that remembers (obj, key), so a later assignment through it
    can materialize the path.
    """
    found = obj.get(key, None)
    if found != None:  # == None (not "is"): Null compares equal to None too
        return unwrap(found)
    return NullStruct(obj, key)
def _assign(null, key, value, force=True):
    """
    value IS ONLY ASSIGNED IF self.obj[self.path][key] DOES NOT EXIST

    Walks the NullStruct's remembered (obj, path) chain, materializing the
    missing intermediate dicts, then stores value under key.  With force=True
    the value always overwrites; with force=False (used for intermediates and
    in the docstring's sense above) an existing value wins.  Returns the
    value now stored.
    """
    d = object.__getattribute__(null, "__dict__")  # bypass NullStruct's __getattribute__
    o = d["obj"]
    if isinstance(o, NullStruct):
        # parent container is itself unmaterialized: recurse to create it first
        o = _assign(o, d["path"], {}, False)
    else:
        o = setdefault(o, d["path"], {})
    if force:
        o[key] = value
    else:
        value = setdefault(o, key, value)
    return value
class NullStruct(object):
    """
    Structural Null provides closure under the dot (.) operator
    Null[x] == Null
    Null.x == Null

    An instance remembers the (obj, path) it was reached from, so assigning
    through a NullStruct materializes the missing containers (see _assign).
    Null compares equal to None, is falsy, has length 0, and iterates empty.
    """
    def __init__(self, obj=None, path=None):
        d = object.__getattribute__(self, "__dict__")  # bypass our own __getattribute__
        d["obj"] = obj    # container this null was looked up in (or None)
        d["path"] = path  # key that was missing in obj
    def __bool__(self):
        # Python 3 truthiness hook: Null is falsy
        return False
    def __nonzero__(self):
        # Python 2 truthiness hook: Null is falsy
        return False
    # Null never orders before or after anything: all comparisons are False
    def __gt__(self, other):
        return False
    def __ge__(self, other):
        return False
    def __le__(self, other):
        return False
    def __lt__(self, other):
        return False
    def __eq__(self, other):
        # Null == None and Null == Null are both True (this is why the module
        # tests "== None" rather than "is None")
        return other is None or isinstance(other, NullStruct)
    def __ne__(self, other):
        return other is not None and not isinstance(other, NullStruct)
    def __getitem__(self, key):
        # closure under indexing: the result remembers where it came from
        return NullStruct(self, key)
    def __len__(self):
        return 0
    def __iter__(self):
        # iterating Null yields nothing
        return ZeroList.__iter__()
    def __getattribute__(self, key):
        # attribute access is closed too: Null.anything is another NullStruct,
        # except the dict-emulation names in SPECIAL
        if key not in SPECIAL:
            return NullStruct(self, key)
        #SOME dict FUNCTIONS
        if key == "items":
            def temp():
                return ZeroList
            return temp
        if key == "iteritems":
            #LOW LEVEL ITERATION
            # NOTE(review): returns an ITERATOR, while Struct returns the
            # bound method here; a caller doing x.iteritems() would try to
            # call the iterator -- confirm intended usage.
            return self.__iter__()
        if key == "keys":
            def temp():
                return ZeroList
            return temp
        if key == "values":
            def temp():
                return ZeroList
            return temp
        if key == "dict":
            return Null
        if key == "copy":
            #THE INTENT IS USUALLY PREPARE FOR UPDATES
            def output():
                return Struct()
            return output
    def __setattr__(self, key, value):
        # attribute assignment aliases item assignment, as on Struct
        NullStruct.__setitem__(self, key, value)
    def __setitem__(self, key, value):
        # assignment through Null materializes the missing containers back up
        # the (obj, path) chain, then stores value (dot-paths supported)
        try:
            value = unwrap(value)
            if key.find(".") == -1:
                _assign(self, key, value)
                return self
            key = key.replace("\.", "\a")
            seq = [k.replace("\a", ".") for k in key.split(".")]
            d = _assign(self, seq[0], {}, False)  # keep existing intermediate if present
            for k in seq[1:-1]:
                o = {}
                d[k] = o
                d = o
            d[seq[-1]] = value
            return self
        except Exception, e:
            raise e
    def keys(self):
        return set()
    def pop(self, key, default=None):
        return None
    def __str__(self):
        return "None"
# Module-level singleton used throughout as the structural null value.
Null = NullStruct()
# Shared empty list that Null's collection methods return/iterate; callers
# are expected not to mutate it -- TODO confirm.
ZeroList = []
class StructList(list):
    # Subclasses list mainly so isinstance(x, list) holds; the actual
    # elements live in self.list (the caller's list, shared, not copied).
    # Items are wrap()ed on the way out and unwrap()ed on the way in.
    def __init__(self, vals=None):
        """ USE THE vals, NOT A COPY """
        list.__init__(self)
        if vals == None:  # == None: also treats Null as "no list given"
            self.list = []
        elif isinstance(vals, StructList):
            self.list = vals.list  # share the other wrapper's backing list
        else:
            self.list = vals  # share the caller's list: mutations visible both ways
    def __getitem__(self, index):
        # out-of-range (and ANY negative) index returns Null instead of
        # raising IndexError or wrapping around like a plain list
        if index < 0 or len(self.list) <= index:
            return Null
        return wrap(self.list[index])
    def __setitem__(self, i, y):
        self.list[i] = unwrap(y)
    def __iter__(self):
        return (wrap(v) for v in self.list)
    def append(self, val):
        self.list.append(unwrap(val))
        return self  # fluent: allows chaining
    def __str__(self):
        return self.list.__str__()
    def __len__(self):
        return self.list.__len__()
    def __getslice__(self, i, j):
        # Python 2-only slicing hook; returns a wrapped copy of the slice
        return wrap(self.list[i:j])
    def remove(self, x):
        self.list.remove(x)
        return self
    def extend(self, values):
        for v in values:
            self.list.append(unwrap(v))
        return self
    def pop(self):
        return self.list.pop()
def wrap(v):
    """
    Convert a plain value to its struct-module wrapper:
        None -> Null
        dict -> Struct (SHARING the same dict, no copy)
        list -> StructList (sharing)
    Already-wrapped values and scalars pass through unchanged.
    """
    if v is None:
        return Null
    if isinstance(v, (Struct, NullStruct, StructList)):
        return v  # already wrapped
    if isinstance(v, dict):
        wrapped = Struct()
        # share v as the Struct's backing storage so mutations are visible
        # through both the wrapper and the original dict
        object.__setattr__(wrapped, "__dict__", v)
        return wrapped
    if isinstance(v, list):
        return StructList(v)
    return v
def unwrap(v):
    """
    Inverse of wrap(): expose the raw storage behind a wrapper.
        Struct     -> its backing dict
        StructList -> its backing list
        Null/None  -> None
    Everything else passes through unchanged.
    """
    if isinstance(v, Struct):
        return object.__getattribute__(v, "__dict__")
    if isinstance(v, StructList):
        return v.list
    if v == None:  # == None (not "is"): also converts Null to plain None
        return None
    return v
def inverse(d):
    """
    reverse the k:v pairs

    Each distinct value becomes a key whose entry is the list of every
    original key that mapped to it.
    """
    result = {}
    for key, val in unwrap(d).iteritems():
        result.setdefault(val, []).append(key)
    return result
def nvl(*args):
    """Return the first argument that is not None (nor Null); Null if all are."""
    for candidate in args:
        if candidate != None:  # != None (not "is not"): Null compares equal to None
            return candidate
    return Null
def listwrap(value):
    """
    Coerce a parameter to a (wrapped) list so call sites can always iterate:

        Null/None -> []
        [...]     -> [...]   (same list, wrapped)
        value     -> [value]

    Replaces the usual boilerplate

        if a is not None:
            if not isinstance(a, list):
                a = [a]
            for x in a:
                ...

    with a single "for x in listwrap(a):" loop.
    """
    if value == None:  # == None (not "is"): Null counts as empty too
        return []
    if isinstance(value, list):
        return wrap(value)
    return wrap([value])

382
tests/util/threads.py Normal file
Просмотреть файл

@ -0,0 +1,382 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import threading
import thread
import time
from .struct import nvl
DEBUG = True
class Lock(object):
    """
    SIMPLE LOCK (ACTUALLY, A PYTHON threading.Condition() WITH notify() BEFORE EVERY RELEASE)

    Use as a context manager; wait() and notify_all() must only be called
    while the lock is held (inside the "with" block).
    """

    def __init__(self, name=""):
        self.monitor = threading.Condition()
        self.name = name  # used only for diagnostics

    def __enter__(self):
        self.monitor.acquire()
        return self

    def __exit__(self, a, b, c):
        # wake one waiter before releasing so state changed inside the
        # "with" block is noticed promptly
        self.monitor.notify()
        self.monitor.release()

    def wait(self, timeout=None, till=None):
        """
        Block until notified, for at most `timeout` seconds, or until the
        absolute UTC datetime `till`.  Returns immediately if `till` is
        already in the past.  With neither given, waits indefinitely.
        """
        if till:
            # FIX: was (datetime.utcnow() - till), which is NEGATIVE for any
            # future deadline, so every deadline-based wait returned
            # immediately (breaking Thread.join(timeout=...)).
            timeout = (till - datetime.utcnow()).total_seconds()
            if timeout < 0:
                return
        self.monitor.wait(timeout=timeout)

    def notify_all(self):
        self.monitor.notify_all()
class Queue(object):
    """
    SIMPLE MESSAGE QUEUE, multiprocessing.Queue REQUIRES SERIALIZATION, WHICH IS HARD TO USE JUST BETWEEN THREADS

    Shutdown is signalled either by close() or by pushing the Thread.STOP
    sentinel through the queue.
    """
    def __init__(self, max=None):
        """
        max - LIMIT THE NUMBER IN THE QUEUE, IF TOO MANY add() AND extend() WILL BLOCK
        """
        self.max = nvl(max, 2**30)  # effectively unbounded when no max given
        self.keep_running = True
        self.lock = Lock("lock for queue")
        self.queue = []
    def __iter__(self):
        # blocking iteration: yields items until Thread.STOP (or close())
        while self.keep_running:
            try:
                value = self.pop()
                if value != Thread.STOP:
                    yield value
            except Exception, e:
                from .logs import Log
                Log.warning("Tell me about what happened here", e)
    def add(self, value):
        with self.lock:
            if self.keep_running:
                self.queue.append(value)
            # back-pressure: block the producer while over max
            while self.keep_running and len(self.queue) > self.max:
                self.lock.wait()
        return self
    def extend(self, values):
        with self.lock:
            if self.keep_running:
                self.queue.extend(values)
            # back-pressure, as in add()
            while self.keep_running and len(self.queue) > self.max:
                self.lock.wait()
    def __len__(self):
        with self.lock:
            return len(self.queue)
    def pop(self):
        # block until an item arrives; returns Thread.STOP once shut down
        with self.lock:
            while self.keep_running:
                if self.queue:
                    value = self.queue.pop(0)
                    if value == Thread.STOP:  #SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
                        self.keep_running = False
                    return value
                self.lock.wait()
            return Thread.STOP
    def pop_all(self):
        """
        NON-BLOCKING POP ALL IN QUEUE, IF ANY
        """
        with self.lock:
            if not self.keep_running:
                return [Thread.STOP]
            if not self.queue:
                return []
            for v in self.queue:
                if v == Thread.STOP:  #SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
                    self.keep_running = False
            # NOTE: the returned batch may still contain the STOP sentinel
            output = list(self.queue)
            del self.queue[:]  #CLEAR
            return output
    def close(self):
        with self.lock:
            self.keep_running = False
class AllThread(object):
    """
    RUN ALL ADDED FUNCTIONS IN PARALLEL, BE SURE TO HAVE JOINED BEFORE EXIT
    """
    def __init__(self):
        self.threads = []
    def __enter__(self):
        return self
    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, type, value, traceback):
        self.join()
    def join(self):
        # join every child, collecting their exceptions; raise one combined
        # error at the end if any child failed
        exceptions = []
        try:
            for t in self.threads:
                response = t.join()
                if "exception" in response:
                    exceptions.append(response["exception"])
        except Exception, e:
            from .logs import Log
            Log.warning("Problem joining", e)
        if exceptions:
            from .logs import Log
            Log.error("Problem in child threads", exceptions)
    def add(self, target, *args, **kwargs):
        """
        target IS THE FUNCTION TO EXECUTE IN THE THREAD
        """
        t = Thread.run(target.__name__, target, *args, **kwargs)
        self.threads.append(t)
class Thread(object):
    """
    join() ENHANCED TO ALLOW CAPTURE OF CTRL-C, AND RETURN POSSIBLE THREAD EXCEPTIONS
    run() ENHANCED TO CAPTURE EXCEPTIONS

    The target callable must accept a `please_stop` keyword argument: a
    Signal the owner fires to request cooperative shutdown.
    """

    num_threads = 0
    STOP = "stop"        # sentinel pushed through Queues to shut them down
    TIMEOUT = "TIMEOUT"  # Except type raised when join() times out

    def __init__(self, name, target, *args, **kwargs):
        self.name = name
        self.target = target
        self.response = None  # {"response": ...} or {"exception": ...} once done
        self.synch_lock = Lock()
        self.args = args
        #ENSURE THERE IS A SHARED please_stop SIGNAL
        self.kwargs = kwargs.copy()
        self.kwargs["please_stop"] = self.kwargs.get("please_stop", Signal())
        self.please_stop = self.kwargs["please_stop"]
        self.stopped = Signal()  # fired by _run() when the thread finishes

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # FIX: was isinstance(type, BaseException); `type` is the exception
        # CLASS (or None), so that test was always False and an exceptional
        # exit never signalled the thread to stop.  Test the instance, as
        # ThreadedQueue.__exit__ does.
        if isinstance(value, BaseException):
            self.please_stop.go()
        # TODO: AFTER A WHILE START KILLING THREAD
        self.join()
        self.args = None
        self.kwargs = None

    def start(self):
        try:
            self.thread = thread.start_new_thread(Thread._run, (self, ))
        except Exception as e:
            from .logs import Log
            Log.error("Can not start thread", e)

    def stop(self):
        self.please_stop.go()

    def _run(self):
        # thread entry point: run target, capture its result or exception,
        # and ALWAYS fire the stopped signal
        try:
            if self.target is not None:
                response = self.target(*self.args, **self.kwargs)
                with self.synch_lock:
                    self.response = {"response": response}
        except Exception as e:
            with self.synch_lock:
                self.response = {"exception": e}
            from .logs import Log
            Log.error("Problem in thread", e)
        finally:
            self.stopped.go()
            del self.target, self.args, self.kwargs

    def is_alive(self):
        return not self.stopped

    def join(self, timeout=None, till=None):
        """
        RETURN THE RESULT OF THE THREAD EXECUTION (INCLUDING EXCEPTION)

        With a timeout/deadline, raises Except(type=Thread.TIMEOUT) if the
        thread has not stopped in time.
        """
        if not till and timeout:
            till = datetime.utcnow() + timedelta(seconds=timeout)
        if till is None:
            # wait forever, logging periodically so a hung join is visible
            while True:
                with self.synch_lock:
                    for i in range(10):
                        if self.stopped:
                            return self.response
                        self.synch_lock.wait(0.5)
                from .logs import Log
                if DEBUG:
                    Log.note("Waiting on thread {{thread}}", {"thread": self.name})
        else:
            self.stopped.wait_for_go(till=till)
            if self.stopped:
                return self.response
            else:
                # CONSISTENCY FIX: use the package-relative import like the
                # rest of this module (was "from logs import Except")
                from .logs import Except
                raise Except(type=Thread.TIMEOUT)

    @staticmethod
    def run(name, target, *args, **kwargs):
        #ENSURE target HAS please_stop ARGUMENT
        if "please_stop" not in target.__code__.co_varnames:
            from .logs import Log
            Log.error("function must have please_stop argument for signalling emergency shutdown")
        Thread.num_threads += 1
        output = Thread(name, target, *args, **kwargs)
        output.start()
        return output

    @staticmethod
    def sleep(seconds=None, till=None):
        # sleep a relative duration, then (optionally) until an absolute UTC time
        if seconds is not None:
            time.sleep(seconds)
        if till is not None:
            duration = (till - datetime.utcnow()).total_seconds()
            if duration > 0:
                time.sleep(duration)
class Signal(object):
    """
    SINGLE-USE THREAD SAFE SIGNAL

    Starts un-fired; go() fires it exactly once, wakes all waiters, and runs
    every callback registered via on_go().  Once fired it stays fired.
    """

    def __init__(self):
        self.lock = Lock()
        self._go = False     # latched to True by go(); never reset
        self.job_queue = []  # callbacks to run when fired

    def __bool__(self):
        # Python 3 truthiness hook: truthy once fired
        with self.lock:
            return self._go

    def __nonzero__(self):
        # Python 2 truthiness hook
        with self.lock:
            return self._go

    def wait_for_go(self, timeout=None, till=None):
        """
        Block until fired; with `till` given, give up once the deadline
        passes.  Returns True if fired, False on deadline expiry.
        """
        with self.lock:
            while not self._go:
                # FIX: the loop previously had no deadline check, so once
                # `till` passed (Lock.wait returning immediately) it could
                # never exit un-fired -- yet Thread.join() relies on exactly
                # that to raise its TIMEOUT error.
                if till and datetime.utcnow() > till:
                    return False
                self.lock.wait(timeout=timeout, till=till)
            return True

    def go(self):
        # fire the signal: idempotent; wakes waiters, then runs callbacks
        with self.lock:
            if self._go:
                return
            self._go = True
            jobs = self.job_queue
            self.job_queue = []
            self.lock.notify_all()
        # run callbacks outside the lock so one that re-enters this Signal
        # cannot deadlock
        for j in jobs:
            j()

    def is_go(self):
        with self.lock:
            return self._go

    def on_go(self, target):
        """
        RUN target WHEN SIGNALED
        """
        with self.lock:
            if self._go:
                target()  # already fired: run immediately
            else:
                self.job_queue.append(target)
class ThreadedQueue(Queue):
    """
    TODO: Check that this queue is not dropping items at shutdown
    DISPATCH TO ANOTHER (SLOWER) queue IN BATCHES OF GIVEN size
    """
    def __init__(self, queue, size, max=None):
        # queue - the (slower) destination queue, drained by a worker thread
        # size  - number of items to forward per batch
        # max   - backlog limit for THIS queue (default: 2 * size)
        if max == None:
            #REASONABLE DEFAULT
            max = size * 2
        Queue.__init__(self, max=max)
        def size_pusher(please_stop):
            # worker: drain self in batches of `size` and push them to `queue`
            please_stop.on_go(lambda: self.add(Thread.STOP))
            #queue IS A MULTI-THREADED QUEUE, SO THIS WILL BLOCK UNTIL THE size ARE READY
            from .queries import Q
            for i, g in Q.groupby(self, size=size):
                try:
                    queue.extend(g)
                    if please_stop:
                        from logs import Log
                        Log.warning("ThreadedQueue stopped early, with {{num}} items left in queue", {
                            "num": len(self)
                        })
                        return
                except Exception, e:
                    from logs import Log
                    Log.warning("Can not push data to given queue", e)
        self.thread = Thread.run("threaded queue", size_pusher)
    def __enter__(self):
        return self
    def __exit__(self, a, b, c):
        # push the STOP sentinel so the worker drains and exits; on an
        # exceptional exit also fire please_stop for a fast shutdown
        self.add(Thread.STOP)
        if isinstance(b, BaseException):
            self.thread.please_stop.go()
        self.thread.join()

48
tests/util/timer.py Normal file
Просмотреть файл

@ -0,0 +1,48 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import time
from .strings import expand_template
from .logs import Log
class Timer:
    """
    USAGE:
    with Timer("doing hard time"):
        something_that_takes_long()
    OUTPUT:
        doing hard time took 45.468 sec
    """
    def __init__(self, description, param=None):
        # the {{...}} template is expanded immediately, not by the logger
        self.description = expand_template(description, param) #WE WOULD LIKE TO KEEP THIS TEMPLATE, AND PASS IT TO THE LOGGER ON __exit__(), WE FAKE IT FOR NOW
    def __enter__(self):
        Log.note("Timer start: {{description}}", {
            "description": self.description
        })
        # NOTE(review): time.clock() measures CPU time on some platforms and
        # wall time on others (and is removed in Python 3.8) -- confirm which
        # is intended here.
        self.start = time.clock()
        return self
    def __exit__(self, type, value, traceback):
        self.end = time.clock()
        self.interval = self.end - self.start
        Log.note("Timer end : {{description}} (took {{duration}} sec)", {
            "description": self.description,
            "duration": round(self.interval, 3)
        })
    @property
    def duration(self):
        # elapsed seconds measured by the with-block (set in __exit__)
        return self.interval