This commit is contained in:
Kyle Lahnakoski 2013-11-06 18:43:57 -05:00
Родитель 6ee0d72490
Коммит 9446909801
24 изменённых файлов: 306 добавлений и 4340 удалений

Просмотреть файл

@ -10,6 +10,7 @@
import argparse
import codecs
import logging
from logging.handlers import RotatingFileHandler
import os
import random
from flask import Flask, json
@ -28,6 +29,7 @@ def stream(raw_response):
return
yield block
def random_sample(data, count):
    """
    RETURN A LIST OF count ITEMS PICKED AT RANDOM FROM data (WITH REPLACEMENT)
    """
    size = len(data)
    picks = []
    for _ in range(count):
        # EVERY DRAW IS INDEPENDENT, SO THE SAME ITEM CAN SHOW UP MORE THAN ONCE
        picks.append(data[random.randrange(size)])
    return picks
@ -42,7 +44,6 @@ def listwrap(value):
return [value]
@app.route('/', defaults={'path': ''}, methods=['GET', 'POST'])
@app.route('/<path:path>', methods=['GET', 'POST'])
def catch_all(path):
@ -56,7 +57,7 @@ def catch_all(path):
## SEND REQUEST
headers = {'content-type': 'application/json'}
response = requests.get(
es.host + ":" + str(es.port) + "/" + path,
es["host"] + ":" + str(es["port"]) + "/" + path,
data=data,
stream=True, #FOR STREAMING
headers=headers,
@ -66,12 +67,12 @@ def catch_all(path):
# ALLOW CROSS DOMAIN (BECAUSE ES IS USUALLY NOT ON SAME SERVER AS PAGE)
outbound_header = dict(response.headers)
outbound_header["access-control-allow-origin"] = "*"
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
logger.debug("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
path=path,
request_headers= dict(response.headers),
request_content_length= len(data),
response_headers= outbound_header,
response_content_length= outbound_header["content-length"]
# request_headers=dict(response.headers),
request_content_length=len(data),
# response_headers=outbound_header,
response_content_length=outbound_header["content-length"]
))
## FORWARD RESPONSE
@ -82,7 +83,8 @@ def catch_all(path):
headers=outbound_header
)
except Exception, e:
logger.warning("processing problem", e)
logger.warning("processing problem")
logger.exception(e.message)
abort(400)
@ -105,7 +107,7 @@ def filter(path_string, query):
logger.error('request must be of form: {index_name} "/" {type_name} "/_search" ')
## EXPECTING THE QUERY TO AT LEAST HAVE .query ATTRIBUTE
if path[-1] == "_search" and json._default_decoder.decode(query).get("query", None) is None:
if path[-1] == "_search" and json.loads(query).get("query", None) is None:
logger.error("_search must have query")
## NO CONTENT ALLOWED WHEN ASKING FOR MAPPING
@ -148,6 +150,8 @@ class WSGICopyBody(object):
app.wsgi_app = WSGICopyBody(app.wsgi_app)
logger = None
if __name__ == '__main__':
try:
@ -159,35 +163,34 @@ if __name__ == '__main__':
"default": "./settings.json",
"required": False
})
namespace=parser.parse_args()
namespace = parser.parse_args()
args = {k: getattr(namespace, k) for k in vars(namespace)}
if not os.path.exists(args["filename"]):
print("Can not file settings file {filename}".format(filename=args["filename"]))
else:
with codecs.open(args["filename"], "r", encoding="utf-8") as file:
json = file.read()
settings = json._default_decoder.decode(json)
json_data = file.read()
settings = json.loads(json_data)
settings["args"] = args
logger = logging.getLogger('esFrontLine')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
for d in listwrap(settings["debug"]):
if d["filename"]:
fh = logging.FileHandler('spam.log')
for d in listwrap(settings["debug"]["log"]):
if d.get("filename", None):
fh = RotatingFileHandler(**d)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)
elif d["stream"]=="sys.stdout":
elif d.get("stream", None) == "sys.stdout":
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
HeaderRewriterFix(app, remove_headers=['Date', 'Server'])
app.run(**settings["flask"])
app = HeaderRewriterFix(app, remove_headers=['Date', 'Server'])
except Exception, e:
if logger:
logger.exception("Startup problem")
print(e.message)

Просмотреть файл

Просмотреть файл

@ -1,216 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import StringIO
import datetime
import re
import time
from .multiset import Multiset
from .jsons import json_decoder, json_encoder
from .logs import Log
from . import struct
from .strings import expand_template
from .struct import StructList
class CNV:
    """
    DUE TO MY POOR MEMORY, THIS IS A LIST OF ALL CONVERSION ROUTINES

    NOTE(review): THE `== None` COMPARISONS ARE KEPT (INSTEAD OF `is None`) BECAUSE
    THE LIBRARY APPEARS TO HAVE A Null WRAPPER THAT COMPARES EQUAL TO None - CONFIRM
    """

    @staticmethod
    def object2JSON(obj, pretty=False):
        """RETURN obj ENCODED AS A JSON STRING; FAILURES ARE REPORTED THROUGH Log.error"""
        try:
            return json_encoder.encode(obj, pretty=pretty)
        except Exception as e:
            Log.error("Can not encode into JSON: {{value}}", {"value": repr(obj)}, e)

    @staticmethod
    def JSON2object(json_string, params=None, flexible=False):
        """
        DECODE json_string; LISTS COME BACK AS StructList, EVERYTHING ELSE IS PASSED THROUGH struct.wrap
        params   - WHEN GIVEN, EACH VALUE IS QUOTED AND EXPANDED INTO json_string AS A TEMPLATE
        flexible - STRIP COMMENTS AND LINE BREAKS BEFORE DECODING
        """
        try:
            #REMOVE """COMMENTS""", #COMMENTS, //COMMENTS, AND \n \r
            if flexible:
                #DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py#L58
                json_string = re.sub(r"\"\"\".*?\"\"\"|\s+//.*\n|#.*?\n|\n|\r", r" ", json_string)

            if params:
                params = {k: CNV.value2quote(v) for k, v in params.items()}
                json_string = expand_template(json_string, params)

            obj = json_decoder.decode(json_string)
            if isinstance(obj, list):
                return StructList(obj)
            return struct.wrap(obj)
        except Exception as e:
            Log.error("Can not decode JSON:\n\t" + json_string, e)

    @staticmethod
    def string2datetime(value, format):
        """PARSE value USING THE strptime format"""
        ## http://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
        try:
            return datetime.datetime.strptime(value, format)
        except Exception as e:
            Log.error("Can not format {{value}} with {{format}}", {"value": value, "format": format}, e)

    @staticmethod
    def datetime2string(value, format):
        """RENDER THE datetime value USING THE strftime format"""
        try:
            return value.strftime(format)
        except Exception as e:
            Log.error("Can not format {{value}} with {{format}}", {"value": value, "format": format}, e)

    @staticmethod
    def datetime2unix(d):
        """RETURN WHOLE SECONDS FROM time.mktime() OF d (None PASSES THROUGH)"""
        if d == None:
            return None
        return long(time.mktime(d.timetuple()))

    @staticmethod
    def datetime2milli(d):
        """RETURN MILLISECONDS BETWEEN 1970-01-01 AND THE NAIVE datetime d"""
        try:
            epoch = datetime.datetime(1970, 1, 1)
            diff = d - epoch
            # 86400000=24*3600*1000
            # FLOOR DIVISION KEEPS THE RESULT INTEGRAL ON EVERY PYTHON VERSION
            return (diff.days * 86400000) + \
                (diff.seconds * 1000) + \
                (diff.microseconds // 1000)
        except Exception as e:
            # PASS THE CAUSE ALONG, LIKE EVERY OTHER CONVERTER IN THIS CLASS
            Log.error("Can not convert {{value}}", {"value": d}, e)

    @staticmethod
    def unix2datetime(u):
        """RETURN THE UTC datetime FOR u SECONDS SINCE EPOCH"""
        return datetime.datetime.utcfromtimestamp(u)

    @staticmethod
    def milli2datetime(u):
        """RETURN THE UTC datetime FOR u MILLISECONDS SINCE EPOCH"""
        # DIVIDE BY A FLOAT: INTEGER DIVISION WOULD THROW AWAY THE MILLISECOND PART
        return datetime.datetime.utcfromtimestamp(u / 1000.0)

    @staticmethod
    def dict2Multiset(dic):
        """RETURN A Multiset WHOSE .dic IS A COPY OF THE (UNWRAPPED) dic"""
        if dic == None:
            return None

        output = Multiset()
        output.dic = struct.unwrap(dic).copy()
        return output

    @staticmethod
    def multiset2dict(value):
        """
        CONVERT MULTISET TO dict THAT MAPS KEYS TO KEY-COUNT
        """
        if value == None:
            return None
        return dict(value.dic)

    @staticmethod
    def table2list(
        column_names,  #tuple of columns names
        rows           #list of tuples
    ):
        """RETURN A StructList WITH ONE dict PER ROW, KEYED BY column_names"""
        return StructList([dict(zip(column_names, r)) for r in rows])

    #PROPER NULL HANDLING
    @staticmethod
    def value2string(value):
        """RETURN unicode(value), OR None WHEN value IS None"""
        if value == None:
            return None
        return unicode(value)

    #RETURN PRETTY PYTHON CODE FOR THE SAME
    @staticmethod
    def value2quote(value):
        """RETURN A DOUBLE-QUOTED STRING FOR STRINGS, repr() FOR ANYTHING ELSE"""
        if isinstance(value, basestring):
            return CNV.string2quote(value)
        else:
            return repr(value)

    @staticmethod
    def string2quote(value):
        """RETURN value IN DOUBLE QUOTES, WITH BACKSLASHES AND DOUBLE QUOTES ESCAPED"""
        # BACKSLASHES MUST BE ESCAPED FIRST, OTHERWISE THE ESCAPES ADDED FOR QUOTES GET DOUBLED
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

    #RETURN PYTHON CODE FOR THE SAME
    @staticmethod
    def value2code(value):
        return repr(value)

    @staticmethod
    def DataFrame2string(df, columns=None):
        """RETURN df AS TAB-SEPARATED TEXT, WITH HEADER"""
        output = StringIO.StringIO()
        try:
            df.to_csv(output, sep="\t", header=True, cols=columns, engine='python')
            return output.getvalue()
        finally:
            output.close()

    @staticmethod
    def ascii2char(ascii):
        return chr(ascii)

    @staticmethod
    def char2ascii(char):
        return ord(char)

    @staticmethod
    def int2hex(value, size):
        """
        RETURN THE LAST size HEX DIGITS OF value, ZERO-PADDED ON THE LEFT
        (EXPECTS A NON-NEGATIVE INTEGER)
        """
        # format() NEVER EMITS THE "0x" PREFIX, NOR THE TRAILING "L" THAT
        # PYTHON 2 hex() ADDS TO LONG VALUES
        digits = format(value, "x")
        return (("0" * size) + digits)[-size:]

    @staticmethod
    def value2intlist(value):
        """
        RETURN A LIST OF int: ITERABLES ARE CONVERTED ITEM BY ITEM (EMPTY STRINGS SKIPPED),
        A SINGLE VALUE BECOMES A ONE-ITEM LIST, None AND BLANK STRINGS GIVE None
        """
        if value == None:
            return None
        elif hasattr(value, '__iter__'):
            output = [int(d) for d in value if d != ""]
            return output
        elif value.strip() == "":
            return None
        else:
            return [int(value)]

    @staticmethod
    def value2int(value):
        if value == None:
            return None
        else:
            return int(value)

    @staticmethod
    def value2number(v):
        """RETURN v AS int WHEN IT LOOKS LIKE ONE, OTHERWISE AS float"""
        try:
            if isinstance(v, float) and round(v, 0) != v:
                return v
            #IF LOOKS LIKE AN INT, RETURN AN INT
            return int(v)
        except Exception:
            try:
                return float(v)
            except Exception as e:
                Log.error("Not a number ({{value}})", {"value": v}, e)

    @staticmethod
    def utf82unicode(value):
        return unicode(value.decode('utf8'))

    @staticmethod
    def latin12unicode(value):
        return unicode(value.decode('iso-8859-1'))

Просмотреть файл

@ -1,611 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime
import subprocess
from pymysql import connect
from . import struct
from .maths import Math
from .strings import expand_template
from .struct import nvl
from .cnv import CNV
from .logs import Log, Except
from .queries import Q
from .strings import indent
from .strings import outdent
from .files import File
# DEFAULT FOR DB.debug WHEN settings.debug IS NOT SET
DEBUG = False
# NUMBER OF BACKLOGGED STATEMENTS THAT TRIGGERS A FLUSH; ALSO THE GROUP SIZE USED WHEN SENDING THE BACKLOG
MAX_BATCH_SIZE=100
# EVERY DB INSTANCE ADDS ITSELF HERE IN __init__ AND IS REMOVED AGAIN IN close()
all_db=[]
class DB(object):
    """
    THIN WRAPPER AROUND A pymysql CONNECTION: NESTED begin()/commit()/rollback()
    COUNTING, A BACKLOG THAT BATCHES WRITE STATEMENTS, AND SQL QUOTING HELPERS

    NOTE(review): THE `== None` / `!= None` COMPARISONS ARE KEPT (INSTEAD OF `is`)
    BECAUSE THE LIBRARY APPEARS TO HAVE A Null WRAPPER THAT EQUALS None - CONFIRM
    """

    def __init__(self, settings, schema=None):
        """OVERRIDE THE settings.schema WITH THE schema PARAMETER"""
        all_db.append(self)
        if isinstance(settings, DB):
            settings = settings.settings

        self.settings = settings.copy()
        self.settings.schema = nvl(schema, self.settings.schema)

        self.debug = nvl(self.settings.debug, DEBUG)
        self._open()

    def _open(self):
        """ DO NOT USE THIS UNLESS YOU close() FIRST"""
        try:
            self.db = connect(
                host=self.settings.host,
                port=self.settings.port,
                user=nvl(self.settings.username, self.settings.user),
                passwd=nvl(self.settings.password, self.settings.passwd),
                db=nvl(self.settings.schema, self.settings.db),
                charset=u"utf8",
                use_unicode=True
            )
        except Exception as e:
            Log.error(u"Failure to connect", e)
        self.cursor = None
        self.partial_rollback = False
        self.transaction_level = 0
        self.backlog = []  #accumulate the write commands so they are sent at once

    def __enter__(self):
        self.begin()
        return self

    def __exit__(self, type, value, traceback):
        """ROLLBACK WHEN THE with BLOCK RAISED, COMMIT OTHERWISE; ALWAYS close()"""
        if isinstance(value, BaseException):
            try:
                if self.cursor: self.cursor.close()
                self.cursor = None
                self.rollback()
            except Exception as e:
                Log.warning(u"can not rollback()", e)
            finally:
                self.close()
            return

        try:
            self.commit()
        except Exception as e:
            Log.warning(u"can not commit()", e)
        finally:
            self.close()

    def begin(self):
        """OPEN A (POSSIBLY NESTED) TRANSACTION; THE OUTERMOST ONE CREATES THE CURSOR"""
        if self.transaction_level == 0: self.cursor = self.db.cursor()
        self.transaction_level += 1
        self.execute("SET TIME_ZONE='+00:00'")

    def close(self):
        """CLOSE THE CONNECTION; EXPECTS ALL TRANSACTIONS TO BE FINISHED"""
        if self.transaction_level > 0:
            Log.error(u"expecting commit() or rollback() before close")
        self.cursor = None  #NOT NEEDED
        try:
            self.db.close()
        except Exception as e:
            if e.message.find("Already closed") >= 0:
                return
            Log.warning(u"can not close()", e)
        finally:
            # A SECOND close() MUST NOT FAIL JUST BECAUSE WE ARE NO LONGER REGISTERED
            if self in all_db:
                all_db.remove(self)

    def commit(self):
        """SEND THE BACKLOG, THEN COMMIT IF THIS IS THE OUTERMOST TRANSACTION"""
        try:
            self._execute_backlog()
        except Exception as e:
            try:
                self.rollback()
            except Exception:
                pass  # BEST EFFORT: THE BACKLOG FAILURE IS THE ERROR WORTH REPORTING
            Log.error(u"Error while processing backlog", e)

        if self.transaction_level == 0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level == 1:
            if self.partial_rollback:
                try:
                    self.rollback()
                except Exception:
                    pass  # BEST EFFORT: THE ERROR BELOW IS REPORTED REGARDLESS
                Log.error(u"Commit after nested rollback is not allowed")
            else:
                if self.cursor: self.cursor.close()
                self.cursor = None
                self.db.commit()

        self.transaction_level -= 1

    def flush(self):
        """COMMIT WHAT HAS BEEN DONE SO FAR AND IMMEDIATELY BEGIN A NEW TRANSACTION"""
        try:
            self.commit()
        except Exception as e:
            Log.error(u"Can not flush", e)

        try:
            self.begin()
        except Exception as e:
            Log.error(u"Can not flush", e)

    def rollback(self):
        """DROP THE BACKLOG; ONLY THE OUTERMOST TRANSACTION CAN REALLY BE ROLLED BACK"""
        self.backlog = []  #YAY! FREE!
        if self.transaction_level == 0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level == 1:
            self.transaction_level -= 1
            if self.cursor != None:
                self.cursor.close()
            self.cursor = None
            self.db.rollback()
        else:
            self.transaction_level -= 1
            # REMEMBERED SO THE OUTER commit() CAN REFUSE TO COMMIT
            self.partial_rollback = True
            Log.warning(u"Can not perform partial rollback!")

    def call(self, proc_name, params):
        """CALL THE STORED PROCEDURE proc_name (AFTER SENDING THE BACKLOG)"""
        self._execute_backlog()
        try:
            self.cursor.callproc(proc_name, params)
            self.cursor.close()
            self.cursor = self.db.cursor()
        except Exception as e:
            Log.error(u"Problem calling procedure " + proc_name, e)

    def query(self, sql, param=None):
        """RUN sql (AFTER SENDING THE BACKLOG) AND RETURN ALL ROWS AS A LIST OF RECORDS"""
        self._execute_backlog()
        try:
            old_cursor = self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param: sql = expand_template(sql, self.quote_param(param))
            sql = outdent(sql)
            if self.debug: Log.note(u"Execute SQL:\n{{sql}}", {u"sql": indent(sql)})

            self.cursor.execute(sql)

            columns = [utf8_to_unicode(d[0]) for d in nvl(self.cursor.description, [])]
            fixed = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
            result = CNV.table2list(columns, fixed)

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

            return result
        except Exception as e:
            if e.message.find("InterfaceError") >= 0:
                Log.error(u"Did you close the db connection?", e)
            Log.error(u"Problem executing SQL:\n" + indent(sql.strip()), e, offset=1)

    # EXECUTE GIVEN METHOD FOR ALL ROWS RETURNED
    def forall(self, sql, param=None, execute=None):
        """
        RUN sql AND CALL execute(row) FOR EACH ROW; RETURN THE NUMBER OF ROWS

        THIS METHOD USED TO BE DEFINED UNDER THE NAME execute, WHERE IT WAS
        SILENTLY REPLACED BY THE BACKLOG execute() DEFINED AFTER IT, AND SO
        COULD NEVER BE CALLED.  IT HAS ITS OWN NAME NOW
        """
        assert execute
        num = 0

        self._execute_backlog()
        try:
            old_cursor = self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param: sql = expand_template(sql, self.quote_param(param))
            sql = outdent(sql)
            if self.debug: Log.note(u"Execute SQL:\n{{sql}}", {u"sql": indent(sql)})

            self.cursor.execute(sql)

            columns = tuple([utf8_to_unicode(d[0]) for d in self.cursor.description])
            for r in self.cursor:
                num += 1
                execute(struct.wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

        except Exception as e:
            Log.error(u"Problem executing SQL:\n" + indent(sql.strip()), e, offset=1)

        return num

    def execute(self, sql, param=None):
        """ADD sql TO THE BACKLOG; IT IS SENT ONCE THE BACKLOG IS FULL, OR ON commit()/query()/call()"""
        if self.transaction_level == 0: Log.error(u"Expecting transaction to be started before issuing queries")

        if param: sql = expand_template(sql, self.quote_param(param))
        sql = outdent(sql)
        self.backlog.append(sql)
        if len(self.backlog) >= MAX_BATCH_SIZE:
            self._execute_backlog()

    @staticmethod
    def execute_sql(settings, sql, param=None):
        """EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?"""

        if param:
            # A CONNECTION IS OPENED ONLY TO GET ACCESS TO THE QUOTING ROUTINES
            with DB(settings) as temp:
                sql = expand_template(sql, temp.quote_param(param))

        # We have no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        # NOTE(review): THE PASSWORD IS GIVEN ON THE COMMAND LINE, WHERE OTHER
        # USERS OF THE MACHINE MAY SEE IT IN THE PROCESS LIST
        args = [
            u"mysql",
            u"-h{0}".format(settings.host),
            u"-u{0}".format(settings.username),
            u"-p{0}".format(settings.password),
            u"{0}".format(settings.schema)
        ]

        proc = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1
        )
        (output, _) = proc.communicate(sql)

        if proc.returncode:
            if len(sql) > 10000:
                sql = u"<" + unicode(len(sql)) + u" bytes of sql>"
            Log.error(u"Unable to execute sql: return code {{return_code}}, {{output}}:\n {{sql}}\n", {
                u"sql": indent(sql),
                u"return_code": proc.returncode,
                u"output": output
            })

    # NOTE: AN INSTANCE METHOD execute_file(self, filename, param) USED TO BE
    # DEFINED AHEAD OF THIS ONE; THIS DEFINITION REPLACED IT AT CLASS CREATION,
    # SO ONLY THIS SIGNATURE WAS EVER CALLABLE
    @staticmethod
    def execute_file(settings, filename, param=None):
        """READ filename AND HAND ITS CONTENT TO execute_sql()"""
        # MySQLdb provides no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        sql = File(filename).read()
        DB.execute_sql(settings, sql, param)

    def _execute_backlog(self):
        """SEND ALL BACKLOGGED STATEMENTS TO THE DATABASE"""
        if not self.backlog: return

        # EMPTY THE BACKLOG FIRST, SO A FAILURE DOES NOT LEAVE STATEMENTS BEHIND
        (backlog, self.backlog) = (self.backlog, [])
        if self.db.__module__.startswith(u"pymysql"):
            # BUG IN PYMYSQL: CAN NOT HANDLE MULTIPLE STATEMENTS
            # https://github.com/PyMySQL/PyMySQL/issues/157
            for b in backlog:
                try:
                    self.cursor.execute(b)
                except Exception as e:
                    Log.error(u"Can not execute sql:\n{{sql}}", {u"sql": b}, e)

            self.cursor.close()
            self.cursor = self.db.cursor()
        else:
            for i, g in Q.groupby(backlog, size=MAX_BATCH_SIZE):
                sql = u";\n".join(g)
                try:
                    if self.debug: Log.note(u"Execute block of SQL:\n" + indent(sql))
                    self.cursor.execute(sql)
                    self.cursor.close()
                    self.cursor = self.db.cursor()
                except Exception as e:
                    Log.error(u"Problem executing SQL:\n{{sql}}", {u"sql": indent(sql.strip())}, e, offset=1)

    ## Insert dictionary of values into table
    def insert(self, table_name, record):
        keys = record.keys()

        try:
            command = u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                u",".join([self.quote_column(k) for k in keys]) + \
                u") VALUES (" + \
                u",".join([self.quote_value(record[k]) for k in keys]) + \
                u")"

            self.execute(command)
        except Exception as e:
            Log.error(u"problem with record: {{record}}", {u"record": record}, e)

    # candidate_key IS LIST OF COLUMNS THAT CAN BE USED AS UID (USUALLY PRIMARY KEY)
    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_new(self, table_name, candidate_key, new_record):
        candidate_key = struct.listwrap(candidate_key)

        condition = u" AND\n".join([self.quote_column(k) + u"=" + self.quote_value(new_record[k]) if new_record[k] != None else self.quote_column(k) + u" IS Null" for k in candidate_key])
        command = u"INSERT INTO " + self.quote_column(table_name) + u" (" + \
            u",".join([self.quote_column(k) for k in new_record.keys()]) + \
            u")\n" + \
            u"SELECT a.* FROM (SELECT " + u",".join([self.quote_value(v) + u" " + self.quote_column(k) for k, v in new_record.items()]) + u" FROM DUAL) a\n" + \
            u"LEFT JOIN " + \
            u"(SELECT 'dummy' exist FROM " + self.quote_column(table_name) + u" WHERE " + condition + u" LIMIT 1) b ON 1=1 WHERE exist IS Null"
        self.execute(command, {})

    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_newlist(self, table_name, candidate_key, new_records):
        for r in new_records:
            self.insert_new(table_name, candidate_key, r)

    def insert_list(self, table_name, records):
        """INSERT ALL records WITH ONE STATEMENT, USING THE UNION OF THEIR KEYS AS COLUMNS"""
        keys = set()
        for r in records:
            keys |= set(r.keys())
        keys = Q.sort(keys)

        try:
            command = \
                u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                u",".join([self.quote_column(k) for k in keys]) + \
                u") VALUES " + ",".join([
                    "(" + u",".join([self.quote_value(r[k]) for k in keys]) + u")"
                    for r in records
                ])
            self.execute(command)
        except Exception as e:
            Log.error(u"problem with record: {{record}}", {u"record": records}, e)

    def update(self, table_name, where_slice, new_values):
        """
        where_slice IS A Struct WHICH WILL BE USED TO MATCH ALL IN table
        """
        new_values = self.quote_param(new_values)

        where_clause = u" AND\n".join([
            self.quote_column(k) + u"=" + self.quote_value(v) if v != None else self.quote_column(k) + " IS NULL"
            for k, v in where_slice.items()]
        )

        command = u"UPDATE " + self.quote_column(table_name) + u"\n" + \
            u"SET " + \
            u",\n".join([self.quote_column(k) + u"=" + v for k, v in new_values.items()]) + u"\n" + \
            u"WHERE " + \
            where_clause
        self.execute(command, {})

    def quote_param(self, param):
        return {k: self.quote_value(v) for k, v in param.items()}

    def quote_value(self, value):
        """
        convert values to mysql code for the same
        mostly delegate directly to the mysql lib, but some exceptions exist
        """
        try:
            if value == None:
                return "NULL"
            elif isinstance(value, SQL):
                if not value.param:
                    #value.template CAN BE MORE THAN A TEMPLATE STRING
                    return self.quote_sql(value.template)
                param = {k: self.quote_sql(v) for k, v in value.param.items()}
                return expand_template(value.template, param)
            elif isinstance(value, basestring):
                return self.db.literal(value)
            elif isinstance(value, datetime):
                return u"str_to_date('" + value.strftime(u"%Y%m%d%H%M%S") + u"', '%Y%m%d%H%i%s')"
            elif hasattr(value, '__iter__'):
                # dict IS ALSO CAUGHT HERE; THE dict BRANCH BELOW DOES THE SAME THING
                return self.db.literal(CNV.object2JSON(value))
            elif isinstance(value, dict):
                return self.db.literal(CNV.object2JSON(value))
            elif Math.is_number(value):
                return unicode(value)
            else:
                return self.db.literal(value)

        except Exception as e:
            Log.error(u"problem quoting SQL", e)

    def quote_sql(self, value, param=None):
        """
        USED TO EXPAND THE PARAMETERS TO THE SQL() OBJECT
        """
        try:
            if isinstance(value, SQL):
                if not param:
                    return value
                param = {k: self.quote_sql(v) for k, v in param.items()}
                return expand_template(value, param)
            elif isinstance(value, basestring):
                return value
            elif isinstance(value, dict):
                return self.db.literal(CNV.object2JSON(value))
            elif hasattr(value, '__iter__'):
                return u"(" + u",".join([self.quote_sql(vv) for vv in value]) + u")"
            else:
                return unicode(value)
        except Exception as e:
            Log.error(u"problem quoting SQL", e)

    def quote_column(self, column_name, table=None):
        """RETURN THE BACKTICK-QUOTED SQL FOR A COLUMN NAME, A LIST OF NAMES, OR A {name, value} PAIR"""
        if isinstance(column_name, basestring):
            if table:
                column_name = table + "." + column_name
            return SQL(u"`" + column_name.replace(u".", u"`.`") + u"`")  #MY SQL QUOTE OF COLUMN NAMES
        elif isinstance(column_name, list):
            if table:
                return SQL(u", ".join([self.quote_column(table + "." + c) for c in column_name]))
            return SQL(u", ".join([self.quote_column(c) for c in column_name]))
        else:
            #ASSUME {u"name":name, u"value":value} FORM
            return SQL(column_name.value + u" AS " + self.quote_column(column_name.name))

    def sort2sqlorderby(self, sort):
        sort = Q.normalize_sort(sort)
        return u",\n".join([self.quote_column(s.field) + (" DESC" if s.sort == -1 else " ASC") for s in sort])

    def esfilter2sqlwhere(self, esfilter):
        return SQL(self._filter2where(esfilter))

    def _filter2where(self, esfilter):
        """RETURN THE SQL WHERE-EXPRESSION FOR AN ELASTICSEARCH-STYLE FILTER"""
        esfilter = struct.wrap(esfilter)

        if esfilter[u"and"] != None:
            return u"(" + u" AND ".join([self._filter2where(a) for a in esfilter[u"and"]]) + u")"
        elif esfilter[u"or"] != None:
            return u"(" + u" OR ".join([self._filter2where(a) for a in esfilter[u"or"]]) + u")"
        elif esfilter[u"not"]:
            return u"NOT (" + self._filter2where(esfilter[u"not"]) + u")"
        elif esfilter.term != None:
            return u"(" + u" AND ".join([self.quote_column(col) + u"=" + self.quote_value(val) for col, val in esfilter.term.items()]) + u")"
        elif esfilter.terms:
            # NOTE: RETURNS INSIDE THE LOOP, SO ONLY THE FIRST COLUMN IS USED
            for col, v in esfilter.terms.items():
                try:
                    # TRY A COMPACT RANGE-BASED FILTER FOR LISTS OF INTEGERS
                    int_list = CNV.value2intlist(v)
                    filter = int_list_packer(col, int_list)
                    return self._filter2where(filter)
                except Exception as e:
                    # NOT INTEGERS, OR NOT PACKABLE: FALL BACK TO A PLAIN IN-LIST
                    return self.quote_column(col) + u" in (" + ", ".join([self.quote_value(val) for val in v]) + ")"
        elif esfilter.script != None:
            # NOTE(review): THE SCRIPT IS INSERTED AS-IS; IT MUST COME FROM A TRUSTED SOURCE
            return u"(" + esfilter.script + u")"
        elif esfilter.range != None:
            name2sign = {
                u"gt": u">",
                u"gte": u">=",
                u"lte": u"<=",
                u"lt": u"<"
            }
            return u"(" + u" AND ".join([
                " AND ".join([
                    self.quote_column(col) + name2sign[sign] + self.quote_value(value)
                    for sign, value in ranges.items()
                ])
                for col, ranges in esfilter.range.items()
            ]) + u")"
        elif esfilter.exists != None:
            if isinstance(esfilter.exists, basestring):
                return u"(" + self.quote_column(esfilter.exists) + u" IS NOT Null)"
            else:
                return u"(" + self.quote_column(esfilter.exists.field) + u" IS NOT Null)"
        else:
            Log.error(u"Can not convert esfilter to SQL: {{esfilter}}", {u"esfilter": esfilter})
def utf8_to_unicode(v):
    """
    DECODE BYTE STRINGS AS UTF8; ANY OTHER VALUE IS RETURNED UNTOUCHED
    """
    if not isinstance(v, str):
        return v

    try:
        return v.decode(u"utf8")
    except Exception as e:
        Log.error(u"not expected", e)
#ACTUAL SQL, DO NOT QUOTE THIS STRING
class SQL(unicode):
    """
    A unicode STRING THAT IS ALREADY SQL, CARRYING AN OPTIONAL TEMPLATE AND ITS PARAMETERS
    """

    def __init__(self, template='', param=None):
        unicode.__init__(self)
        self.param = param
        self.template = template

    def __str__(self):
        # CONVERSION TO A BYTE STRING IS REPORTED AS AN ERROR
        Log.error(u"do not do this")
def int_list_packer(term, values):
    """
    return singletons, ranges and exclusions

    BUILD A COMPACT FILTER THAT MATCHES EXACTLY THE INTEGERS IN values:
    AN "or" OF "range" FILTERS ON term, MINUS THE EXCLUDED GAPS INSIDE THOSE
    RANGES, PLUS A "terms" FILTER FOR THE LEFTOVER SINGLE VALUES

    RAISES WHEN values IS EMPTY, OR WHEN NO RANGE WAS FOUND ("no packing possible")
    """
    singletons = set()
    ranges = []
    exclude = set()

    ordered = Q.sort(values)

    last = ordered[0]
    curr_start = last
    curr_excl = set()

    for v in ordered[1:]:
        if v <= last + 1:
            # STILL IN THE SAME CONTIGUOUS RUN
            pass
        elif v - last > 3:
            # BIG GAP: CLOSE THE CURRENT RUN
            if last == curr_start:
                singletons.add(last)
            elif last - curr_start - len(curr_excl) < 6 or ((last - curr_start) < len(curr_excl) * 3):
                #small ranges are singletons, sparse ranges are singletons
                singletons |= set(range(curr_start, last + 1))
                singletons -= curr_excl
            else:
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
            curr_start = v
            curr_excl = set()
        else:
            # SMALL GAP: EITHER REMEMBER IT AS AN EXCLUSION, OR CLOSE THE RUN
            if v - curr_start >= len(curr_excl) * 3:
                add_me = set(range(last + 1, v))
                curr_excl |= add_me
            else:
                # PLAIN BOUNDS ONLY: EVERY ENTRY IS WRAPPED IN {"range":{term:...}} BELOW,
                # SO WRAPPING IT HERE TOO WOULD PRODUCE A MALFORMED, DOUBLY-NESTED FILTER
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
                curr_start = v
                curr_excl = set()
        last = v

    if last > curr_start + 1:
        ranges.append({"gte": curr_start, "lte": last})
        # THE GAPS INSIDE THE LAST RANGE MUST BE EXCLUDED TOO,
        # OTHERWISE THE FILTER MATCHES VALUES THAT WERE NEVER IN THE LIST
        exclude |= curr_excl
    else:
        singletons.add(curr_start)
        singletons.add(last)

    if ranges:
        r = {"or": [{"range": {term: bounds}} for bounds in ranges]}
        if exclude:
            r = {"and": [r, {"not": {"terms": {term: Q.sort(exclude)}}}]}
        if singletons:
            return {"or": [
                {"terms": {term: Q.sort(singletons)}},
                r
            ]}
        else:
            return r
    else:
        raise Except("no packing possible")

Просмотреть файл

@ -1,314 +0,0 @@
# encoding: utf-8
#
from datetime import datetime
import re
import sha
import time
import requests
import struct
from .maths import Math
from .queries import Q
from .cnv import CNV
from .logs import Log
from .struct import nvl
from .struct import Struct, StructList
# MODULE-WIDE DEBUG FLAG: ElasticSearch.__init__ TURNS IT ON WHEN AN INSTANCE IS CREATED WITH settings.debug SET;
# WHEN ON, post/get/put/delete LOG THE RESPONSE CONTENT
DEBUG=False
class ElasticSearch(object):
    """
    CLIENT FOR ONE INDEX/TYPE OF AN ELASTICSEARCH SERVER, PLUS STATIC HELPERS
    FOR INDEX MANAGEMENT AND FOR THE UNDERLYING HTTP CALLS
    """

    def __init__(self, settings):
        """
        settings MUST HAVE host, index AND type; port DEFAULTS TO 9200
        (THE DEFAULT IS WRITTEN BACK INTO settings)
        """
        assert settings.host
        assert settings.index
        assert settings.type

        self.metadata = None
        if not settings.port: settings.port = 9200
        self.debug = nvl(settings.debug, DEBUG)
        # ONE DEBUGGING INSTANCE TURNS ON DEBUG FOR THE WHOLE MODULE
        globals()["DEBUG"] = DEBUG or self.debug

        self.settings = settings
        self.path = settings.host + ":" + unicode(settings.port) + "/" + settings.index + "/" + settings.type

    @staticmethod
    def create_index(settings, schema):
        """CREATE THE INDEX NAMED IN settings WITH THE GIVEN schema (dict OR JSON STRING); RETURN A CLIENT FOR IT"""
        if isinstance(schema, basestring):
            schema = CNV.JSON2object(schema)

        ElasticSearch.post(
            settings.host + ":" + unicode(settings.port) + "/" + settings.index,
            data=CNV.object2JSON(schema),
            headers={"Content-Type": "application/json"}
        )
        # NOTE(review): PRESUMABLY GIVES THE SERVER TIME TO MAKE THE NEW INDEX AVAILABLE - CONFIRM
        time.sleep(2)
        es = ElasticSearch(settings)
        return es

    @staticmethod
    def delete_index(settings, index=None):
        """DELETE index (DEFAULTS TO settings.index)"""
        index = nvl(index, settings.index)

        ElasticSearch.delete(
            settings.host + ":" + unicode(settings.port) + "/" + index,
        )

    def get_aliases(self):
        """
        RETURN LIST OF {"alias":a, "index":i} PAIRS
        ALL INDEXES INCLUDED, EVEN IF NO ALIAS {"alias":Null}
        """
        data = self.get_metadata().indices
        output = []
        for index, desc in data.items():
            if not desc["aliases"]:
                output.append({"index": index, "alias": None})
            else:
                for a in desc["aliases"]:
                    output.append({"index": index, "alias": a})
        return struct.wrap(output)

    def get_metadata(self):
        """RETURN THE CLUSTER METADATA; FETCHED ONCE AND CACHED UNTIL add_alias() CLEARS IT"""
        if not self.metadata:
            response = self.get(self.settings.host + ":" + unicode(self.settings.port) + "/_cluster/state")
            self.metadata = response.metadata
        return self.metadata

    def get_schema(self):
        """RETURN THE METADATA OF THIS CLIENT'S INDEX"""
        # WAS SPELLED "indicies", WHICH DOES NOT MATCH THE indices KEY READ BY get_aliases()
        return self.get_metadata().indices[self.settings.index]

    #DELETE ALL INDEXES WITH GIVEN PREFIX, EXCEPT name
    def delete_all_but(self, prefix, name):
        for a in self.get_aliases():
            # MATCH <prefix>YYYYMMDD_HHMMSS FORMAT
            if re.match(re.escape(prefix) + "\\d{8}_\\d{6}", a.index) and a.index != name:
                ElasticSearch.delete_index(self.settings, a.index)

    @staticmethod
    def proto_name(prefix, timestamp=None):
        """RETURN prefix FOLLOWED BY THE timestamp (DEFAULT: UTC NOW) AS YYYYMMDD_HHMMSS"""
        if not timestamp:
            timestamp = datetime.utcnow()
        return prefix + CNV.datetime2string(timestamp, "%Y%m%d_%H%M%S")

    def add_alias(self, alias):
        """POINT alias AT THIS CLIENT'S INDEX (THE RESPONSE IS NOT CHECKED)"""
        self.metadata = None  # FORCE get_metadata() TO FETCH AGAIN
        requests.post(
            self.settings.host + ":" + unicode(self.settings.port) + "/_aliases",
            CNV.object2JSON({
                "actions": [
                    {"add": {"index": self.settings.index, "alias": alias}}
                ]
            })
        )

    def get_proto(self, alias):
        """RETURN THE SORTED NAMES OF INDEXES NAMED <alias>YYYYMMDD_HHMMSS THAT HAVE NO ALIAS"""
        output = Q.sort([
            a.index
            for a in self.get_aliases()
            if re.match(re.escape(alias) + "\\d{8}_\\d{6}", a.index) and not a.alias
        ])
        return output

    def is_proto(self, index):
        """RETURN True IF index HAS NO ALIAS"""
        for a in self.get_aliases():
            if a.index == index and a.alias:
                return False
        return True

    def delete_record(self, query):
        """DELETE BY QUERY WHEN query IS A dict, OTHERWISE TREAT query AS THE ID OF ONE RECORD"""
        if isinstance(query, dict):
            ElasticSearch.delete(
                self.path + "/_query",
                data=CNV.object2JSON(query)
            )
        else:
            ElasticSearch.delete(
                self.path + "/" + query
            )

    def extend(self, records):
        self.add(records)

    # RECORDS MUST HAVE id AND json AS A STRING OR
    # HAVE id AND value AS AN OBJECT
    def add(self, records):
        """SEND records TO THE _bulk ENDPOINT; A None id IS REPLACED BY THE SHA HASH OF THE JSON"""
        # ADD LINE WITH COMMAND
        lines = []
        for r in records:
            record_id = r["id"]
            if "json" in r:
                json_text = r["json"]
            elif "value" in r:
                json_text = CNV.object2JSON(r["value"])
            else:
                Log.error("Expecting every record given to have \"value\" or \"json\" property")

            if record_id == None:
                record_id = sha.new(json_text).hexdigest()

            # TWO LINES PER RECORD: THE COMMAND, THEN THE DOCUMENT
            lines.append('{"index":{"_id":' + CNV.object2JSON(record_id) + '}}')
            lines.append(json_text)

        if not lines: return
        response = ElasticSearch.post(
            self.path + "/_bulk",
            data="\n".join(lines).encode("utf8") + "\n",
            headers={"Content-Type": "text"}
        )
        items = response["items"]

        for i, item in enumerate(items):
            if not item.index.ok:
                Log.error("{{error}} while loading line:\n{{line}}", {
                    "error": item.index.error,
                    "line": lines[i * 2 + 1]
                })

        if self.debug:
            Log.note("{{num}} items added", {"num": len(lines) / 2})

    # -1 FOR NO REFRESH
    def set_refresh_interval(self, seconds):
        if seconds <= 0:
            interval = "-1"
        else:
            interval = unicode(seconds) + "s"

        response = ElasticSearch.put(
            self.settings.host + ":" + unicode(
                self.settings.port) + "/" + self.settings.index + "/_settings",
            data="{\"index.refresh_interval\":\"" + interval + "\"}"
        )

        if response.content != '{"ok":true}':
            Log.error("Can not set refresh interval ({{error}})", {
                "error": response.content
            })

    def search(self, query):
        """POST query TO THE _search ENDPOINT AND RETURN THE DECODED RESPONSE"""
        try:
            return ElasticSearch.post(self.path + "/_search", data=CNV.object2JSON(query))
        except Exception as e:
            Log.error("Problem with search", e)

    def threaded_queue(self, size):
        # NOTE(review): Threaded_Queue IS NEITHER DEFINED NOR IMPORTED IN THE VISIBLE PART OF THIS FILE - CONFIRM
        return Threaded_Queue(self, size)

    @staticmethod
    def post(*args, **kwargs):
        """requests.post(), WITH THE RESPONSE DECODED FROM JSON AND A REPORTED error TURNED INTO Log.error"""
        try:
            response = requests.post(*args, **kwargs)
            if DEBUG: Log.note(response.content[:130])
            details = CNV.JSON2object(response.content)
            if details.error:
                Log.error(details.error)
            return details
        except Exception as e:
            Log.error("Problem with call to {{url}}", {"url": args[0]}, e)

    @staticmethod
    def get(*args, **kwargs):
        """requests.get(), WITH THE RESPONSE DECODED FROM JSON AND A REPORTED error TURNED INTO Log.error"""
        try:
            response = requests.get(*args, **kwargs)
            if DEBUG: Log.note(response.content[:130])
            details = CNV.JSON2object(response.content)
            if details.error:
                Log.error(details.error)
            return details
        except Exception as e:
            Log.error("Problem with call to {{url}}", {"url": args[0]}, e)

    @staticmethod
    def put(*args, **kwargs):
        """requests.put(); THE RAW RESPONSE IS RETURNED"""
        try:
            response = requests.put(*args, **kwargs)
            if DEBUG: Log.note(response.content)
            return response
        except Exception as e:
            Log.error("Problem with call to {{url}}", {"url": args[0]}, e)

    @staticmethod
    def delete(*args, **kwargs):
        """requests.delete(); THE RAW RESPONSE IS RETURNED"""
        try:
            response = requests.delete(*args, **kwargs)
            if DEBUG: Log.note(response.content)
            return response
        except Exception as e:
            Log.error("Problem with call to {{url}}", {"url": args[0]}, e)

    @staticmethod
    def scrub(r):
        """
        REMOVE KEYS OF DEGENERATE VALUES (EMPTY STRINGS, EMPTY LISTS, AND NULLS)
        TO LOWER CASE
        CONVERT STRINGS OF NUMBERS TO NUMBERS
        RETURNS **COPY**, DOES NOT CHANGE ORIGINAL
        """
        return struct.wrap(_scrub(r))
def _scrub(r):
try:
if r == None:
return None
elif isinstance(r, basestring):
if r == "":
return None
return r.lower()
elif Math.is_number(r):
return CNV.value2number(r)
elif isinstance(r, dict):
if isinstance(r, Struct):
r = r.dict
output = {}
for k, v in r.items():
v = _scrub(v)
if v != None:
output[k.lower()] = v
if len(output) == 0:
return None
return output
elif hasattr(r, '__iter__'):
if isinstance(r, StructList):
r = r.list
output = []
for v in r:
v = _scrub(v)
if v != None:
output.append(v)
if not output:
return None
try:
return Q.sort(output)
except Exception:
return output
else:
return r
except Exception, e:
Log.warning("Can not scrub: {{json}}", {"json": r})

Просмотреть файл

@ -1,101 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import sys
from .struct import nvl
class Emailer:
    """
    SENDS EMAIL THROUGH THE SMTP SERVER DESCRIBED BY settings
    (host, port, use_ssl, username, password, from_address)
    """

    def __init__(self, settings):
        self.settings = settings

    def send_email(self,
        from_address=None,
        to_addrs=None,
        subject='No Subject',
        text_data=None,
        html_data=None
    ):
        """Sends an email.

        from_address is an email address (settings.from_address is used when
        not given); to_addrs is a list of email addresses.
        Addresses can be plain (e.g. "jsmith@example.com") or with real names
        (e.g. "John Smith <jsmith@example.com>").

        text_data and html_data are both strings.  You can specify one or both.
        If you specify both, the email will be sent as a MIME multipart
        alternative, i.e., the recipient will see the HTML content if his
        viewer supports it; otherwise he'll see the text content.

        Raises Exception when the addresses, or both text_data and html_data,
        are missing.
        """

        settings = self.settings

        from_address = nvl(from_address, settings.from_address)

        if not from_address or not to_addrs:
            raise Exception("Both from_addr and to_addrs must be specified")
        if not text_data and not html_data:
            raise Exception("Must specify either text_data or html_data")

        if settings.use_ssl:
            server = smtplib.SMTP_SSL(settings.host, settings.port)
        else:
            server = smtplib.SMTP(settings.host, settings.port)

        try:
            if settings.username and settings.password:
                server.login(settings.username, settings.password)

            if not html_data:
                msg = MIMEText(text_data)
            elif not text_data:
                msg = MIMEMultipart()
                msg.preamble = subject
                msg.attach(MIMEText(html_data, 'html'))
            else:
                msg = MIMEMultipart('alternative')
                msg.attach(MIMEText(text_data, 'plain'))
                msg.attach(MIMEText(html_data, 'html'))

            msg['Subject'] = subject
            msg['From'] = from_address
            msg['To'] = ', '.join(to_addrs)

            server.sendmail(from_address, to_addrs, msg.as_string())

            server.quit()
        finally:
            # RELEASE THE CONNECTION EVEN WHEN login/sendmail FAILED;
            # close() IS HARMLESS AFTER A SUCCESSFUL quit()
            server.close()
if sys.hexversion < 0x020603f0:
    # versions earlier than 2.6.3 have a bug in smtplib when sending over SSL:
    #     http://bugs.python.org/issue4066

    # Unfortunately the stock version of Python in Snow Leopard is 2.6.1, so
    # we patch it here to avoid having to install an updated Python version.
    import socket
    import ssl

    def _get_socket_fixed(self, host, port, timeout):
        """REPLACEMENT FOR smtplib.SMTP_SSL._get_socket: CONNECT, THEN WRAP THE SOCKET IN SSL"""
        if self.debuglevel > 0:
            print>> sys.stderr, 'connect:', (host, port)
        plain = socket.create_connection((host, port), timeout)
        secured = ssl.wrap_socket(plain, self.keyfile, self.certfile)
        self.file = smtplib.SSLFakeFile(secured)
        return secured

    smtplib.SMTP_SSL._get_socket = _get_socket_fixed

Просмотреть файл

@ -1,120 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import codecs
from datetime import datetime
import os
import shutil
from .struct import listwrap, nvl
from .cnv import CNV
class File(object):
    """
    THIN WRAPPER AROUND A FILESYSTEM PATH.  THE PATH IS KEPT INTERNALLY WITH
    "/" SEPARATORS; USE .filename TO GET IT BACK WITH THE OS SEPARATOR
    """

    def __init__(self, filename):
        if filename == None:
            from .logs import Log
            Log.error("File must be given a filename")

        #USE UNIX STANDARD
        self._filename = "/".join(filename.split(os.sep))

    @property
    def filename(self):
        """THE PATH, USING THE OS-SPECIFIC SEPARATOR"""
        return self._filename.replace("/", os.sep)

    @property
    def abspath(self):
        return os.path.abspath(self._filename)

    def backup_name(self, timestamp=None):
        """
        RETURN A FILENAME THAT CAN SERVE AS A BACKUP FOR THIS FILE
        THE TIMESTAMP (DEFAULT now) IS INSERTED BEFORE THE EXTENSION, OR
        APPENDED WHEN THE LAST PATH SEGMENT HAS NO EXTENSION
        """
        suffix = CNV.datetime2string(nvl(timestamp, datetime.now()), "%Y%m%d_%H%M%S")
        head, dot, ext = self._filename.rpartition(".")
        # NO DOT AT ALL, A LEADING-DOT NAME (".bashrc", "dir/.bashrc"), OR A DOT
        # THAT BELONGS TO A DIRECTORY ("a.b/c"): THERE IS NO EXTENSION TO PRESERVE
        if not dot or not head or head.endswith("/") or "/" in ext:
            return self._filename + "." + suffix
        return head + "." + suffix + "." + ext

    def read(self, encoding="utf-8"):
        """RETURN THE WHOLE FILE, DECODED WITH encoding"""
        with codecs.open(self._filename, "r", encoding=encoding) as file:
            return file.read()

    def read_ascii(self):
        """RETURN THE WHOLE FILE AS READ BY THE PLAIN open()"""
        # READING MUST NOT CREATE DIRECTORIES; A MISSING PARENT FAILS IN open()
        with open(self._filename, "r") as file:
            return file.read()

    def write_ascii(self, content):
        """REPLACE THE FILE WITH content, CREATING THE PARENT DIRECTORY IF NEEDED"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            file.write(content)

    def write(self, data):
        """REPLACE THE FILE WITH data (ONE ITEM, OR A LIST OF ITEMS WRITTEN IN ORDER)"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            for d in listwrap(data):
                file.write(d)

    def iter(self):
        """RETURN AN OPEN FILE OBJECT (ITERABLE BY LINE); THE CALLER MUST CLOSE IT"""
        return codecs.open(self._filename, "r")

    def append(self, content):
        """ADD content TO THE END OF THE FILE, CREATING THE PARENT DIRECTORY IF NEEDED"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "a") as output_file:
            output_file.write(content)

    def delete(self):
        """
        REMOVE THE FILE, OR THE WHOLE DIRECTORY TREE; RETURNS self
        A "path not found" OS ERROR IS IGNORED (RETURNS None), OTHER FAILURES
        GO TO Log.error
        """
        try:
            if os.path.isdir(self._filename):
                shutil.rmtree(self._filename)
            elif os.path.isfile(self._filename):
                os.remove(self._filename)
            return self
        except Exception as e:
            # NOT EVERY EXCEPTION CARRIES strerror
            if getattr(e, "strerror", None) == "The system cannot find the path specified":
                return
            from .logs import Log
            Log.error("Could not remove file", e)

    def backup(self):
        """
        COPY THIS FILE TO backup_name() (UTC TIMESTAMP) AND RETURN THE BACKUP File
        """
        target = File(self.backup_name(datetime.utcnow()))
        shutil.copy(self._filename, target._filename)
        return target

    def create(self):
        """MAKE THIS PATH AS A DIRECTORY (INCLUDING MISSING PARENTS)"""
        try:
            os.makedirs(self._filename)
        except Exception as e:
            from .logs import Log
            Log.error("Could not make directory {{dir_name}}", {"dir_name": self._filename}, e)

    @property
    def parent(self):
        """File FOR THE PATH WITH THE LAST "/" SEGMENT REMOVED"""
        return File("/".join(self._filename.split("/")[:-1]))

    @property
    def exists(self):
        # THE EMPTY PATH AND "." ARE TREATED AS THE CURRENT DIRECTORY
        if self._filename in ["", "."]: return True
        try:
            return os.path.exists(self._filename)
        except Exception as e:
            return False

Просмотреть файл

@ -1,181 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, time
from decimal import Decimal
import json
import re
try:
    # StringBuilder IS ABOUT 2x FASTER THAN list()
    from __pypy__.builders import StringBuilder
except Exception as e:
    use_pypy = False

    class StringBuilder(list):
        """list-BACKED STAND-IN: append() PIECES, THEN build() TO JOIN THEM"""

        def __init__(self, length=None):
            # length IS ACCEPTED FOR SIGNATURE COMPATIBILITY AND IGNORED
            list.__init__(self)

        def build(self):
            return "".join(self)
else:
    use_pypy = True
class PyPyJSONEncoder(object):
    """
    pypy DOES NOT OPTIMIZE GENERATOR CODE WELL
    """

    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        """
        RETURN value AS JSON TEXT; pretty USES json.dumps WITH INDENT AND SORTED KEYS
        """
        if not pretty:
            builder = StringBuilder(1024)
            _value2json(value, builder.append)
            return builder.build()

        return json.dumps(json_scrub(value), indent=4, sort_keys=True, separators=(',', ': '))
class cPythonJSONEncoder(object):
    """ENCODER THAT DELEGATES TO THE STANDARD json MODULE AFTER json_scrub()"""

    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        scrubbed = json_scrub(value)
        if not pretty:
            return json.dumps(scrubbed)
        return json.dumps(scrubbed, indent=4, sort_keys=True, separators=(',', ': '))
# OH HUM, cPython with uJSON, OR pypy WITH BUILTIN JSON?
# http://liangnuren.wordpress.com/2012/08/13/python-json-performance/
# http://morepypy.blogspot.ca/2011/10/speeding-up-json-encoding-in-pypy.html
# THE ENCODER DEPENDS ON THE INTERPRETER; THE DECODER IS THE SAME IN BOTH CASES
json_encoder = PyPyJSONEncoder() if use_pypy else cPythonJSONEncoder()
json_decoder = json._default_decoder
def _value2json(value, appender):
    """
    EMIT THE JSON TEXT FOR value BY CALLING appender(piece) REPEATEDLY

    datetime IS WRITTEN AS MILLISECONDS SINCE EPOCH (VIA time.mktime, SO THE
    timetuple IS INTERPRETED AS LOCAL TIME).  RAISES Exception FOR A VALUE
    OF AN UNSUPPORTED TYPE
    """
    if isinstance(value, basestring):
        _string2json(value, appender)
    elif value == None:
        appender("null")
    elif value is True:
        appender('true')
    elif value is False:
        appender('false')
    elif isinstance(value, (int, long, Decimal)):
        appender(str(value))
    elif isinstance(value, float):
        appender(repr(value))
    elif isinstance(value, datetime):
        # THE FILE-LEVEL NAME `time` IS datetime.time, WHICH HAS NO mktime();
        # THE STANDARD time MODULE IS NEEDED HERE
        import time as _time
        appender(unicode(long(_time.mktime(value.timetuple())*1000)))
    elif isinstance(value, dict):
        _dict2json(value, appender)
    elif hasattr(value, '__iter__'):
        _list2json(value, appender)
    else:
        raise Exception(repr(value)+" is not JSON serializable")
def _list2json(value, appender):
    """EMIT value AS A JSON ARRAY, ITEMS SEPARATED BY ", " """
    appender("[")
    for position, item in enumerate(value):
        if position > 0:
            appender(", ")
        _value2json(item, appender)
    appender("]")
def _dict2json(value, appender):
    """
    EMIT value AS A JSON OBJECT; KEYS ARE CONVERTED WITH unicode() AND
    WRITTEN AS JSON STRINGS, IN THE DICT'S ITERATION ORDER
    """
    appender("{")
    first = True
    for k, v in value.iteritems():
        if first:
            first = False
        else:
            appender(", ")
        _string2json(unicode(k), appender)
        appender(": ")
        _value2json(v, appender)
    appender("}")
special_find = u"\\\"\t\n\r".find
replacement = [u"\\\\", u"\\\"", u"\\t", u"\\n", u"\\r"]

# MATCHES EVERY CHARACTER THAT MUST BE ESCAPED INSIDE A JSON STRING
ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')

# START WITH THE GENERIC \uXXXX FORM FOR ALL CONTROL CHARACTERS, THEN
# OVERLAY THE SHORT ESCAPES
ESCAPE_DCT = dict((chr(code), '\\u{0:04x}'.format(code)) for code in range(0x20))
ESCAPE_DCT.update({
    '\\': '\\\\',
    '"': '\\"',
    '\b': '\\b',
    '\f': '\\f',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
})
def _string2json(value, appender):
    """EMIT value AS A DOUBLE-QUOTED JSON STRING, ESCAPED VIA ESCAPE / ESCAPE_DCT"""
    appender("\"")
    escaped = ESCAPE.sub(lambda match: ESCAPE_DCT[match.group(0)], value)
    appender(escaped)
    appender("\"")
#REMOVE VALUES THAT CAN NOT BE JSON-IZED
def json_scrub(r):
    """RETURN A COPY OF r MADE ONLY OF VALUES json.dumps IS EXPECTED TO ACCEPT"""
    return _scrub(r)


def _scrub(r):
    """
    RECURSIVE WORKER FOR json_scrub:
    dict -> dict OF SCRUBBED VALUES, OTHER ITERABLE -> list, Decimal -> float,
    ANYTHING ELSE IS RETURNED AS-IS
    """
    if r == None:
        return None
    if isinstance(r, dict):
        return dict((k, _scrub(v)) for k, v in r.iteritems())
    if hasattr(r, '__iter__'):
        return [_scrub(v) for v in r]
    if isinstance(r, Decimal):
        return float(r)
    return r

Просмотреть файл

@ -1,400 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import traceback
import logging
import sys
from .struct import listwrap, nvl
import struct, threads
from .strings import indent, expand_template
from .threads import Thread
# SEVERITY LABELS, PASSED AS THE type OF AN Except
ERROR="ERROR"
WARNING="WARNING"
NOTE="NOTE"
# MODULE-LEVEL SINKS: BOTH ARE REPLACED BY Log.start() (THROUGH globals()).
# main_log IS THE OBJECT Log.note() WRITES TO
main_log = None
logging_multi = None
class Log(object):
    """
    FOR STRUCTURED LOGGING AND EXCEPTION CHAINING
    """

    @classmethod
    def new_instance(cls, settings):
        """
        BUILD ONE LOG SINK FROM settings.  CHECKED IN ORDER: class, file,
        filename, stream.  RETURNS None WHEN NONE OF THEM IS SET
        """
        settings = struct.wrap(settings)
        if settings["class"]:
            if not settings["class"].startswith("logging.handlers."):
                return make_log_from_settings(settings)
            # elif settings["class"]=="sys.stdout":
            #CAN BE SUPER SLOW
            else:
                return Log_usingLogger(settings)
        # PASS THE CONFIGURED VALUE, NOT THE BUILTIN file TYPE
        if settings.file: return Log_usingFile(settings.file)
        if settings.filename: return Log_usingFile(settings.filename)
        if settings.stream: return Log_usingStream(settings.stream)

    @classmethod
    def add_log(cls, log):
        logging_multi.add_log(log)

    @staticmethod
    def debug(template=None, params=None):
        """
        USE THIS FOR DEBUGGING (AND EVENTUAL REMOVAL)
        """
        Log.note(nvl(template, ""), params)

    @staticmethod
    def println(template, params=None):
        Log.note(template, params)

    @staticmethod
    def note(template, params=None):
        """
        WRITE template (PREFIXED WITH A UTC HH:MM:SS TIMESTAMP) TO main_log
        params IS COPIED BEFORE log_timestamp IS ADDED TO IT
        """
        template = "{{log_timestamp}} - " + template
        params = nvl(params, {}).copy()

        #NICE TO GATHER MANY MORE ITEMS FOR LOGGING (LIKE STACK TRACES AND LINE NUMBERS)
        params["log_timestamp"] = datetime.utcnow().strftime("%H:%M:%S")

        main_log.write(template, params)

    @staticmethod
    def warning(template, params=None, cause=None):
        """
        LOG (DO NOT RAISE) A WARNING WITH THE CURRENT STACK TRACE
        AN EXCEPTION MAY BE GIVEN IN PLACE OF params; IT IS THEN USED AS cause
        """
        if isinstance(params, BaseException):
            cause = params
            params = None

        if cause and not isinstance(cause, Except):
            cause = Except(WARNING, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), 0))

        e = Except(WARNING, template, params, cause, format_trace(traceback.extract_stack(), 1))
        Log.note(unicode(e))

    #raise an exception with a trace for the cause too
    @staticmethod
    def error(
        template,       #human readable template
        params=None,    #parameters for template
        cause=None,     #plausible cause
        offset=0        #stack trace offset (==1 if you do not want to report self)
    ):
        """
        ALWAYS RAISES AN Except OF TYPE ERROR
        AN EXCEPTION (OR LIST STARTING WITH ONE) MAY BE GIVEN IN PLACE OF
        params; IT IS THEN USED AS cause
        """
        if params and isinstance(struct.listwrap(params)[0], BaseException):
            cause = params
            params = None

        if cause and not isinstance(cause, Except):
            cause = [Except(ERROR, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), offset))]
        else:
            cause = listwrap(cause)

        trace = format_trace(traceback.extract_stack(), 1+offset)
        e = Except(ERROR, template, params, cause, trace)
        raise e

    #RUN ME FIRST TO SETUP THE THREADED LOGGING
    @staticmethod
    def start(settings=None):
        """
        REPLACE THE MODULE-LEVEL main_log WITH A Log_usingMulti HOLDING ONE
        SINK PER ENTRY OF settings.log.  DOES NOTHING WITHOUT settings.log
        """
        ##http://victorlin.me/2012/08/good-logging-practice-in-python/
        if not settings: return
        if not settings.log: return

        globals()["logging_multi"] = Log_usingMulti()
        globals()["main_log"] = logging_multi

        for log in listwrap(settings.log):
            Log.add_log(Log.new_instance(log))

    @staticmethod
    def stop():
        main_log.stop()

    def write(self):
        Log.error("not implemented")
def format_trace(tbs, trim=0):
    """
    RETURN ONE "at file:line (function)" LINE PER FRAME, LAST FRAME FIRST

    tbs - SEQUENCE OF (filename, lineno, name, line) ENTRIES, AS PRODUCED BY
          traceback.extract_stack()/extract_tb(); IT IS NOT MODIFIED
    trim - NUMBER OF FRAMES TO DROP FROM THE START OF THE REVERSED LIST
    """
    frames = list(tbs)
    frames.reverse()
    return "".join(
        'at %s:%d (%s)\n' % (filename, lineno, name)
        for filename, lineno, name, line in frames[trim:]
    )
#def format_trace(tb, trim=0):
# list = []
# for filename, lineno, name, line in traceback.extract_tb(tb)[0:-trim]:
# item = 'File "%s", line %d, in %s\n' % (filename,lineno,name)
# if line:
# item = item + '\t%s\n' % line.strip()
# list.append(item)
# return "".join(list)
class Except(Exception):
    """
    EXCEPTION CARRYING A MESSAGE TEMPLATE, ITS PARAMETERS, A PRE-FORMATTED
    TRACE AND THE CAUSE(S) THAT LED TO IT
    """

    def __init__(self, type=ERROR, template=None, params=None, cause=None, trace=None):
        # DO NOT STORE self IN args (THAT MAKES THE EXCEPTION REFER TO ITSELF)
        super(Except, self).__init__()
        self.type = type
        self.template = template
        self.params = params
        self.cause = cause
        self.trace = trace

    @property
    def message(self):
        return unicode(self)

    def __str__(self):
        """RETURN THE EXPANDED TEMPLATE, THEN THE TRACE, THEN EACH CAUSE"""
        output = nvl(self.template, "")
        if self.params: output = expand_template(output, self.params)

        if self.trace:
            output += "\n" + indent(self.trace)

        if self.cause:
            # Log.warning PASSES A SINGLE EXCEPTION, Log.error PASSES A LIST
            causes = self.cause if isinstance(self.cause, (list, tuple)) else [self.cause]
            output += "\ncaused by\n\t" + "\nand caused by\n\t".join([c.__str__() for c in causes])

        return output + "\n"
class BaseLog(object):
    """DO-NOTHING BASE FOR LOG SINKS; SUBCLASSES OVERRIDE write() AND stop()"""

    def write(self, template, params):
        """ACCEPT A MESSAGE; THE BASE IMPLEMENTATION DISCARDS IT"""
        return None

    def stop(self):
        """RELEASE RESOURCES; THE BASE IMPLEMENTATION HAS NONE"""
        return None
class Log_usingFile(BaseLog):
    """LOG SINK THAT APPENDS EACH EXPANDED MESSAGE TO ONE FILE"""

    def __init__(self, file):
        """
        file - PATH OF THE LOG FILE; AN EXISTING FILE IS PASSED TO
               File.backup() AND THEN DELETED
        """
        assert file

        from files import File
        self.file = File(file)
        if self.file.exists:
            self.file.backup()
            self.file.delete()

        self.file_lock = threads.Lock()

    def write(self, template, params):
        # USE THE File MADE IN __init__ (THERE IS NO self.filename ATTRIBUTE)
        with self.file_lock:
            self.file.append(expand_template(template, params))
#WRAP PYTHON CLASSIC logger OBJECTS
class Log_usingLogger(BaseLog):
    """LOG SINK THAT FORWARDS EXPANDED MESSAGES, AT INFO LEVEL, TO A logging HANDLER"""

    def __init__(self, settings):
        handler = make_log_from_settings(settings)
        self.logger = logging.Logger("unique name", level=logging.INFO)
        self.logger.addHandler(handler)

    def write(self, template, params):
        # http://docs.python.org/2/library/logging.html#logging.LogRecord
        message = expand_template(template, params)
        self.logger.info(message)
def make_log_from_settings(settings):
    """
    IMPORT THE CLASS NAMED BY settings["class"] AND CALL IT WITH ALL THE
    OTHER SETTINGS AS KEYWORD ARGUMENTS

    WHEN settings.filename IS SET, ITS PARENT DIRECTORY IS CREATED FIRST
    RAISES Exception WHEN THE MODULE CAN NOT BE IMPORTED
    """
    assert settings["class"]

    # IMPORT MODULE FOR HANDLER
    path, _, class_name = settings["class"].rpartition(".")
    try:
        temp = __import__(path, globals(), locals(), [class_name], -1)
    except Exception as e:
        raise Exception("can not find path class: " + path)
    constructor = getattr(temp, class_name)

    #IF WE NEED A FILE, MAKE SURE DIRECTORY EXISTS
    if settings.filename:
        from files import File

        f = File(settings.filename)
        if not f.parent.exists:
            f.parent.create()

    # COPY BEFORE REMOVING 'class' SO THE CALLER'S SETTINGS ARE LEFT INTACT
    # (presumably settings.dict IS THE UNDERLYING dict - TODO confirm)
    params = dict(settings.dict)
    del params['class']
    return constructor(**params)
# LOG SINK THAT QUEUES MESSAGES AND WRITES THEM TO A STREAM FROM A WORKER THREAD
class Log_usingStream(BaseLog):
#stream CAN BE AN OBJECT WITH write() METHOD, OR A STRING
#WHICH WILL eval() TO ONE
def __init__(self, stream):
assert stream
if isinstance(stream, basestring):
# NOTE(review): eval() OF A SETTINGS STRING - ONLY SAFE WHEN THE SETTINGS ARE TRUSTED
self.stream=eval(stream)
name=stream
else:
self.stream=stream
name="stream"
#WRITE TO STREAMS CAN BE *REALLY* SLOW, WE WILL USE A THREAD
from threads import Queue
self.queue=Queue()
# THREAD BODY: TAKE EVERYTHING QUEUED, EXPAND EACH ENTRY TO A LINE AND WRITE
# THE LINES TO THE STREAM, UNTIL please_stop IS SET
def worker(please_stop):
queue=self.queue
while not please_stop:
# EARLIEST TIME OF THE NEXT PASS (0.3 SECONDS FROM NOW)
next_run = datetime.utcnow() + timedelta(seconds=0.3)
logs = queue.pop_all()
if logs:
lines=[]
for log in logs:
try:
# Thread.STOP IS THE SHUTDOWN MARKER ADDED BY stop(); ENTRIES AFTER IT ARE NOT READ
if log==Thread.STOP:
please_stop.go()
next_run = datetime.utcnow()
break
lines.append(expand_template(log.get("template", None), log.get("params", None)))
except Exception, e:
# BEST EFFORT: AN ENTRY THAT FAILS TO EXPAND IS SKIPPED
pass
try:
self.stream.write("\n".join(lines))
self.stream.write("\n")
except Exception, e:
# BEST EFFORT: STREAM WRITE FAILURES ARE IGNORED
pass
Thread.sleep(till=next_run)
self.thread=Thread("log to "+name, worker)
self.thread.start()
# QUEUE ONE MESSAGE FOR THE WORKER; RETURNS self
def write(self, template, params):
try:
self.queue.add({"template":template, "params":params})
return self
except Exception, e:
raise e #OH NO!
# ASK THE WORKER TO FINISH, WAIT FOR IT, THEN CLOSE THE QUEUE; ERRORS ARE IGNORED
def stop(self):
try:
self.queue.add(Thread.STOP) #BE PATIENT, LET REST OF MESSAGE BE SENT
self.thread.join()
except Exception, e:
pass
try:
self.queue.close()
except Exception, f:
pass
class Log_usingThread(BaseLog):
    """LOG SINK THAT QUEUES MESSAGES AND HANDS THEM TO logger FROM A WORKER THREAD"""

    def __init__(self, logger):
        #DELAYED LOAD FOR THREADS MODULE
        from threads import Queue

        self.queue = Queue()

        # please_stop IS ACCEPTED (AND UNUSED) BECAUSE THE WORKER IN
        # Log_usingStream IS CALLED WITH IT BY THE SAME Thread(name, worker) FORM
        def worker(please_stop=None):
            while True:
                logs = self.queue.pop_all()
                for log in logs:
                    if log == Thread.STOP:
                        # END THE THREAD, NOT JUST THIS BATCH, SO stop() CAN JOIN
                        return
                    logger.write(**log)
                Thread.sleep(1)

        self.thread = Thread("log thread", worker)
        self.thread.start()

    def write(self, template, params):
        """QUEUE ONE MESSAGE FOR THE WORKER; RETURNS self"""
        try:
            self.queue.add({"template": template, "params": params})
            return self
        except Exception as e:
            sys.stdout.write("IF YOU SEE THIS, IT IS LIKELY YOU FORGOT TO RUN Log.start() FIRST")
            raise e  #OH NO!

    def stop(self):
        """ASK THE WORKER TO FINISH, WAIT FOR IT, THEN CLOSE THE QUEUE; ERRORS ARE IGNORED"""
        try:
            self.queue.add(Thread.STOP)  #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
        except Exception as e:
            pass

        try:
            self.queue.close()
        except Exception as f:
            pass
class Log_usingMulti(BaseLog):
    """LOG SINK THAT FANS EVERY MESSAGE OUT TO A LIST OF OTHER SINKS"""

    def __init__(self):
        self.many = []

    def write(self, template, params):
        # A FAILING SINK MUST NOT STOP THE OTHERS FROM RECEIVING THE MESSAGE
        for sink in self.many:
            try:
                sink.write(template, params)
            except Exception as e:
                pass
        return self

    def add_log(self, logger):
        self.many.append(logger)
        return self

    def remove_log(self, logger):
        self.many.remove(logger)
        return self

    def clear_log(self):
        self.many = []

    def stop(self):
        # BEST EFFORT: EVERY SINK GETS ITS stop() CALL EVEN IF ANOTHER ONE FAILS
        for sink in self.many:
            try:
                sink.stop()
            except Exception as e:
                pass
# UNTIL Log.start() INSTALLS SOMETHING ELSE, LOG TO STANDARD OUTPUT
# (THE STRING IS eval()'ED BY Log_usingStream, WHICH ALSO STARTS ITS WORKER THREAD)
if not main_log:
main_log = Log_usingStream("sys.stdout")

Просмотреть файл

@ -1,115 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import math
from . import struct
from .struct import Null, nvl
from .logs import Log
from .strings import find_first
class Math(object):
    """NAMESPACE OF SMALL NUMERIC HELPERS (ALL STATIC METHODS)"""

    @staticmethod
    def bayesian_add(a, b):
        """COMBINE TWO PROBABILITIES; BOTH MUST BE STRICTLY BETWEEN ZERO AND ONE"""
        if a>=1 or b>=1 or a<=0 or b<=0: Log.error("Only allowed values *between* zero and one")
        return a*b/(a*b+(1-a)*(1-b))

    # FOR GOODNESS SAKE - IF YOU PROVIDE A METHOD abs(), PLEASE PROVIDE ITS COMPLEMENT
    # x = abs(x)*sign(x)
    # FOUND IN numpy, BUT WE USUALLY DO NOT NEED TO BRING IN A BIG LIB FOR A SIMPLE DECISION
    @staticmethod
    def sign(v):
        if v<0: return -1
        if v>0: return +1
        return 0

    @staticmethod
    def is_number(s):
        """True WHEN float(s) SUCCEEDS"""
        try:
            float(s)
            return True
        except Exception:
            return False

    @staticmethod
    def is_integer(s):
        """True WHEN s CONVERTS TO A FLOAT WITH NO FRACTIONAL PART"""
        try:
            if float(s)==round(float(s), 0):
                return True
            return False
        except Exception:
            return False

    @staticmethod
    def round_sci(value, decimal=None, digits=None):
        """
        ROUND value TO digits SIGNIFICANT DIGITS WHEN digits IS GIVEN,
        OTHERWISE TO decimal DECIMAL PLACES (NEAREST WHOLE NUMBER WHEN None)
        """
        if digits != None:
            if value == 0:
                return value  # log10(0) IS UNDEFINED, AND ZERO IS ALREADY ROUND
            # SCALE BY THE MAGNITUDE OF THE VALUE (NOT OF digits) SO THAT
            # value/m IS BELOW 1, THEN KEEP digits DECIMALS OF THAT
            m = 10.0 ** math.ceil(math.log10(abs(value)))
            return round(value / m, digits) * m

        if decimal == None:
            return round(value)
        return round(value, decimal)

    @staticmethod
    def floor(value, mod=None):
        """
        x == floor(x, a) + mod(x, a)  FOR ALL a
        """
        mod = nvl(mod, 1)
        v = int(math.floor(value))
        return v - (v % mod)

    #RETURN A VALUE CLOSE TO value, BUT WITH SHORTER len(unicode(value))
    @staticmethod
    def approx_str(value):
        v=unicode(value)
        d=v.find(".")
        if d==-1: return value

        # A RUN OF 9s OR 0s AFTER THE DECIMAL POINT MARKS WHERE TO ROUND
        i=find_first(v, ["9999", "0000"], d)
        if i==-1: return value

        return Math.round_sci(value, decimal=i-d-1)

    @staticmethod
    def min(values):
        """SMALLEST OF values, SKIPPING None AND NaN; Null WHEN NOTHING REMAINS"""
        output = Null
        for v in values:
            if v == None:
                continue
            if math.isnan(v):
                continue
            if output == None:
                output = v
                continue
            output = min(output, v)
        return output

    @staticmethod
    def max(values):
        """LARGEST OF values, SKIPPING None AND NaN; Null WHEN NOTHING REMAINS"""
        output = Null
        for v in values:
            if v == None:
                continue
            if math.isnan(v):
                continue
            if output == None:
                output = v
                continue
            output = max(output, v)
        return output

Просмотреть файл

@ -1,95 +0,0 @@
# encoding: utf-8
#
from multiprocessing.queues import Queue
from .logs import Log
# NOTE(review): LOOKS UNFINISHED - __init__ HAS NO self PARAMETER, SO func RECEIVES
# THE INSTANCE, AND NOTHING IS STORED.  Multiprocess CONSTRUCTS IT AS
# worker(name, f, inbound, outbound), WHICH DOES NOT MATCH THIS SIGNATURE - confirm intent
class worker(object):
def __init__(func, inbound, outbound, logging):
# Log_usingInterProcessQueue IS DEFINED FURTHER DOWN IN THIS MODULE
logger = Log_usingInterProcessQueue(logging)
class Log_usingInterProcessQueue(Log):
    """LOG SINK THAT PUTS EACH MESSAGE, UNEXPANDED, ON A QUEUE"""

    def __init__(self, outbound):
        self.outbound = outbound

    def write(self, template, params):
        # SAME {"template", "params"} SHAPE THE OTHER QUEUE-BASED SINKS USE
        self.outbound.put({"template": template, "params": params})
class Multiprocess(object):
    """
    ONE worker PER FUNCTION, SHARING AN inbound AND AN outbound QUEUE

    NOTE(review): execute() USES Queue.extend()/pop(), WHICH THE IMPORTED
    multiprocessing Queue IS NOT KNOWN TO PROVIDE - THIS CLASS LOOKS UNFINISHED
    """

    def __init__(self, functions):
        self.outbound = Queue()
        self.inbound = Queue()

        #MAKE THREADS
        self.threads = []
        for t, f in enumerate(functions):
            thread = worker(
                "worker " + unicode(t),
                f,
                self.inbound,
                self.outbound,
            )
            self.threads.append(thread)

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, a, b, c):
        try:
            self.inbound.close()  # SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        except Exception as e:
            Log.warning("Problem adding to inbound", e)

        self.join()

    #IF YOU SENT A stop(), OR STOP, YOU MAY WAIT FOR SHUTDOWN
    def join(self):
        try:
            #WAIT FOR FINISH
            for t in self.threads:
                t.join()
        except (KeyboardInterrupt, SystemExit):
            Log.note("Shutdow Started, please be patient")
        except Exception as e:
            Log.error("Unusual shutdown!", e)
        finally:
            for t in self.threads:
                t.keep_running = False
            for t in self.threads:
                t.join()
            self.inbound.close()
            self.outbound.close()

    #RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
    def execute(self, parameters):
        #FILL QUEUE WITH WORK
        self.inbound.extend(parameters)

        num = len(parameters)

        def output():
            for i in xrange(num):
                result = self.outbound.pop()
                yield result

        return output()

    #EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
    def stop(self):
        self.inbound.close()  #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        for t in self.threads:
            t.keep_running = False

Просмотреть файл

@ -1,86 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from .struct import Null
from .logs import Log
class Multiset(object):
    """
    COLLECTION THAT REMEMBERS HOW MANY TIMES EACH VALUE WAS ADDED
    (self.dic MAPS VALUE -> COUNT)
    """

    def __init__(self, list=Null, key_field=None, count_field=None):
        """
        list - THE VALUES TO COUNT, OR, WHEN key_field/count_field ARE GIVEN,
               RECORDS HOLDING A VALUE AND ITS COUNT UNDER THOSE FIELDS
        """
        if key_field or count_field:
            self.dic = {i[key_field]: i[count_field] for i in list}
            return

        self.dic = dict()
        for i in list:
            self.add(i)

    def __iter__(self):
        # EACH VALUE IS YIELDED AS MANY TIMES AS IT WAS COUNTED
        for value, times in self.dic.items():
            for _ in range(times):
                yield value

    def items(self):
        return self.dic.items()

    def keys(self):
        return self.dic.keys()

    def add(self, value):
        if value not in self.dic:
            self.dic[value] = 1
        else:
            self.dic[value] += 1

    def extend(self, values):
        for v in values:
            self.add(v)

    def remove(self, value):
        """REMOVE ONE OCCURRENCE OF value; Log.error WHEN IT IS NOT PRESENT"""
        if value not in self.dic:
            Log.error("{{value}} is not in multiset", {"value":value})
        self._remove(value)

    def copy(self):
        duplicate = Multiset()
        duplicate.dic = self.dic.copy()
        return duplicate

    def _remove(self, value):
        # SILENT VERSION OF remove(): A MISSING VALUE IS IGNORED
        remaining = self.dic.get(value, None)
        if remaining == None:
            return

        remaining -= 1
        if remaining == 0:
            del(self.dic[value])
        else:
            self.dic[value] = remaining

    def __sub__(self, other):
        difference = self.copy()
        for o in other:
            difference._remove(o)
        return difference

    def __set__(self, other):
        return set(self.dic.keys())

    def __len__(self):
        return sum(self.dic.values())

    def count(self, value):
        if value not in self.dic:
            return 0
        return self.dic[value]

Просмотреть файл

@ -1,160 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import threading
from .struct import nvl
from .logs import Log
from .threads import Queue, Thread
# WHEN True, worker_thread LOGS EACH FUNCTION IT RUNS AND A SUMMARY WHEN IT ENDS
DEBUG = True
# THREAD THAT POPS REQUESTS FROM in_queue, CALLS function(**request) AND ADDS
# {"response": ...} OR {"exception": ...} TO out_queue (WHEN ONE IS GIVEN)
class worker_thread(threading.Thread):
#in_queue MUST CONTAIN HASH OF PARAMETERS FOR load()
def __init__(self, name, in_queue, out_queue, function):
threading.Thread.__init__(self)
self.name=name
self.in_queue=in_queue
self.out_queue=out_queue
self.function=function
self.keep_running=True
self.num_runs=0
# THE THREAD STARTS ITSELF AS PART OF CONSTRUCTION
self.start()
#REQUIRED TO DETECT KEYBOARD, AND OTHER, INTERRUPTS
def join(self, timeout=None):
# JOIN IN SHORT SLICES (timeout, OR 0.5 SECONDS) FOR AS LONG AS THE THREAD IS ALIVE
while self.isAlive():
Log.note("Waiting on thread {{thread}}", {"thread":self.name})
threading.Thread.join(self, nvl(timeout, 0.5))
def run(self):
got_stop=False
while self.keep_running:
request = self.in_queue.pop()
# Thread.STOP IS THE SHUTDOWN MARKER
if request == Thread.STOP:
got_stop=True
if self.in_queue.queue:
Log.warning("programmer error")
break
# keep_running MAY HAVE BEEN CLEARED WHILE WAITING IN pop()
if not self.keep_running:
break
try:
if DEBUG and hasattr(self.function, "func_name"):
Log.note("run {{function}}", {"function": self.function.func_name})
result = self.function(**request)
if self.out_queue:
self.out_queue.add({"response": result})
except Exception, e:
# THE FAILURE IS LOGGED AND ALSO HANDED TO THE CONSUMER OF out_queue
Log.warning("Can not execute with params={{params}}", {"params": request}, e)
if self.out_queue:
self.out_queue.add({"exception": e})
finally:
self.num_runs += 1
self.keep_running = False
if self.num_runs==0:
Log.warning("{{name}} thread did no work", {"name":self.name})
if DEBUG and self.num_runs!=1:
Log.note("{{name}} thread did {{num}} units of work", {
"name":self.name,
"num":self.num_runs
})
if got_stop and self.in_queue.queue:
Log.warning("multithread programmer error")
if DEBUG:
Log.note("{{thread}} DONE", {"thread":self.name})
# ASK THE THREAD TO FINISH; IT IS CHECKED BEFORE AND AFTER EACH pop()
def stop(self):
self.keep_running=False
#PASS A SET OF FUNCTIONS TO BE EXECUTED (ONE PER THREAD)
#PASS AN (ITERATOR/LIST) OF PARAMETERS TO BE ISSUED TO NEXT AVAILABLE THREAD
# POOL OF worker_thread, ONE PER FUNCTION, SHARING AN inbound AND AN outbound Queue
# THE __enter__/__exit__ METHODS ALLOW USE IN A with STATEMENT
class Multithread(object):
def __init__(self, functions):
self.outbound=Queue()
self.inbound=Queue()
#MAKE THREADS
self.threads=[]
for t, f in enumerate(functions):
# EACH worker_thread STARTS ITSELF WHEN CONSTRUCTED
thread=worker_thread("worker "+unicode(t), self.inbound, self.outbound, f)
self.threads.append(thread)
def __enter__(self):
return self
#WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
def __exit__(self, type, value, traceback):
try:
if isinstance(value, Exception):
self.inbound.close()
self.inbound.add(Thread.STOP)
self.join()
except Exception, e:
Log.warning("Problem sending stops", e)
#IF YOU SENT A stop(), OR Thread.STOP, YOU MAY WAIT FOR SHUTDOWN
def join(self):
try:
#WAIT FOR FINISH
for t in self.threads:
t.join()
except (KeyboardInterrupt, SystemExit):
Log.note("Shutdow Started, please be patient")
except Exception, e:
Log.error("Unusual shutdown!", e)
finally:
for t in self.threads:
t.keep_running=False
self.inbound.close()
self.outbound.close()
for t in self.threads:
t.join()
#RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
def execute(self, request):
#FILL QUEUE WITH WORK
self.inbound.extend(request)
# NOTE(review): len() MEANS request MUST BE SIZED (A LIST), NOT A BARE ITERATOR
num=len(request)
def output():
for i in xrange(num):
result=self.outbound.pop()
# AN EXCEPTION CAUGHT IN A WORKER IS RE-RAISED HERE, IN THE CONSUMER
if "exception" in result:
raise result["exception"]
else:
yield result["response"]
return output()
#EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
def stop(self):
self.inbound.close() #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
for t in self.threads:
t.keep_running=False

Просмотреть файл

@ -1,596 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import sys
from ..logs import Log
from ..struct import nvl, listwrap
from .. import struct
from ..strings import indent, expand_template
from ..struct import StructList, Struct, Null
from ..multiset import Multiset
# A COLLECTION OF DATABASE OPERATORS (RELATIONAL ALGEBRA OPERATORS)
def run(query):
    """
    EVALUATE query AGAINST ITS "from" CLAUSE (A list, OR A NESTED QUERY)
    APPLIED IN ORDER: window, where, sort, select; edges AND filter ARE
    REPORTED AS NOT IMPLEMENTED
    """
    query = struct.wrap(query)

    _from = query["from"] if isinstance(query["from"], list) else run(query["from"])

    if query.edges != None:
        Log.error("not implemented yet")

    if query.filter != None:
        Log.error("not implemented yet")

    for window_spec in listwrap(query.window):
        window(_from, window_spec)

    if query.where != None:
        _from = filter(_from, query.where)

    if query.sort != None:
        _from = sort(_from, query.sort)

    if query.select != None:
        _from = select(_from, query.select)

    return _from
def groupby(data, keys=None, size=None, min_size=None, max_size=None):
    """
    RETURN LIST OF (keys, values) PAIRS, ONE PER DISTINCT COMBINATION OF THE
    keys FIELDS; values IS THE LIST OF ALL data HAVING THOSE KEY VALUES

    WHEN ANY OF size/min_size/max_size IS GIVEN, GROUP BY COUNT INSTEAD
    (SEE groupby_min_max_size)
    """
    if size != None or min_size != None or max_size != None:
        if size != None: max_size = size
        return groupby_min_max_size(data, min_size=min_size, max_size=max_size)

    try:
        agg = {}
        for d in data:
            #REACH INTO dict TO GET PROPERTY VALUE
            key = "|".join([unicode(d[k]) for k in keys])
            if key not in agg:
                agg[key] = (struct.wrap({k: d[k] for k in keys}), StructList())
            agg[key][1].append(d)

        return agg.values()
    except Exception as e:
        Log.error("Problem grouping", e)
def index(data, keys=None):
    """
    RETURN NESTED dict THAT USES keys TO INDEX data:
    ONE dict LEVEL PER KEY, WITH A list OF MATCHING RECORDS AT THE LEAVES
    """
    keys = struct.unwrap(listwrap(keys))

    output = dict()
    for d in data:
        o = output
        # setdefault STORES THE NEW CONTAINER IN ITS PARENT (get() DID NOT)
        for k in keys[:-1]:
            o = o.setdefault(d[k], dict())
        o.setdefault(d[keys[-1]], list()).append(d)
    return output
def unique_index(data, keys=None):
    """
    RETURN dict THAT USES KEYS TO INDEX DATA
    ONLY ONE VALUE ALLOWED PER UNIQUE KEY
    """
    o = Index(listwrap(keys))

    for d in data:
        try:
            o.add(d)
        except Exception as e:
            details = {
                "index": keys,
                "key": select([d], keys)[0],
                "value1": o[d],
                "value2": d
            }
            Log.error("index {{index}} is not unique {{key}} maps to both {{value1}} and {{value2}}", details, e)
    return o
def map(data, relation):
    """
    RETURN [relation[d] FOR EACH d IN data]; Null WHEN data IS None
    ONLY dict RELATIONS ARE SUPPORTED, ANYTHING ELSE IS REPORTED VIA Log.error
    """
    if data == None:
        return Null

    if not isinstance(relation, dict):
        Log.error("Do not know how to handle relation of type {{type}}", {
            "type":type(relation)
        })
        return Null

    return [relation[d] for d in data]
def select(data, field_name):
    """
    RETURN list WITH VALUES FROM field_name:
    A STRING GIVES THE BARE VALUES; A COLLECTION OF NAMES GIVES ONE dict PER
    RECORD, RESTRICTED TO THOSE NAMES
    """
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    if not isinstance(field_name, basestring):
        return [{k: v for k, v in x.items() if k in field_name} for x in data]

    return [d[field_name] for d in data]
def get_columns(data):
    """
    RETURN [{"name": column}] FOR EVERY KEY FOUND IN THE RECORDS OF data,
    IN ORDER OF FIRST APPEARANCE
    """
    names = []
    seen = set()
    for d in data:
        for k, _ in d.items():
            if k not in seen:
                seen.add(k)
                names.append(k)

    # IT WOULD BE NICE TO ADD DOMAIN ANALYSIS HERE
    return [{"name": n} for n in names]
# NOTE(review): LOOKS UNFINISHED - output AND edge ARE BUILT, BUT NO return
# STATEMENT IS VISIBLE, AND value_column IS ONLY CHECKED, NEVER USED
def stack(data, name=None, value_column=None, columns=None):
"""
STACK ALL CUBE DATA TO A SINGLE COLUMN, WITH ONE COLUMN PER DIMENSION
>>> s
a b
one 1 2
two 3 4
>>> stack(s)
one a 1
one b 2
two a 3
two b 4
STACK LIST OF HASHES, OR 'MERGE' SEPARATE CUBES
data - expected to be a list of dicts
name - give a name to the new column
value_column - Name given to the new, single value column
columns - explicitly list the value columns (USE SELECT INSTEAD)
"""
assert value_column != None
if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")
if columns == None:
# NOTE(review): CALLS get_columns()/select()/name ON data ITSELF, ALTHOUGH THE
# DOCSTRING SAYS data IS A LIST OF DICTS - confirm the expected type
columns = data.get_columns()
data = data.select(columns)
name = nvl(name, data.name)
output = []
# THE SET OF COLUMN NAMES ENCOUNTERED; BECOMES THE PARTITIONS OF THE NEW EDGE
parts = set()
for r in data:
for c in columns:
v = r[c]
parts.add(c)
output.append({"name": c, "value": v})
edge = struct.wrap({"domain": {"type": "set", "partitions": parts}})
#UNSTACKING CUBES WILL BE SIMPLER BECAUSE THE keys ARE IMPLIED (edges-column)
def unstack(data, keys=None, column=None, value=None):
    """
    GROUP data BY keys AND RETURN ONE RECORD PER GROUP (AS A StructList):
    THE GROUP'S KEY RECORD, EXTENDED WITH row[column] -> row[value] FOR
    EVERY ROW OF THE GROUP
    """
    assert keys != None
    assert column != None
    assert value != None
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    output = []
    for record, rows in groupby(data, keys):
        for row in rows:
            record[row[column]] = row[value]
        output.append(record)

    return StructList(output)
def normalize_sort(fieldnames):
    """
    CONVERT SORT PARAMETERS TO A NORMAL FORM SO EASIER TO USE:
    A LIST WHERE EVERY BARE FIELD NAME HAS BECOME {"field": name, "sort": 1}
    """
    if fieldnames == None:
        return []

    return [
        {"field": f, "sort": 1} if isinstance(f, basestring) else f
        for f in listwrap(fieldnames)
    ]
def sort(data, fieldnames=None):
    """
    PASS A FIELD NAME, OR LIST OF FIELD NAMES, OR LIST OF STRUCTS WITH {"field":field_name, "sort":direction}
    RETURNS A NEW SORTED list (Null WHEN data IS None); FAILURES GO TO Log.error
    """
    try:
        if data == None:
            return Null

        if fieldnames == None:
            return sorted(data)

        if not isinstance(fieldnames, list):
            #SPECIAL CASE, ONLY ONE FIELD TO SORT BY
            if isinstance(fieldnames, basestring):
                def by_name(left, right):
                    return cmp(nvl(left, Struct())[fieldnames], nvl(right, Struct())[fieldnames])

                return sorted(data, cmp=by_name)

            #EXPECTING {"field":f, "sort":i} FORMAT
            def by_spec(left, right):
                return fieldnames["sort"] * cmp(nvl(left, Struct())[fieldnames["field"]],
                                                nvl(right, Struct())[fieldnames["field"]])

            return sorted(data, cmp=by_spec)

        formal = normalize_sort(fieldnames)

        def comparer(left, right):
            left = nvl(left, Struct())
            right = nvl(right, Struct())
            # THE FIRST FIELD THAT DIFFERS DECIDES THE ORDER
            for f in formal:
                try:
                    result = f["sort"] * cmp(left[f["field"]], right[f["field"]])
                    if result != 0: return result
                except Exception as e:
                    Log.error("problem with compare", e)
            return 0

        if isinstance(data, list):
            output = sorted(data, cmp=comparer)
        elif hasattr(data, "__iter__"):
            output = sorted(list(data), cmp=comparer)
        else:
            Log.error("Do not know how to handle")

        return output
    except Exception as e:
        Log.error("Problem sorting\n{{data}}", {"data": data}, e)
def add(*values):
    """
    SUM OF ALL values THAT ARE NOT None; Null WHEN THERE ARE NONE
    """
    total = Null
    for v in values:
        if total == None:
            total = v
        elif v != None:
            total += v
    return total
def filter(data, where):
    """
    where - a function that accepts (record, rownum, rows) and returns a boolean
    RETURNS THE list OF RECORDS FOR WHICH where IS TRUE
    """
    predicate = wrap_function(where)

    kept = []
    for rownum, record in enumerate(data):
        if predicate(record, rownum, data):
            kept.append(record)
    return kept
def wrap_function(func):
    """
    RETURN A THREE-PARAMETER WINDOW FUNCTION (row, rownum, rows) THAT CALLS
    func WITH AS MANY LEADING ARGUMENTS AS func DECLARES (0 TO 3)
    RETURNS None FOR A func DECLARING MORE THAN THREE
    """
    numarg = func.__code__.co_argcount
    if numarg == 3:
        return func

    adapters = {
        0: lambda row, rownum, rows: func(),
        1: lambda row, rownum, rows: func(row),
        2: lambda row, rownum, rows: func(row, rownum),
    }
    return adapters.get(numarg)
def window(data, param):
    """
    MAYBE WE CAN DO THIS WITH NUMPY??
    data - list of records

    ASSIGNS A COMPUTED VALUE TO r[param.name] OF EVERY RECORD, IN PLACE
    WITHOUT aggregate, sort AND edges THE VALUE IS SIMPLY param.value(r, rownum, data)
    """
    name = param.name  # column to assign window function result
    edges = param.edges  # columns to group by
    sort_by = param.sort  # columns to sort by (NAMED SO IT DOES NOT HIDE sort())
    value = wrap_function(param.value)  # function that takes a record and returns a value (for aggregation)
    aggregate = param.aggregate  # WindowFunction to apply
    _range = param.range  # of form {"min":-10, "max":0} to specify the size and relative position of window

    if aggregate == None and sort_by == None and edges == None:
        #SIMPLE CALCULATED VALUE
        for rownum, r in enumerate(data):
            r[name] = value(r, rownum, data)

        return

    for rownum, r in enumerate(data):
        r["__temp__"] = value(r, rownum, data)

    for keys, values in groupby(data, edges):
        if not values:
            continue     # CAN DO NOTHING WITH THIS ZERO-SAMPLE

        sequence = struct.wrap(sort(values, sort_by))
        head = nvl(_range.max, _range.stop)
        tail = nvl(_range.min, _range.start)

        #PRELOAD total
        total = aggregate()
        for i in range(head):
            total += sequence[i].__temp__

        #WINDOW FUNCTION APPLICATION
        for i, r in enumerate(sequence):
            r[name] = total.end()
            total.add(sequence[i + head].__temp__)
            total.sub(sequence[i + tail].__temp__)

    for r in data:
        r["__temp__"] = Null  #CLEANUP
def groupby_size(data, size):
    """
    LAZILY SPLIT data INTO CONSECUTIVE LISTS OF AT MOST size ITEMS

    data - AN ITERATOR, OR ANYTHING WITH __iter__
    size - MAXIMUM NUMBER OF ITEMS PER GROUP
    YIELDS (group_number, list_of_items); ONLY THE LAST GROUP MAY BE SHORTER
    THAN size, AND EMPTY GROUPS ARE NEVER YIELDED (SAME AS THE OTHER GROUPERS
    IN THIS MODULE)
    """
    if hasattr(data, "next") or hasattr(data, "__next__"):
        iterator = data
    elif hasattr(data, "__iter__"):
        iterator = iter(data)
    else:
        Log.error("do not know how to handle this type")

    #THIS IS LAZY
    i = 0
    while True:
        group = []
        for _ in range(size):
            try:
                group.append(next(iterator))
            except StopIteration:
                break

        if not group:
            # SOURCE EXHAUSTED EXACTLY ON A GROUP BOUNDARY (OR size < 1)
            return
        yield (i, group)
        if len(group) < size:
            # A SHORT GROUP MEANS THE SOURCE RAN DRY
            return
        i += 1
def groupby_Multiset(data, min_size, max_size):
    """
    GROUP multiset BASED ON POPULATION OF EACH KEY, TRYING TO STAY IN min/max LIMITS

    data - MAPPING-LIKE OBJECT WHOSE items() GIVES (key, count) PAIRS
    YIELDS (group_number, list_of_keys); REPORTS AN ERROR THROUGH Log WHEN A
    GROUP REACHES max_size
    """
    if min_size == None:
        min_size = 0

    group_num = 0
    population = 0
    members = list()
    for key, count in data.items():
        fits = population < min_size or population + count < max_size
        if fits:
            population += count
            members.append(key)
        elif population < max_size:
            # CURRENT GROUP IS FULL ENOUGH; EMIT IT AND START THE NEXT WITH THIS KEY
            yield (group_num, members)
            group_num += 1
            population = count
            members = [key]

        if population >= max_size:
            Log.error("({{min}}, {{max}}) range is too strict given step of {{increment}}", {
                "min": min_size, "max": max_size, "increment": count
            })

    if members:
        yield (group_num, members)
def groupby_min_max_size(data, min_size=0, max_size=None):
    """
    SPLIT data INTO NUMBERED GROUPS, PICKING THE STRATEGY FROM THE TYPE OF data

    ANYTHING WITH __iter__ IS CUT INTO LISTS OF max_size ITEMS RIGHT HERE;
    OTHERWISE A Multiset GOES TO groupby_Multiset AND EVERYTHING ELSE TO
    groupby_size
    RETURNS AN ITERATOR OF (group_number, group)
    """
    if max_size == None:
        max_size = sys.maxint

    if hasattr(data, "__iter__"):
        def _chunks():
            group_num = 0
            chunk = []
            for position, item in enumerate(data, 1):
                chunk.append(item)
                if position % max_size == 0:
                    yield group_num, chunk
                    group_num += 1
                    chunk = []
            if chunk:
                # LEFTOVERS THAT DID NOT FILL A WHOLE GROUP
                yield group_num, chunk
        return _chunks()

    if isinstance(data, Multiset):
        return groupby_Multiset(data, min_size, max_size)
    return groupby_size(data, max_size)
class Cube():
    """
    A SET OF RECORDS TOGETHER WITH THE EDGES THAT INDEX THEM

    data  - THE RECORDS; ANOTHER Cube IS NOT ACCEPTED YET
    edges - EDGE DEFINITIONS; WHEN MISSING A SINGLE NUMERIC "index" EDGE
            COVERING range(len(data)) IS USED AND ALL COLUMNS ARE SELECTED
    name  - OPTIONAL NAME OF THE CUBE
    """
    def __init__(self, data=None, edges=None, name=None):
        if isinstance(data, Cube):
            Log.error("do not know how to handle cubes yet")

        columns = get_columns(data)
        # KEPT SO get_columns() HAS SOMETHING TO RETURN
        self.columns = columns
        # SET ON EVERY PATH SO THE ATTRIBUTE ALWAYS EXISTS
        self.name = name

        if edges == None:
            self.edges = [{"name": "index", "domain": {"type": "numeric", "min": 0, "max": len(data), "interval": 1}}]
            self.data = data
            self.select = columns
            return

        self.edges = edges
        self.select = Null

    def get_columns(self):
        """
        RETURN THE COLUMNS FOUND IN THE DATA GIVEN TO THE CONSTRUCTOR
        """
        return self.columns
class Domain():
    """
    PLACEHOLDER DOMAIN: EVERY METHOD IS AN UNIMPLEMENTED STUB THAT RETURNS None
    """
    def __init__(self):
        return None

    def part2key(self, part):
        """NOT IMPLEMENTED; RETURNS None"""
        return None

    def part2label(self, part):
        """NOT IMPLEMENTED; RETURNS None"""
        return None

    def part2value(self, part):
        """NOT IMPLEMENTED; RETURNS None"""
        return None
# SIMPLE TUPLE-OF-STRINGS LOOKUP TO LIST
class Index(object):
    """
    LOOKUP OF RECORDS BY ONE OR MORE KEY COLUMNS, STORED AS NESTED dicts
    (ONE dict LEVEL PER KEY COLUMN)

    keys - LIST OF COLUMN NAMES; A RECORD r IS STORED AT
           _data[r[keys[0]]][r[keys[1]]]...
    """
    def __init__(self, keys):
        self._data = {}
        self._keys = keys
        self.count = 0

        depth = len(self._keys)

        def lookup(d0):
            # YIELD EVERY RECORD FOUND depth LEVELS BELOW d0
            def walk(node, remaining):
                if remaining == 0:
                    yield node
                    return
                for child in node.values():
                    for leaf in walk(child, remaining - 1):
                        yield leaf
            return walk(d0, depth)

        self.lookup = lookup

    def __getitem__(self, key):
        """
        key - dict OF {column: value}, OR A BARE VALUE FOR A SINGLE-COLUMN INDEX
        RETURNS THE STORED RECORD WHEN ALL KEY COLUMNS ARE GIVEN, AN Index OVER
        THE REMAINING COLUMNS WHEN ONLY LEADING COLUMNS ARE GIVEN, AND Null
        WHEN NOTHING MATCHES
        """
        try:
            if not isinstance(key, dict):
                #WE WILL BE FORGIVING IF THE KEY IS NOT IN A LIST
                if len(self._keys) > 1:
                    Log.error("Must be given an array of keys")
                key = {self._keys[0]: key}

            d = self._data
            for depth, k in enumerate(self._keys):
                try:
                    v = key[k]
                except KeyError:
                    #NOT A COMPLETE INDEXING, SO RETURN THE PARTIAL INDEX
                    output = Index(self._keys[depth:])
                    output._data = d
                    output.count = sum(1 for _ in output)
                    return output
                if v == None:
                    Log.error("can not handle when {{key}} == None", {"key": k})
                if v not in d:
                    return Null
                d = d[v]
            # EVERY KEY COLUMN MATCHED: d IS THE STORED RECORD
            return d
        except Exception as e:
            Log.error("something went wrong", e)

    def __setitem__(self, key, value):
        Log.error("Not implemented")

    def add(self, val):
        """
        STORE RECORD val UNDER ITS KEY COLUMNS; A BARE VALUE IS TREATED AS A
        RECORD HOLDING ONLY THE FIRST KEY COLUMN.  REPORTS AN ERROR WHEN THE
        KEY IS ALREADY TAKEN
        """
        if not isinstance(val, dict):
            val = {self._keys[0]: val}

        d = self._data
        for k in self._keys[0:-1]:
            v = val[k]
            if v == None:
                Log.error("can not handle when {{key}} == None", {"key": k})
            if v not in d:
                d[v] = {}
            d = d[v]

        v = val[self._keys[-1]]
        if v in d:
            Log.error("key already filled")
        d[v] = val
        self.count += 1

    def __contains__(self, key):
        return self[key] != None

    def __iter__(self):
        return self.lookup(self._data)

    def __sub__(self, other):
        output = Index(self._keys)
        for v in self:
            if v not in other:
                output.add(v)
        return output

    def __and__(self, other):
        output = Index(self._keys)
        for v in self:
            if v in other:
                output.add(v)
        return output

    def __or__(self, other):
        output = Index(self._keys)
        for v in self:
            output.add(v)
        for v in other:
            # RECORDS PRESENT ON BOTH SIDES ARE ADDED ONCE
            if v not in output:
                output.add(v)
        return output

    def __len__(self):
        return self.count

    def subtract(self, other):
        return self.__sub__(other)

    def intersect(self, other):
        return self.__and__(other)

Просмотреть файл

@ -1,4 +0,0 @@
# encoding: utf-8
#
__author__ = 'klahnakoski'

Просмотреть файл

@ -1,162 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from ..logs import Log
from ..maths import Math
from ..multiset import Multiset
from ..stats import Z_moment, stats2z_moment, z_moment2stats
class AggregationFunction(object):
    """
    INTERFACE FOR AGGREGATES; EVERY METHOD REPORTS "not implemented yet" AND IS
    MEANT TO BE OVERRIDDEN
    """
    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")

    def add(self, value):
        """
        ADD value TO AGGREGATE
        """
        Log.error("not implemented yet")

    def merge(self, agg):
        """
        ADD TWO AGGREGATES TOGETHER
        """
        Log.error("not implemented yet")

    def end(self):
        """
        RETURN AGGREGATE
        """
        # SAME AS THE OTHER STUBS: FAIL LOUDLY INSTEAD OF SILENTLY RETURNING None
        Log.error("not implemented yet")
class WindowFunction(AggregationFunction):
    """
    AN AGGREGATE THAT CAN ALSO FORGET VALUES (sub), AS NEEDED BY A MOVING WINDOW
    """
    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")

    def sub(self, value):
        """
        REMOVE value FROM AGGREGATE
        """
        Log.error("not implemented yet")
class Stats(WindowFunction):
    """
    WINDOW AGGREGATE KEEPING A RUNNING Z_moment OF THE STATS ADDED TO IT
    """
    def __init__(self):
        # SKIPS WindowFunction.__init__, WHICH ONLY REPORTS "not implemented yet"
        object.__init__(self)
        self.total = Z_moment(0, 0, 0)

    def add(self, value):
        """INCLUDE value (CONVERTED BY stats2z_moment); NULLS ARE IGNORED"""
        if value == None:
            return
        self.total = self.total + stats2z_moment(value)

    def sub(self, value):
        """EXCLUDE value (CONVERTED BY stats2z_moment); NULLS ARE IGNORED"""
        if value == None:
            return
        self.total = self.total - stats2z_moment(value)

    def merge(self, agg):
        """FOLD THE TOTAL OF ANOTHER Stats AGGREGATE INTO THIS ONE"""
        self.total = self.total + agg.total

    def end(self):
        """RETURN THE RUNNING TOTAL CONVERTED BY z_moment2stats"""
        return z_moment2stats(self.total)
class Min(WindowFunction):
    """
    WINDOW AGGREGATE REPORTING THE MINIMUM OF THE VALUES CURRENTLY HELD
    """
    def __init__(self):
        # SKIPS WindowFunction.__init__, WHICH ONLY REPORTS "not implemented yet"
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        """PUT value IN THE WINDOW; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total.add(value)

    def sub(self, value):
        """TAKE value OUT OF THE WINDOW; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total.remove(value)

    def end(self):
        """RETURN Math.min OF THE VALUES HELD"""
        return Math.min(self.total)
class Max(WindowFunction):
    """
    WINDOW AGGREGATE REPORTING THE MAXIMUM OF THE VALUES CURRENTLY HELD
    """
    def __init__(self):
        # SKIPS WindowFunction.__init__, WHICH ONLY REPORTS "not implemented yet"
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        """PUT value IN THE WINDOW; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total.add(value)

    def sub(self, value):
        """TAKE value OUT OF THE WINDOW; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total.remove(value)

    def end(self):
        """RETURN Math.max OF THE VALUES HELD"""
        return Math.max(self.total)
class Count(WindowFunction):
    """
    WINDOW AGGREGATE COUNTING THE NON-NULL VALUES CURRENTLY HELD
    """
    def __init__(self):
        # SKIPS WindowFunction.__init__, WHICH ONLY REPORTS "not implemented yet"
        object.__init__(self)
        self.total = 0

    def add(self, value):
        """COUNT value; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total = self.total + 1

    def sub(self, value):
        """UN-COUNT value; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total = self.total - 1

    def end(self):
        """RETURN THE CURRENT COUNT"""
        return self.total
class Sum(WindowFunction):
    """
    WINDOW AGGREGATE SUMMING THE NON-NULL VALUES CURRENTLY HELD
    """
    def __init__(self):
        # SKIPS WindowFunction.__init__, WHICH ONLY REPORTS "not implemented yet"
        object.__init__(self)
        self.total = 0

    def add(self, value):
        """ADD value TO THE SUM; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total += value

    def sub(self, value):
        """TAKE value BACK OUT OF THE SUM; NULLS ARE IGNORED"""
        if value == None:
            return
        self.total -= value

    def end(self):
        """RETURN THE CURRENT SUM"""
        return self.total

Просмотреть файл

@ -1,31 +0,0 @@
# encoding: utf-8
#
import random
import string
# DEFAULT CHARACTERS FOR Random.string(): ASCII LETTERS FOLLOWED BY DIGITS
SIMPLE_ALPHABET = string.ascii_letters + string.digits
# PRIVATE GENERATOR USED FOR PICKING CHARACTERS
SEED = random.Random()


class Random(object):
    """
    CONVENIENCE FUNCTIONS FOR RANDOM STRINGS, INTEGERS AND SAMPLES
    """
    @staticmethod
    def string(length, alphabet=SIMPLE_ALPHABET):
        """RETURN length CHARACTERS, EACH PICKED FROM alphabet BY SEED"""
        return ''.join(SEED.choice(alphabet) for _ in range(0, length))

    @staticmethod
    def hex(length):
        """RETURN length UPPERCASE HEXADECIMAL DIGITS"""
        hex_digits = string.digits + 'ABCDEF'
        return Random.string(length, hex_digits)

    @staticmethod
    def int(*args):
        """SAME ARGUMENTS AND RESULT AS random.randrange"""
        return random.randrange(*args)

    @staticmethod
    def sample(data, count):
        """RETURN count ITEMS PICKED FROM data WITH REPLACEMENT"""
        num = len(data)
        picked = []
        for _ in range(count):
            picked.append(data[Random.int(num)])
        return picked

Просмотреть файл

@ -1,77 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import argparse
import struct
from .struct import listwrap
from .cnv import CNV
from .logs import Log
from .files import File
#PARAMETERS MATCH argparse.ArgumentParser.add_argument()
#http://docs.python.org/dev/library/argparse.html#the-add-argument-method
#name or flags - Either a name or a list of option strings, e.g. foo or -f, --foo.
#action - The basic type of action to be taken when this argument is encountered at the command line.
#nargs - The number of command-line arguments that should be consumed.
#const - A constant value required by some action and nargs selections.
#default - The value produced if the argument is absent from the command line.
#type - The type to which the command-line argument should be converted.
#choices - A container of the allowable values for the argument.
#required - Whether or not the command-line option may be omitted (optionals only).
#help - A brief description of what the argument does.
#metavar - A name for the argument in usage messages.
#dest - The name of the attribute to be added to the object returned by parse_args().
def _argparse(defs):
    """
    PARSE THE COMMAND LINE USING defs, ONE ENTRY PER ARGUMENT

    defs - ONE DEFINITION, OR A LIST OF THEM; "name" HOLDS THE FLAG(S) AND ALL
           OTHER FIELDS ARE HANDED TO ArgumentParser.add_argument() AS KEYWORDS
    RETURNS THE PARSED ARGUMENTS, WRAPPED
    """
    parser = argparse.ArgumentParser()
    for definition in listwrap(defs):
        options = definition.copy()
        flags = options.name
        # THE FLAGS GO IN POSITIONALLY, SO THEY MUST NOT ALSO APPEAR AS A KEYWORD
        options.name = None
        parser.add_argument(*listwrap(flags).list, **options.dict)

    parsed = parser.parse_args()
    output = dict((k, getattr(parsed, k)) for k in vars(parsed))
    return struct.wrap(output)
def _read_settings_file(filename):
    # LOAD AND PARSE ONE JSON SETTINGS FILE, COMPLAINING WHEN IT DOES NOT EXIST
    settings_file = File(filename)
    if not settings_file.exists:
        Log.error("Can not find settings file {{filename}}", {
            "filename": settings_file.abspath
        })
    json = settings_file.read()
    return CNV.JSON2object(json, flexible=True)


def read_settings(filename=None, defs=None):
    """
    READ THE JSON SETTINGS FILE AND ATTACH THE PARSED COMMAND LINE AS settings.args

    filename - PATH OF THE SETTINGS FILE; WHEN MISSING, THE PATH IS TAKEN FROM
               THE --settings COMMAND LINE OPTION (DEFAULT ./settings.json)
    defs     - EXTRA ARGUMENT DEFINITIONS (SEE _argparse); NEVER MODIFIED
    RETURNS THE SETTINGS; settings.args IS ONLY SET WHEN THE COMMAND LINE WAS PARSED
    """
    if filename:
        settings = _read_settings_file(filename)
        if defs:
            settings.args = _argparse(defs)
        return settings

    # WORK ON A COPY: listwrap() HANDS BACK THE CALLER'S OWN LIST, AND THE
    # --settings DEFINITION MUST NOT LEAK INTO IT
    all_defs = [d for d in listwrap(defs)]
    all_defs.append({
        "name": ["--settings", "--settings-file", "--settings_file"],
        "help": "path to JSON file with settings",
        "type": str,
        "dest": "filename",
        "default": "./settings.json",
        "required": False
    })
    args = _argparse(all_defs)
    settings = _read_settings_file(args.filename)
    settings.args = args
    return settings

Просмотреть файл

@ -1,162 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from math import sqrt
from .struct import nvl
from .logs import Log
DEBUG=True
EPSILON=0.000001
def stats2z_moment(stats):
    """
    CONVERT stats (count, mean, variance, skew, kurtosis) TO A Z_moment OF
    POWER SUMS (count, sum x, sum x^2, sum x^3, sum x^4)

    WHEN DEBUG IS SET THE RESULT IS CONVERTED BACK AND count, mean AND variance
    ARE COMPARED WITH THE INPUT
    """
    # MODIFIED FROM http://statsmodels.sourceforge.net/devel/_modules/statsmodels/stats/moment_helpers.html
    # ADDED count
    # FIXED ERROR IN COEFFICIENTS
    mc0, mc1, mc2, skew, kurt = (stats.count, stats.mean, stats.variance, stats.skew, stats.kurtosis)

    mz0 = mc0
    mz1 = mc1 * mc0
    mz2 = (mc2 + mc1*mc1)*mc0
    mc3 = skew*(mc2**1.5)  # 3rd central moment
    # E[x^3] = mc3 + 3*mean*variance + mean^3 (THE CUBE TERM IS ADDED, NOT SUBTRACTED)
    mz3 = (mc3 + 3*mc1*mc2 + mc1**3)*mc0  # 3rd non-central moment
    mc4 = (kurt+3.0)*(mc2**2.0)  # 4th central moment
    mz4 = (mc4 + 4*mc1*mc3 + 6*mc1*mc1*mc2 + mc1**4) * mc0

    m = Z_moment(mz0, mz1, mz2, mz3, mz4)
    if DEBUG:
        v = z_moment2stats(m, unbiased=False)
        if not closeEnough(v.count, stats.count):
            Log.error("convertion error")
        if not closeEnough(v.mean, stats.mean):
            Log.error("convertion error")
        if not closeEnough(v.variance, stats.variance):
            Log.error("convertion error")
    return m
def closeEnough(a, b):
    """
    TRUE WHEN a AND b DIFFER BY NO MORE THAN EPSILON, SCALED BY THEIR MAGNITUDES PLUS ONE
    """
    tolerance = EPSILON*(abs(a)+abs(b)+1)
    return True if abs(a-b) <= tolerance else False
def z_moment2stats(z_moment, unbiased=True):
    """
    CONVERT POWER SUMS BACK TO Stats (count, mean, variance ONLY)

    unbiased - DIVIDE THE VARIANCE BY (N-1) INSTEAD OF N
    RETURNS AN EMPTY Stats() WHEN THE COUNT IS ZERO; mean/variance ARE nan WHEN
    THERE ARE TOO FEW SAMPLES TO COMPUTE THEM
    """
    free = 1 if unbiased else 0
    N = z_moment.S[0]
    if N == 0:
        return Stats()

    # float() KEEPS INTEGER SUMS FROM BEING TRUNCATED BY PYTHON 2 INTEGER DIVISION
    return Stats(
        count=N,
        mean=float(z_moment.S[1]) / N if N > 0 else float('nan'),
        variance=(z_moment.S[2] - (float(z_moment.S[1]) ** 2) / N) / float(N - free) if N - free > 0 else float('nan'),
        unbiased=unbiased
    )
class Stats(object):
    """
    count, mean, variance, skew AND kurtosis OF A SAMPLE

    KEYWORDS ARE HONOURED IN THAT ORDER (std MAY STAND IN FOR variance); THE
    FIRST ONE MISSING MAKES IT, AND EVERYTHING AFTER IT, ZERO.  unbiased COMES
    FROM "unbiased", ELSE THE OPPOSITE OF "biased", ELSE False
    """
    def __init__(self, **args):
        known = "count" in args
        self.count = args["count"] if known else 0

        known = known and "mean" in args
        self.mean = args["mean"] if known else 0

        known = known and ("variance" in args or "std" in args)
        if not known:
            self.variance = 0
        elif "variance" in args:
            self.variance = args["variance"]
        else:
            self.variance = args["std"]**2

        known = known and "skew" in args
        self.skew = args["skew"] if known else 0

        known = known and "kurtosis" in args
        self.kurtosis = args["kurtosis"] if known else 0

        if "unbiased" in args:
            self.unbiased = args["unbiased"]
        elif "biased" in args:
            self.unbiased = not args["biased"]
        else:
            self.unbiased = False

    @property
    def std(self):
        """SQUARE ROOT OF variance"""
        return sqrt(self.variance)
class Z_moment(object):
    """
    ZERO-CENTERED MOMENTS
    S IS THE TUPLE OF VALUES GIVEN TO THE CONSTRUCTOR; new_instance() FILLS IT
    WITH (count, sum x, sum x^2, sum x^3, sum x^4)
    """
    def __init__(self, *args):
        self.S = tuple(args)

    def __add__(self, other):
        # ELEMENT-WISE, USING THE MODULE-LEVEL add()
        summed = map(add, self.S, other.S)
        return Z_moment(*summed)

    def __sub__(self, other):
        # ELEMENT-WISE, USING THE MODULE-LEVEL sub()
        differences = map(sub, self.S, other.S)
        return Z_moment(*differences)

    @property
    def tuple(self):
        #RETURN AS ORDERED TUPLE
        return self.S

    @property
    def dict(self):
        #RETURN HASH OF SUMS
        return {"s"+unicode(position): moment for position, moment in enumerate(self.S)}

    @staticmethod
    def new_instance(values=None):
        """
        BUILD THE POWER SUMS OF values (NULLS SKIPPED, THE REST CAST TO float)
        WITH NO values AN EMPTY Z_moment IS RETURNED
        """
        if values == None:
            return Z_moment()

        numbers = [float(v) for v in values if v != None]
        power_sums = [sum([pow(n, power) for n in numbers]) for power in (2, 3, 4)]
        return Z_moment(len(numbers), sum(numbers), *power_sums)
def add(a,b):
    """a + b, WITH A NULL ON EITHER SIDE COUNTED AS ZERO"""
    left = nvl(a, 0)
    right = nvl(b, 0)
    return left + right
def sub(a,b):
    """a - b, WITH A NULL ON EITHER SIDE COUNTED AS ZERO"""
    left = nvl(a, 0)
    right = nvl(b, 0)
    return left - right

Просмотреть файл

@ -1,133 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import re
from .jsons import json_encoder
import struct
from .struct import Struct
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
def indent(value, prefix=u"\t", indent=None):
    """
    PUT prefix IN FRONT OF EVERY LINE OF value; TRAILING WHITESPACE IS LEFT AS IT WAS

    prefix - TEXT TO PREPEND
    indent - WHEN GIVEN, prefix IS REPEATED THIS MANY TIMES
    """
    if indent != None:
        prefix = prefix*indent

    try:
        body = value.rstrip()
        trailing = value[len(body):]
        separator = u"\n"+prefix
        return prefix + separator.join(body.splitlines()) + trailing
    except Exception as e:
        raise Exception(u"Problem with indent of value ("+e.message+u")\n"+unicode(value))
def outdent(value):
    """
    REMOVE THE LEADING WHITESPACE COMMON TO ALL NON-BLANK LINES OF value
    (AT MOST 100 CHARACTERS ARE REMOVED FROM ANY LINE)
    """
    try:
        lines = value.splitlines()
        margin = 100
        for line in lines:
            stripped = line.lstrip()
            if len(stripped) > 0:
                # BLANK LINES DO NOT GET A SAY IN THE MARGIN
                margin = min(margin, len(line)-len(stripped))
        return u"\n".join([line[margin:] for line in lines])
    except Exception as e:
        from .logs import Log
        Log.error("can not outdent value", e)
def between(value, prefix, suffix):
    """
    RETURN THE TEXT BETWEEN prefix AND THE FIRST suffix FOLLOWING IT, USING THE
    prefix CLOSEST TO THAT suffix; None WHEN EITHER MARKER IS MISSING
    """
    first = value.find(prefix)
    if first == -1:
        return None

    end = value.find(suffix, first + len(prefix))
    if end == -1:
        return None

    #WE KNOW A prefix EXISTS BEFORE end, BUT THERE MAY BE A RIGHT-MORE ONE
    begin = value.rfind(prefix, 0, end) + len(prefix)
    return value[begin:end]
def right(value, len):
    """RETURN THE LAST len CHARACTERS OF value (EMPTY WHEN len IS NOT POSITIVE)"""
    return value[-len:] if len > 0 else u""
def find_first(value, find_arr, start=0):
    """
    RETURN THE LOWEST INDEX (AT OR AFTER start) WHERE ANY STRING OF find_arr
    OCCURS IN value, OR -1 WHEN NONE OCCURS
    """
    limit = len(value)
    hits = [value.find(target, start) for target in find_arr]
    best = min([limit] + [h for h in hits if h != -1])
    return -1 if best == limit else best
pattern=re.compile(r"(\{\{[\w_\.]+\}\})")
def expand_template(template, values):
    """
    REPLACE EVERY {{name}} IN template WITH THE STRING FORM (toString) OF values[name]

    template - TEXT CONTAINING {{name}} PLACEHOLDERS (LETTERS, DIGITS, _ AND .)
    values   - MAPPING OF name -> VALUE; IT IS WRAPPED BEFORE LOOKUP
    RETURNS THE EXPANDED TEXT
    """
    values=struct.wrap(values)
    def replacer(found):
        # found.group(1) IS THE WHOLE PLACEHOLDER, BRACES INCLUDED
        var=found.group(1)
        try:
            val=values[var[2:-2]]  # [2:-2] STRIPS THE {{ AND }}
            val=toString(val)
            return val
        except Exception, e:
            try:
                # NOTE(review): find() RETURNS -1 (TRUTHY) WHEN THE TEXT IS ABSENT AND
                # 0 (FALSY) WHEN IT STARTS THE MESSAGE, SO THIS IS NOT A "contains"
                # TEST - CONFIRM THE INTENT
                if e.message.find(u"is not JSON serializable"):
                    #WORK HARDER
                    val=toString(val)
                    return val
                # NOTE(review): WHEN THE TEST ABOVE IS FALSY replacer RETURNS None
            except Exception:
                raise Exception(u"Can not find "+var[2:-2]+u" in template:\n"+indent(template), e)
    return pattern.sub(replacer, template)
def toString(val):
    """
    RETURN TEXT FOR val: PRETTY JSON FOR Struct, dict, list AND set, unicode() FOR THE REST
    """
    if isinstance(val, Struct):
        # ENCODE THE PLAIN dict BEHIND THE Struct
        return json_encoder.encode(val.dict, pretty=True)
    if isinstance(val, (dict, list, set)):
        return json_encoder.encode(val, pretty=True)
    return unicode(val)
def edit_distance(s1, s2):
    """
    FROM http://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
    LICENCE http://creativecommons.org/licenses/by-sa/3.0/

    RETURN THE LEVENSHTEIN DISTANCE BETWEEN s1 AND s2 DIVIDED BY THE LENGTH OF
    THE LONGER ONE: 0.0 FOR IDENTICAL STRINGS (INCLUDING TWO EMPTY ONES), 1.0
    WHEN NOTHING MATCHES
    """
    if len(s1) < len(s2):
        return edit_distance(s2, s1)

    # len(s1) >= len(s2)
    if len(s1) == 0:
        # BOTH ARE EMPTY, SO THEY ARE IDENTICAL
        return 0.0
    if len(s2) == 0:
        return 1.0

    previous_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            insertions = previous_row[j + 1] + 1  # j+1 instead of j since previous_row and current_row are one character longer
            deletions = current_row[j] + 1        # than s2
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return float(previous_row[-1])/len(s1)

Просмотреть файл

@ -1,343 +0,0 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
SPECIAL=["keys", "values", "items", "iteritems", "dict", "copy"]
class Struct(dict):
    """
    Struct is an anonymous class with some properties good for manipulating JSON
    0) a.b==a["b"]
    1) the IDE does tab completion, so my spelling mistakes get found at "compile time"
    2) it deals with missing keys gracefully, so I can put it into set operations (database operations) without choking
    2b) missing keys is important when dealing with JSON, which is often almost anything
    3) also, which I hardly use, is storing JSON paths in a variable, so : a["b.c"]==a.b.c
    MORE ON MISSING VALUES: http://www.numpy.org/NA-overview.html
    IT ONLY CONSIDERS THE LEGITIMATE-FIELD-WITH-MISSING-VALUE (Statistical Null)
    AND DOES NOT LOOK AT FIELD-DOES-NOT-EXIST-IN-THIS-CONTEXT (Database Null)

    THE DATA LIVES IN THE INSTANCE __dict__ (NOT IN THE dict BASE CLASS), WHICH
    IS WHY EVERY METHOD FETCHES IT WITH object.__getattribute__
    """
    def __init__(self, **kwargs):
        dict.__init__(self)
        object.__setattr__(self, "__dict__", kwargs)  #map IS A COPY OF THE PARAMETERS

    # A Struct IS ALWAYS TRUTHY, EVEN WHEN EMPTY
    def __bool__(self):
        return True

    def __nonzero__(self):
        return True

    def __str__(self):
        return dict.__str__(object.__getattribute__(self, "__dict__"))

    def __getitem__(self, key):
        """
        RETURN THE WRAPPED VALUE AT key, OR Null WHEN MISSING
        A key CONTAINING DOTS IS FOLLOWED AS A PATH; "\." STANDS FOR A LITERAL DOT
        """
        d=object.__getattribute__(self, "__dict__")
        if key.find(".")>=0:
            # HIDE ESCAPED DOTS BEHIND "\a" SO split(".") LEAVES THEM ALONE
            key=key.replace("\.", "\a")
            seq=[k.replace("\a", ".") for k in key.split(".")]
            for n in seq:
                d=d.get(n, None)
                if d == None:
                    return Null
                d=unwrap(d)
            return wrap(d)
        return wrap(d.get(key, Null))

    def __setitem__(self, key, value):
        """
        STORE THE UNWRAPPED value AT key (DOTTED PATHS ALLOWED); A NULL value
        REMOVES THE KEY INSTEAD.  RETURNS self
        """
        try:
            d=object.__getattribute__(self, "__dict__")
            value=unwrap(value)
            if key.find(".") == -1:
                if value is None:
                    d.pop(key, None)
                else:
                    d[key] = value
                return self
            key=key.replace("\.", "\a")
            seq=[k.replace("\a", ".") for k in key.split(".")]
            # EVERY STEP BUT THE LAST MUST ALREADY EXIST; d[k] RAISES OTHERWISE
            for k in seq[:-1]:
                d = d[k]
            if value == None:
                d.pop(seq[-1], None)
            else:
                d[seq[-1]] = value
            return self
        except Exception, e:
            raise e

    def __getattribute__(self, key):
        """
        ATTRIBUTE ACCESS READS THE DATA: ANY NAME NOT IN SPECIAL RETURNS THE
        WRAPPED VALUE (OR Null); THE NAMES IN SPECIAL GIVE dict-LIKE HELPERS
        """
        d=object.__getattribute__(self, "__dict__")
        if key not in SPECIAL:
            return wrap(d.get(key, Null))
        #SOME dict FUNCTIONS
        if key == "items":
            def temp():
                _is = dict.__getattribute__(d, "items")
                return [(k, wrap(v)) for k, v in _is()]
            return temp
        if key == "iteritems":
            #LOW LEVEL ITERATION
            return d.iteritems
        if key=="keys":
            def temp():
                k=dict.__getattribute__(d, "keys")
                return set(k())
            return temp
        if key=="values":
            def temp():
                vs=dict.__getattribute__(d, "values")
                return [wrap(v) for v in vs()]
            return temp
        if key=="dict":
            # THE RAW, UNWRAPPED DATA
            return d
        if key=="copy":
            # THE SHALLOW COPY IS MADE WHEN .copy IS READ, NOT WHEN IT IS CALLED
            o = wrap({k: v for k, v in d.items()})
            def output():
                return o
            return output

    def __setattr__(self, key, value):
        Struct.__setitem__(self, key, value)
        # dict.__setattr__(self, unicode(key), value)

    def __delitem__(self, key):
        """
        REMOVE key (DOTTED PATHS ALLOWED); A MISSING FINAL KEY IS IGNORED
        """
        d=object.__getattribute__(self, "__dict__")
        if key.find(".") == -1:
            d.pop(key, None)
        # NOTE(review): THERE IS NO return ABOVE, SO A PLAIN KEY IS POPPED AGAIN
        # BELOW (HARMLESS, BUT PROBABLY UNINTENDED)
        key=key.replace("\.", "\a")
        seq=[k.replace("\a", ".") for k in key.split(".")]
        for k in seq[:-1]:
            d = d[k]
        d.pop(seq[-1], None)

    # NOTE(review): __getattribute__ ABOVE ANSWERS "keys" ITSELF (WITH A set), SO
    # THIS METHOD IS ONLY REACHED THROUGH THE CLASS, e.g. Struct.keys(x)
    def keys(self):
        d=object.__getattribute__(self, "__dict__")
        return d.keys()
# KEEP TRACK OF WHAT ATTRIBUTES ARE REQUESTED, MAYBE SOME (BUILTIN) ARE STILL USEFUL
requested = set()
class NullStruct(object):
    """
    Structural Null provides closure under the dot (.) operator
    Null[x] == Null
    Null.x == Null

    IT IS FALSY, EMPTY, EQUAL ONLY TO None AND Null, AND EVERY ORDERING
    COMPARISON WITH IT IS False
    """
    def __init__(self):
        pass

    def __bool__(self):
        return False

    def __nonzero__(self):
        return False

    def __gt__(self, other):
        return False

    def __ge__(self, other):
        return False

    def __le__(self, other):
        return False

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return other is Null or other is None

    def __ne__(self, other):
        return other is not Null and other is not None

    def __getitem__(self, key):
        return self

    def __len__(self):
        return 0

    def __iter__(self):
        return ZeroList.__iter__()

    def __getattribute__(self, key):
        """
        ANY NAME NOT IN SPECIAL IS Null; THE NAMES IN SPECIAL GIVE EMPTY
        VERSIONS OF THE dict-LIKE HELPERS
        """
        if key not in SPECIAL:
            return Null

        #SOME dict FUNCTIONS
        if key == "items":
            def temp():
                return ZeroList
            return temp
        if key == "iteritems":
            #LOW LEVEL ITERATION
            # self.__iter__ CAN NOT BE USED HERE: IT IS ROUTED THROUGH THIS VERY
            # METHOD AND COMES BACK AS Null, WHICH IS NOT CALLABLE
            def temp():
                return iter(ZeroList)
            return temp
        if key=="keys":
            def temp():
                return ZeroList
            return temp
        if key=="values":
            def temp():
                return ZeroList
            return temp
        if key=="dict":
            return Null
        if key=="copy":
            #THE INTENT IS USUALLY PREPARE FOR UPDATES
            def output():
                return Struct()
            return output

    def keys(self):
        return set()

    def __str__(self):
        return "None"


# THE ONE SHARED NULL INSTANCE
Null = NullStruct()
# SHARED EMPTY LIST HANDED OUT BY Null; CALLERS MUST NOT MODIFY IT
ZeroList=[]
class StructList(list):
    """
    LIST VIEW THAT WRAPS ITEMS ON THE WAY OUT AND UNWRAPS THEM ON THE WAY IN
    THE ITEMS LIVE IN self.list; THE list BASE CLASS ITSELF STAYS EMPTY
    """
    def __init__(self, vals=None):
        """ USE THE vals, NOT A COPY """
        list.__init__(self)
        if vals == None:
            self.list=[]
        elif isinstance(vals, StructList):
            self.list=vals.list
        else:
            self.list=vals

    def __getitem__(self, index):
        # OUT-OF-RANGE (INCLUDING NEGATIVE) INDEXES GIVE Null INSTEAD OF RAISING
        if index < 0 or len(self.list) <= index:
            return Null
        return wrap(self.list[index])

    def __setitem__(self, i, y):
        self.list[i]=unwrap(y)

    def __iter__(self):
        for v in self.list:
            yield wrap(v)

    def append(self, val):
        self.list.append(unwrap(val))
        return self

    def __str__(self):
        return self.list.__str__()

    def __len__(self):
        return self.list.__len__()

    def __getslice__(self, i, j):
        return wrap(self.list[i:j])

    def remove(self, x):
        # ITEMS ARE STORED UNWRAPPED (SEE append), SO COMPARE AGAINST THE UNWRAPPED VALUE
        self.list.remove(unwrap(x))
        return self

    def extend(self, values):
        for v in values:
            self.list.append(unwrap(v))
        return self
def wrap(v):
    """
    RETURN THE WRAPPED FORM OF v WITHOUT COPYING:
    NULLS -> Null, dict -> Struct, list -> StructList, ANYTHING ELSE UNCHANGED
    """
    if v == None:
        return Null
    if isinstance(v, Struct) or isinstance(v, StructList):
        # ALREADY WRAPPED
        return v
    if isinstance(v, list):
        return StructList(v)
    if not isinstance(v, dict):
        return v

    wrapped = Struct()
    object.__setattr__(wrapped, "__dict__", v)  #INJECT wrapped.__dict__=v SO THERE IS NO COPY
    return wrapped
def unwrap(v):
    """
    RETURN THE PLAIN VALUE BEHIND v: THE dict OF A Struct, THE list OF A
    StructList, None FOR NULLS, ANYTHING ELSE UNCHANGED
    """
    if isinstance(v, Struct):
        return object.__getattribute__(v, "__dict__")
    if isinstance(v, StructList):
        return v.list
    return None if v == None else v
def inverse(d):
    """
    reverse the k:v pairs
    RETURNS A WRAPPED MAPPING OF EACH VALUE TO THE LIST OF KEYS THAT HELD IT
    """
    flipped = {}
    for key, value in unwrap(d).iteritems():
        flipped.setdefault(value, []).append(key)
    return wrap(flipped)
def nvl(*args):
    """
    RETURN THE FIRST ARGUMENT THAT IS NOT NULL, OR Null WHEN THERE IS NONE
    """
    remaining = list(args)
    while remaining:
        candidate = remaining.pop(0)
        if candidate != None:
            return candidate
    return Null
def listwrap(value):
    """
    OFTEN IT IS NICE TO ALLOW FUNCTION PARAMETERS TO BE ASSIGNED A VALUE,
    OR A list-OF-VALUES, OR NULL.  CHECKING FOR THIS IS TEDIOUS AND WE WANT TO CAST
    FROM THOSE THREE CASES TO THE SINGLE CASE OF A LIST

    Null  -> []
    value -> [value]
    [...] -> [...]  (unchanged list)

    #BEFORE
    if a is not None:
        if not isinstance(a, list):
            a=[a]
        for x in a:
            #do something

    #AFTER
    for x in listwrap(a):
        #do something
    """
    if value == None:
        return []
    if not isinstance(value, list):
        # A LONE VALUE BECOMES A ONE-ITEM LIST
        value = [value]
    return wrap(value)

Просмотреть файл

@ -1,365 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import threading
import thread
import time
DEBUG = True
class Lock(object):
    """
    SIMPLE LOCK (ACTUALLY, A PYTHON threading.Condition() WITH notify() BEFORE EVERY RELEASE)
    USE AS A CONTEXT MANAGER; wait() AND notify_all() MUST BE CALLED WHILE HOLDING IT
    """
    def __init__(self, name=""):
        self.monitor=threading.Condition()
        self.name=name

    def __enter__(self):
        self.monitor.acquire()
        return self

    def __exit__(self, a, b, c):
        # WAKE ONE WAITER ON THE WAY OUT
        self.monitor.notify()
        self.monitor.release()

    def wait(self, timeout=None, till=None):
        """
        RELEASE THE LOCK AND SLEEP UNTIL NOTIFIED

        timeout - MAXIMUM SECONDS TO WAIT (None MEANS FOREVER)
        till    - UTC datetime TO STOP WAITING AT; OVERRIDES timeout, AND
                  RETURNS IMMEDIATELY WHEN ALREADY IN THE PAST
        """
        if till:
            # TIME REMAINING UNTIL THE DEADLINE
            timeout=(till-datetime.utcnow()).total_seconds()
            if timeout<0:
                return
        self.monitor.wait(timeout=timeout)

    def notify_all(self):
        self.monitor.notify_all()
# SIMPLE MESSAGE QUEUE, multiprocessing.Queue REQUIRES SERIALIZATION, WHICH IS HARD TO USE JUST BETWEEN THREADS
class Queue(object):
    """
    IN-PROCESS MESSAGE QUEUE SHARED BETWEEN THREADS
    Thread.STOP SERVES AS THE END-OF-QUEUE MARKER: ONCE IT IS POPPED (OR close()
    IS CALLED) THE QUEUE STOPS ACCEPTING AND DELIVERING ITEMS
    """
    def __init__(self):
        self.keep_running=True
        self.lock=Lock("lock for queue")
        self.queue=[]

    def __iter__(self):
        """YIELD ITEMS AS THEY ARRIVE (BLOCKING) UNTIL THE QUEUE IS STOPPED"""
        while self.keep_running:
            try:
                item=self.pop()
                if item!=Thread.STOP:
                    yield item
            except Exception as e:
                from .logs import Log
                Log.warning("Tell me about what happend here", e)

    def add(self, value):
        """APPEND value (IGNORED ONCE STOPPED); RETURNS self"""
        with self.lock:
            if self.keep_running:
                self.queue.append(value)
        return self

    def extend(self, values):
        """APPEND ALL values (IGNORED ONCE STOPPED)"""
        with self.lock:
            if self.keep_running:
                self.queue.extend(values)

    def __len__(self):
        with self.lock:
            return len(self.queue)

    def pop(self):
        """
        BLOCK UNTIL AN ITEM IS AVAILABLE AND RETURN IT
        RETURNS Thread.STOP WHEN THE QUEUE HAS BEEN STOPPED
        """
        with self.lock:
            while self.keep_running:
                if not self.queue:
                    self.lock.wait()
                    continue
                item=self.queue.pop(0)
                if item==Thread.STOP:  #SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
                    self.keep_running=False
                return item
            return Thread.STOP

    def pop_all(self):
        """
        NON-BLOCKING POP ALL IN QUEUE, IF ANY
        """
        with self.lock:
            if not self.keep_running:
                return [Thread.STOP]
            if not self.queue:
                return []

            #SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
            if any(v == Thread.STOP for v in self.queue):
                self.keep_running = False

            drained = list(self.queue)
            del self.queue[:]  #CLEAR
            return drained

    def close(self):
        """STOP THE QUEUE"""
        with self.lock:
            self.keep_running=False
class AllThread(object):
    """
    RUN ALL ADDED FUNCTIONS IN PARALLEL, BE SURE TO HAVE JOINED BEFORE EXIT
    """
    def __init__(self):
        self.threads=[]

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, type, value, traceback):
        self.join()

    def join(self):
        """
        WAIT FOR EVERY ADDED THREAD; EXCEPTIONS THEY REPORT ARE COLLECTED AND
        PASSED TO Log.error TOGETHER
        """
        failures=[]
        try:
            for worker in self.threads:
                outcome=worker.join()
                if "exception" in outcome:
                    failures.append(outcome["exception"])
        except Exception as e:
            from .logs import Log
            Log.warning("Problem joining", e)

        if failures:
            from .logs import Log
            Log.error("Problem in child threads", failures)

    def add(self, target, *args, **kwargs):
        """
        target IS THE FUNCTION TO EXECUTE IN THE THREAD
        """
        started=Thread.run(target, *args, **kwargs)
        self.threads.append(started)
class Thread(object):
    """
    join() ENHANCED TO ALLOW CAPTURE OF CTRL-C, AND RETURN POSSIBLE THREAD EXCEPTIONS
    run() ENHANCED TO CAPTURE EXCEPTIONS
    """
    num_threads=0     # NUMBER OF THREADS NAMED SO FAR (USED FOR NAMING)
    STOP="stop"       # MARKER VALUE MEANING "NO MORE WORK"
    TIMEOUT="TIMEOUT" # TYPE OF THE Except RAISED WHEN join() TIMES OUT

    def __init__(self, name, target, *args, **kwargs):
        self.name = name
        self.target = target
        self.response = None
        self.synch_lock=Lock()
        self.args = args

        #ENSURE THERE IS A SHARED please_stop SIGNAL
        self.kwargs = kwargs.copy()
        self.kwargs["please_stop"]=self.kwargs.get("please_stop", Signal())
        self.please_stop=self.kwargs["please_stop"]

        self.stopped=Signal()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # type IS THE EXCEPTION CLASS (None WHEN THE BLOCK ENDED NORMALLY), SO IT
        # CAN NOT BE TESTED WITH isinstance(type, BaseException)
        if type is not None:
            self.please_stop.go()

        # TODO: AFTER A WHILE START KILLING THREAD
        self.join()

    def start(self):
        try:
            self.thread=thread.start_new_thread(Thread._run, (self, ))
        except Exception as e:
            from .logs import Log
            Log.error("Can not start thread", e)

    def stop(self):
        self.please_stop.go()

    def _run(self):
        # BODY OF THE NEW THREAD: CALL target AND RECORD ITS RESULT OR EXCEPTION
        try:
            if self.target is not None:
                response=self.target(*self.args, **self.kwargs)
                with self.synch_lock:
                    self.response={"response":response}
        except Exception as e:
            with self.synch_lock:
                self.response={"exception":e}
            from .logs import Log
            Log.error("Problem in thread", e)
        finally:
            self.stopped.go()
            del self.target, self.args, self.kwargs

    def is_alive(self):
        return not self.stopped

    def join(self, timeout=None, till=None):
        """
        RETURN THE RESULT OF THE THREAD EXECUTION (INCLUDING EXCEPTION)
        AS {"response": ...} OR {"exception": ...}

        timeout - SECONDS TO WAIT
        till    - UTC datetime TO STOP WAITING AT
        WITH NEITHER, WAITS FOREVER; OTHERWISE RAISES Except(type=Thread.TIMEOUT)
        WHEN THE THREAD HAS NOT STOPPED ONCE THE WAIT RETURNS
        """
        if not till and timeout:
            till=datetime.utcnow()+timedelta(seconds=timeout)

        if till is None:
            while True:
                with self.synch_lock:
                    # POLL IN SHORT WAITS SO THE MAIN THREAD STAYS INTERRUPTIBLE
                    for i in range(10):
                        if self.stopped:
                            return self.response
                        self.synch_lock.wait(0.5)

                from .logs import Log
                if DEBUG:
                    Log.note("Waiting on thread {{thread}}", {"thread":self.name})
        else:
            self.stopped.wait_for_go(till=till)
            if self.stopped:
                return self.response
            else:
                from logs import Except
                raise Except(type=Thread.TIMEOUT)

    @staticmethod
    def run(target, *args, **kwargs):
        """
        START target IN A NEW THREAD AND RETURN THE Thread
        target MUST HAVE A please_stop PARAMETER
        """
        #ENSURE target HAS please_stop ARGUMENT
        if "please_stop" not in target.__code__.co_varnames:
            from logs import Log
            Log.error("function must have please_stop argument for signalling emergency shutdown")

        if hasattr(target, "func_name") and target.func_name != "<lambda>":
            name = "thread-" + str(Thread.num_threads) + " (" + target.func_name + ")"
        else:
            name = "thread-" + str(Thread.num_threads)

        Thread.num_threads += 1

        output = Thread(name, target, *args, **kwargs)
        output.start()
        return output

    @staticmethod
    def sleep(seconds=None, till=None):
        """
        SLEEP FOR seconds, AND THEN (IF GIVEN) UNTIL THE UTC datetime till
        """
        if seconds is not None:
            time.sleep(seconds)
        if till is not None:
            duration = (till - datetime.utcnow()).total_seconds()
            if duration > 0:
                time.sleep(duration)
class Signal(object):
    """
    SINGLE-USE THREAD SAFE SIGNAL
    STARTS OFF, IS SWITCHED ON ONCE BY go(), AND IS TRUTHY FROM THEN ON
    """
    def __init__(self):
        self.lock = Lock()
        self._go = False
        self.job_queue=[]

    def __bool__(self):
        with self.lock:
            return self._go

    def __nonzero__(self):
        with self.lock:
            return self._go

    def wait_for_go(self, timeout=None, till=None):
        """
        BLOCK UNTIL go() IS CALLED, OR UNTIL THE OPTIONAL LIMIT IS REACHED

        timeout - MAXIMUM SECONDS TO WAIT
        till    - UTC datetime TO STOP WAITING AT (TAKES PRECEDENCE OVER timeout)
        RETURNS True WHEN SIGNALED, False WHEN THE LIMIT WAS REACHED FIRST
        """
        if till is None and timeout is not None:
            till = datetime.utcnow() + timedelta(seconds=timeout)

        with self.lock:
            while not self._go:
                if till is None:
                    self.lock.wait()
                    continue
                # RECOMPUTED EVERY PASS: A WAKE-UP DOES NOT MEAN THE SIGNAL FIRED
                remaining = (till - datetime.utcnow()).total_seconds()
                if remaining <= 0:
                    return False
                self.lock.wait(timeout=remaining)
            return True

    def go(self):
        """
        SWITCH THE SIGNAL ON, WAKE ALL WAITERS AND RUN THE QUEUED on_go JOBS
        SECOND AND LATER CALLS DO NOTHING
        """
        with self.lock:
            if self._go:
                return

            self._go = True
            jobs=self.job_queue
            self.job_queue=[]
            self.lock.notify_all()

            for j in jobs:
                j()

    def is_go(self):
        with self.lock:
            return self._go

    def on_go(self, target):
        """
        RUN target WHEN SIGNALED
        (IMMEDIATELY, WHEN THE SIGNAL IS ALREADY ON)
        """
        with self.lock:
            if self._go:
                target()
            else:
                self.job_queue.append(target)
class ThreadedQueue(Queue):
    """
    TODO: Check that this queue is not dropping items at shutdown
    DISPATCH TO ANOTHER (SLOWER) queue IN BATCHES OF GIVEN size

    queue - DESTINATION; ONLY ITS extend() IS USED HERE
    size  - BATCH SIZE HANDED TO Q.groupby
    USE AS A CONTEXT MANAGER: LEAVING THE BLOCK ADDS Thread.STOP AND JOINS THE WORKER
    """
    def __init__(self, queue, size):
        Queue.__init__(self)
        # WORKER: DRAIN THIS QUEUE IN BATCHES AND PUSH EACH BATCH TO queue
        def push_to_queue(please_stop):
            # A STOP REQUEST IS TURNED INTO A Thread.STOP ITEM ON THIS QUEUE
            please_stop.on_go(lambda : self.add(Thread.STOP))
            #output_queue IS A MULTI-THREADED QUEUE, SO THIS WILL BLOCK UNTIL THE 5K ARE READY
            from .queries import Q
            for i, g in Q.groupby(self, size=size):
                queue.extend(g)
                if please_stop:
                    from logs import Log
                    Log.warning("ThreadedQueue stopped early, with {{num}} items left in queue", {
                        "num":len(self)
                    })
                    return
        self.thread=Thread.run(push_to_queue)
    def __enter__(self):
        return self
    def __exit__(self, a, b, c):
        self.add(Thread.STOP)
        # b IS THE EXCEPTION VALUE (None WHEN THE BLOCK ENDED NORMALLY)
        if isinstance(b, BaseException):
            self.thread.please_stop.go()
        self.thread.join()

Просмотреть файл

@ -1,49 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import time
from .strings import expand_template
from .logs import Log
class Timer:
    """
    USAGE:
    with Timer("doing hard time"):
        something_that_takes_long()
    OUTPUT:
        doing hard time took 45.468 sec

    description - TEMPLATE FOR THE MESSAGE, EXPANDED WITH param
    THE ELAPSED WALL-CLOCK SECONDS ARE AVAILABLE AS duration (None UNTIL THE
    BLOCK HAS FINISHED)
    """
    def __init__(self, description, param=None):
        self.description=expand_template(description, param)  #WE WOULD LIKE TO KEEP THIS TEMPLATE, AND PASS IT TO THE LOGGER ON __exit__(), WE FAKE IT FOR NOW
        self.start = None
        self.end = None
        self.interval = None

    def __enter__(self):
        # default_timer IS WALL-CLOCK ON EVERY PLATFORM; time.clock() MEASURES
        # PROCESSOR TIME ON UNIX, WHICH IGNORES TIME SPENT WAITING
        from timeit import default_timer

        Log.note("Timer start: {{description}}", {
            "description":self.description
        })
        self.start = default_timer()
        return self

    def __exit__(self, type, value, traceback):
        from timeit import default_timer

        self.end = default_timer()
        self.interval = self.end - self.start
        Log.note("Timer end  : {{description}} (took {{duration}} sec)", {
            "description":self.description,
            "duration":round(self.interval, 3)
        })

    @property
    def duration(self):
        return self.interval

284
spam.log Normal file
Просмотреть файл

@ -0,0 +1,284 @@
2013-11-06 18:30:45,719 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:45,721 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:45,835 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:45,838 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:45,851 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:45,855 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:45,859 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:45,868 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:45,884 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:45,904 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:46,092 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:46,095 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:46,193 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:46,194 - esFrontLine - WARNING - processing problem
2013-11-06 18:30:46,207 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:30:46,209 - esFrontLine - ERROR - 'dict' object has no attribute 'host'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 59, in catch_all
es.host + ":" + str(es.port) + "/" + path,
AttributeError: 'dict' object has no attribute 'host'
2013-11-06 18:32:10,369 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:10,371 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:10,378 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:10,378 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:10,427 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:10,516 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:13,124 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch4.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /org_chart/person/_mapping (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch4.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /org_chart/person/_mapping (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:13,127 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_mapping (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_mapping (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:13,187 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:13,190 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:34,512 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:34,519 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:34,529 - esFrontLine - WARNING - processing problem
2013-11-06 18:32:34,536 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /org_chart/person/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /org_chart/person/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:34,556 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch5.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:32:34,565 - esFrontLine - ERROR - HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 63, in catch_all
timeout=90
File "C:\Python27\lib\site-packages\requests\api.py", line 55, in get
return request('get', url, **kwargs)
File "C:\Python27\lib\site-packages\requests\api.py", line 44, in request
return session.request(method=method, url=url, **kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 335, in request
resp = self.send(prep, **send_kwargs)
File "C:\Python27\lib\site-packages\requests\sessions.py", line 438, in send
r = adapter.send(request, **kwargs)
File "C:\Python27\lib\site-packages\requests\adapters.py", line 327, in send
raise ConnectionError(e)
ConnectionError: HTTPConnectionPool(host='elasticsearch8.metrics.scl3.mozilla.com', port=9200): Max retries exceeded with url: /bugs/bug_version/_search (Caused by <class 'socket.error'>: [Errno 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond)
2013-11-06 18:33:30,482 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:30,484 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:30,586 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:30,588 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:30,872 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:30,875 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:31,052 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:31,053 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:31,295 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:31,295 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:31,299 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:31,301 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:32,135 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:32,138 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:33:32,219 - esFrontLine - WARNING - processing problem
2013-11-06 18:33:32,223 - esFrontLine - ERROR - 'Logger' object has no attribute 'note'
Traceback (most recent call last):
File "C:/Users/klahnakoski/git/esFrontLine/esFrontLine/app.py", line 69, in catch_all
logger.note("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
AttributeError: 'Logger' object has no attribute 'note'
2013-11-06 18:34:51,342 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:34:51,451 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:34:51,650 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:34:51,750 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25844
2013-11-06 18:34:51,757 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:34:51,875 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1124
2013-11-06 18:34:53,125 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=282
2013-11-06 18:34:53,247 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10427
2013-11-06 18:35:09,164 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317858
2013-11-06 18:35:10,023 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=459
2013-11-06 18:35:50,032 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:35:50,043 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:35:50,460 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:35:50,490 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25847
2013-11-06 18:35:50,700 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=551
2013-11-06 18:35:50,809 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1122
2013-11-06 18:35:51,509 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10429
2013-11-06 18:35:51,641 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=282
2013-11-06 18:36:18,509 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317848
2013-11-06 18:36:19,234 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=459
2013-11-06 18:37:07,921 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:37:07,966 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:37:08,276 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:37:08,278 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:37:08,400 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1124
2013-11-06 18:37:10,973 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25846
2013-11-06 18:37:11,311 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10426
2013-11-06 18:37:12,223 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=281
2013-11-06 18:37:33,994 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317851
2013-11-06 18:37:34,657 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=460
2013-11-06 18:37:40,335 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:37:40,395 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:37:40,720 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:37:41,786 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25844
2013-11-06 18:37:41,911 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:37:42,052 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1121
2013-11-06 18:37:42,904 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=283
2013-11-06 18:37:43,105 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10429
2013-11-06 18:38:08,002 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=549
2013-11-06 18:38:08,729 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:38:08,736 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:38:09,299 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25846
2013-11-06 18:38:09,463 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=549
2013-11-06 18:38:09,651 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1123
2013-11-06 18:38:10,642 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=282
2013-11-06 18:38:10,953 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10427
2013-11-06 18:38:12,779 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=459
2013-11-06 18:38:13,026 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317848
2013-11-06 18:38:31,868 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317848
2013-11-06 18:38:33,217 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=458
2013-11-06 18:38:39,674 - esFrontLine - DEBUG - path: org_chart/person/_mapping, request bytes=0, response bytes=143
2013-11-06 18:38:39,680 - esFrontLine - DEBUG - path: bugs/bug_version/_mapping, request bytes=0, response bytes=4929
2013-11-06 18:38:39,875 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:38:44,707 - esFrontLine - DEBUG - path: bugs/_search, request bytes=4409, response bytes=550
2013-11-06 18:38:44,858 - esFrontLine - DEBUG - path: org_chart/person/_search, request bytes=1353, response bytes=25847
2013-11-06 18:38:44,867 - esFrontLine - DEBUG - path: bugs/_search, request bytes=480, response bytes=1122
2013-11-06 18:38:45,006 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=574, response bytes=10428
2013-11-06 18:38:45,802 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=258, response bytes=282
2013-11-06 18:39:07,051 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=708, response bytes=317849
2013-11-06 18:39:07,973 - esFrontLine - DEBUG - path: bugs/bug_version/_search, request bytes=1214, response bytes=459