Merge pull request #1 from klahnakoski/remove_util

Remove util
This commit is contained in:
Kyle Lahnakoski 2013-11-14 05:00:00 -08:00
Родитель 331ebd7518 c37402240d
Коммит 1fb5c13fe6
28 изменённых файлов: 141 добавлений и 4370 удалений

6
.gitignore поставляемый
Просмотреть файл

@ -5,4 +5,8 @@ src
settings.json
resources/logs
esFrontLine/util/.svn
results/*
build
dist
*.egg-info
projectFilesBackup
results

17
MANIFEST.in Normal file
Просмотреть файл

@ -0,0 +1,17 @@
graft tests
graft docs
prune .git
prune .svn
prune build
prune dist
prune .idea
prune results
exclude *.egg-info
exclude *.iml
exclude *.txt
exclude README.md
exclude *.json
exclude .git*
recursive-exclude . *.pyc
include README.txt
exclude MANIFEST.in

Просмотреть файл

@ -16,20 +16,16 @@ Install
I will assume you have Python installed (if not, here are [Windows7 instructions](https://github.com/klahnakoski/pyLibrary#windows-7-install-instructions-))
git clone https://github.com/klahnakoski/esFrontLine.git
cd esFrontLine
pip install -r requirements.txt
pip install esFrontLine
Setup
-----
You must right your own setting.jason file with the following properties set:
You must write your own setting.json file with the following properties set:
* **elasticsearch** - (Array of) ElasticSearch host pointers
* **elasticsearch** - (Array of) ElasticSearch nodes
* **elasticsearch.host** - URL of the ElasticSearch cluster that will accept query requests
* **elasticsearch.host** - URL of the ElasticSearch node that will accept query requests
* **elasticsearch.port** - port for ES (default = 9200)
@ -62,13 +58,11 @@ Here is an example of my ```settings.json``` file
},
"debug":{
"log":[{
"class": "logging.handlers.RotatingFileHandler",
"filename": "./results/logs/app.log",
"maxBytes": 10000000,
"backupCount": 200,
"encoding": "utf8"
},{
"class":"esFrontLine.util.logs.Log_usingStream",
"stream":"sys.stdout"
}]
}

Просмотреть файл

@ -7,18 +7,17 @@
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from flask import Flask
import argparse
import codecs
import logging
from logging.handlers import RotatingFileHandler
import os
import random
from flask import Flask, json
import flask
import requests
from werkzeug.contrib.fixers import HeaderRewriterFix
from werkzeug.exceptions import abort
from esFrontLine.util import struct
from esFrontLine.util.randoms import Random
from esFrontLine.util import startup
from esFrontLine.util.cnv import CNV
from esFrontLine.util.logs import Log
app = Flask(__name__)
@ -26,11 +25,20 @@ app = Flask(__name__)
def stream(raw_response):
while True:
block = raw_response.read(amt=65536, decode_content=False)
if len(block) == 0:
if not block:
return
yield block
def listwrap(value):
    """
    COERCE value INTO A LIST

    None BECOMES AN EMPTY LIST, A list IS RETURNED AS-IS (SAME OBJECT),
    AND ANY OTHER VALUE IS WRAPPED IN A ONE-ELEMENT LIST
    """
    if isinstance(value, list):
        return value
    return [] if value is None else [value]
@app.route('/', defaults={'path': ''}, methods=['GET', 'POST'])
@app.route('/<path:path>', methods=['GET', 'POST'])
def catch_all(path):
@ -39,12 +47,12 @@ def catch_all(path):
filter(path, data)
#PICK RANDOM ES
es = Random.sample(struct.listwrap(settings.elasticsearch), 1)[0]
es = random.choice(listwrap(settings["elasticsearch"]))
## SEND REQUEST
headers = {'content-type': 'application/json'}
response = requests.get(
es.host + ":" + str(es.port) + "/" + path,
es["host"] + ":" + str(es["port"]) + "/" + path,
data=data,
stream=True, #FOR STREAMING
headers=headers,
@ -54,14 +62,13 @@ def catch_all(path):
# ALLOW CROSS DOMAIN (BECAUSE ES IS USUALLY NOT ON SAME SERVER AS PAGE)
outbound_header = dict(response.headers)
outbound_header["access-control-allow-origin"] = "*"
Log.println("path: {{path}}, request bytes={{request_content_length}}, response bytes={{response_content_length}}", {
"path": path,
"request_headers": dict(response.headers),
"request_content_length": len(data),
"response_headers": outbound_header,
"response_content_length": outbound_header["content-length"]
})
logger.debug("path: {path}, request bytes={request_content_length}, response bytes={response_content_length}".format(
path=path,
# request_headers=dict(response.headers),
request_content_length=len(data),
# response_headers=outbound_header,
response_content_length=outbound_header["content-length"]
))
## FORWARD RESPONSE
return flask.wrappers.Response(
@ -71,9 +78,11 @@ def catch_all(path):
headers=outbound_header
)
except Exception, e:
Log.warning("processing problem", e)
logger.warning("processing problem")
logger.exception(e.message)
abort(400)
def filter(path_string, query):
"""
THROW EXCEPTION IF THIS IS NOT AN ElasticSearch QUERY
@ -81,23 +90,28 @@ def filter(path_string, query):
try:
path = path_string.split("/")
## EXPECTING {index_name} "/" {type_name} "/" {_id}
## EXPECTING {index_name} "/" {type_name} "/_search"
## EXPECTING {index_name} "/_search"
if len(path) not in [2, 3]:
Log.error("request must be of form: {index_name} \"/\" {type_name} \"/_search\" ")
if path[-1] not in ["_mapping", "_search"]:
Log.error("request path must end with _mapping or _search")
if len(path) == 2:
if path[-1] not in ["_mapping", "_search"]:
raise Exception("request path must end with _mapping or _search")
elif len(path) == 3:
pass #OK
else:
raise Exception('request must be of form: {index_name} "/" {type_name} "/_search" ')
## EXPECTING THE QUERY TO AT LEAST HAVE .query ATTRIBUTE
if path[-1] == "_search" and CNV.JSON2object(query).query is None:
Log.error("_search must have query")
if path[-1] == "_search" and json.loads(query).get("query", None) is None:
raise Exception("_search must have query")
## NO CONTENT ALLOWED WHEN ASKING FOR MAPPING
if path[-1] == "_mapping" and len(query) > 0:
Log.error("Can not provide content when requesting _mapping")
raise Exception("Can not provide content when requesting _mapping")
except Exception, e:
Log.error("Not allowed: {{path}}:\n{{query}}", {"path": path_string, "query": query}, e)
logger.exception(e.message)
raise Exception("Not allowed: {path}:\n{query}".format(path=path_string, query=query))
# Snagged from http://stackoverflow.com/questions/10999990/python-flask-how-to-get-whole-raw-post-body
@ -132,13 +146,47 @@ class WSGICopyBody(object):
app.wsgi_app = WSGICopyBody(app.wsgi_app)
logger = None
if __name__ == '__main__':
try:
settings = startup.read_settings()
Log.start(settings.debug)
app.run(**settings.flask.dict)
app = HeaderRewriterFix(app, remove_headers=['Date', 'Server'])
finally:
Log.println("Execution complete")
Log.stop()
parser = argparse.ArgumentParser()
parser.add_argument(*["--settings", "--settings-file", "--settings_file"], **{
"help": "path to JSON file with settings",
"type": str,
"dest": "filename",
"default": "./settings.json",
"required": False
})
namespace = parser.parse_args()
args = {k: getattr(namespace, k) for k in vars(namespace)}
if not os.path.exists(args["filename"]):
raise Exception("Can not file settings file {filename}".format(filename=args["filename"]))
with codecs.open(args["filename"], "r", encoding="utf-8") as file:
json_data = file.read()
settings = json.loads(json_data)
settings["args"] = args
logger = logging.getLogger('esFrontLine')
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
for d in listwrap(settings["debug"]["log"]):
if d.get("filename", None):
fh = RotatingFileHandler(**d)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)
elif d.get("stream", None) in ("sys.stdout", "sys.stderr"):
ch = logging.StreamHandler(stream=eval(d["stream"]))
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
HeaderRewriterFix(app, remove_headers=['Date', 'Server'])
app.run(**settings["flask"])
except Exception, e:
print(e.message)

Просмотреть файл

Просмотреть файл

@ -1,216 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import StringIO
import datetime
import re
import time
from .multiset import Multiset
from .jsons import json_decoder, json_encoder
from .logs import Log
from . import struct
from .strings import expand_template
from .struct import StructList
class CNV:
    """
    DUE TO MY POOR MEMORY, THIS IS A LIST OF ALL CONVERSION ROUTINES

    A namespace of static helpers, each named <source>2<target>.

    NOTE(review): comparisons with None are written ``== None`` rather than
    ``is None``; this is kept as-is in case the project's null objects
    compare equal to None -- confirm against the struct module before
    changing it.
    """

    @staticmethod
    def object2JSON(obj, pretty=False):
        """Encode obj with the package json_encoder; failures are reported through Log.error."""
        try:
            return json_encoder.encode(obj, pretty=pretty)
        except Exception as e:
            Log.error("Can not encode into JSON: {{value}}", {"value": repr(obj)}, e)

    @staticmethod
    def JSON2object(json_string, params=None, flexible=False):
        """
        Decode json_string with the package json_decoder.

        params   - optional dict; each value is quoted with value2quote and
                   substituted into json_string by expand_template first
        flexible - when true, comments and line breaks are replaced with
                   spaces before decoding

        Returns a StructList for a JSON array, otherwise struct.wrap(obj).
        """
        try:
            #REMOVE """COMMENTS""", #COMMENTS, //COMMENTS, AND \n \r
            if flexible:
                #DERIVED FROM https://github.com/jeads/datasource/blob/master/datasource/bases/BaseHub.py#L58
                json_string = re.sub(r"\"\"\".*?\"\"\"|\s+//.*\n|#.*?\n|\n|\r", r" ", json_string)

            if params:
                params = dict([(k, CNV.value2quote(v)) for k, v in params.items()])
                json_string = expand_template(json_string, params)

            obj = json_decoder.decode(json_string)
            if isinstance(obj, list):
                return StructList(obj)
            return struct.wrap(obj)
        except Exception as e:
            Log.error("Can not decode JSON:\n\t" + json_string, e)

    @staticmethod
    def string2datetime(value, format):
        """Parse value with datetime.strptime using format."""
        ## http://docs.python.org/2/library/datetime.html#strftime-and-strptime-behavior
        try:
            return datetime.datetime.strptime(value, format)
        except Exception as e:
            Log.error("Can not format {{value}} with {{format}}", {"value": value, "format": format}, e)

    @staticmethod
    def datetime2string(value, format):
        """Format value with its strftime method."""
        try:
            return value.strftime(format)
        except Exception as e:
            Log.error("Can not format {{value}} with {{format}}", {"value": value, "format": format}, e)

    @staticmethod
    def datetime2unix(d):
        """
        Convert a datetime to whole seconds via time.mktime.

        NOTE(review): time.mktime interprets the tuple as LOCAL time, while
        unix2datetime below uses utcfromtimestamp -- the pair does not round
        trip unless the local zone is UTC.  Left unchanged; confirm intent.
        """
        if d == None:
            return None
        return long(time.mktime(d.timetuple()))

    @staticmethod
    def datetime2milli(d):
        """Return whole milliseconds between 1970-01-01 (naive) and d."""
        try:
            epoch = datetime.datetime(1970, 1, 1)
            diff = d - epoch
            # 86400000=24*3600*1000
            # FLOOR DIVISION KEEPS THE RESULT AN INTEGER REGARDLESS OF DIVISION MODE
            return (diff.days * 86400000) + (diff.seconds * 1000) + (diff.microseconds // 1000)
        except Exception as e:
            # PASS THE CAUSE ALONG, LIKE THE OTHER CONVERTERS DO
            Log.error("Can not convert {{value}}", {"value": d}, e)

    @staticmethod
    def unix2datetime(u):
        """Convert seconds since the epoch to a naive UTC datetime."""
        return datetime.datetime.utcfromtimestamp(u)

    @staticmethod
    def milli2datetime(u):
        """Convert milliseconds since the epoch to a naive UTC datetime."""
        # DIVIDE BY A FLOAT SO THE MILLISECOND PART IS NOT TRUNCATED AWAY
        return datetime.datetime.utcfromtimestamp(u / 1000.0)

    @staticmethod
    def dict2Multiset(dic):
        """Build a Multiset whose dic attribute is a copy of the unwrapped dic."""
        if dic == None:
            return None

        output = Multiset()
        output.dic = struct.unwrap(dic).copy()
        return output

    @staticmethod
    def multiset2dict(value):
        """
        CONVERT MULTISET TO dict THAT MAPS KEYS TO KEY-COUNT
        (A PLAIN dict COPY OF value.dic)
        """
        if value == None:
            return None
        return dict(value.dic)

    @staticmethod
    def table2list(
        column_names,  #tuple of columns names
        rows           #list of tuples
    ):
        """Return a StructList with one dict per row, keyed by column_names."""
        return StructList([dict(zip(column_names, r)) for r in rows])

    #PROPER NULL HANDLING
    @staticmethod
    def value2string(value):
        """unicode(value), except that None stays None."""
        if value == None:
            return None
        return unicode(value)

    #RETURN PRETTY PYTHON CODE FOR THE SAME
    @staticmethod
    def value2quote(value):
        """Double-quote strings with string2quote; repr() everything else."""
        if isinstance(value, basestring):
            return CNV.string2quote(value)
        else:
            return repr(value)

    @staticmethod
    def string2quote(value):
        """Wrap value in double quotes, escaping backslashes and double quotes."""
        # return repr(value)
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

    #RETURN PYTHON CODE FOR THE SAME
    @staticmethod
    def value2code(value):
        return repr(value)

    @staticmethod
    def DataFrame2string(df, columns=None):
        """Render df as tab-separated text through df.to_csv."""
        output = StringIO.StringIO()
        try:
            df.to_csv(output, sep="\t", header=True, cols=columns, engine='python')
            return output.getvalue()
        finally:
            output.close()

    @staticmethod
    def ascii2char(ascii):
        return chr(ascii)

    @staticmethod
    def char2ascii(char):
        return ord(char)

    @staticmethod
    def int2hex(value, size):
        """
        Return the last `size` hex digits of value, left padded with zeros.

        "%x" is used instead of hex(): hex() appends an "L" to Python 2 long
        values, which then leaked into the returned digits.
        """
        return (("0" * size) + ("%x" % value))[-size:]

    @staticmethod
    def value2intlist(value):
        """
        Convert value to a list of ints.

        None and blank strings give None; an iterable gives the int of every
        element that is not the empty string; anything else gives [int(value)].
        """
        if value == None:
            return None
        elif hasattr(value, '__iter__'):
            output = [int(d) for d in value if d != ""]
            return output
        elif hasattr(value, "strip") and value.strip() == "":
            # ONLY STRING-LIKE VALUES HAVE strip(); A BARE NUMBER FALLS THROUGH
            return None
        else:
            return [int(value)]

    @staticmethod
    def value2int(value):
        """int(value), except that None stays None."""
        if value == None:
            return None
        else:
            return int(value)

    @staticmethod
    def value2number(v):
        """Return v as an int when it converts cleanly, otherwise as a float."""
        try:
            if isinstance(v, float) and round(v, 0) != v:
                return v
            #IF LOOKS LIKE AN INT, RETURN AN INT
            return int(v)
        except Exception:
            try:
                return float(v)
            except Exception as e:
                Log.error("Not a number ({{value}})", {"value": v}, e)

    @staticmethod
    def utf82unicode(value):
        return unicode(value.decode('utf8'))

    @staticmethod
    def latin12unicode(value):
        return unicode(value.decode('iso-8859-1'))

Просмотреть файл

@ -1,611 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime
import subprocess
from pymysql import connect
from . import struct
from .maths import Math
from .strings import expand_template
from .struct import nvl
from .cnv import CNV
from .logs import Log, Except
from .queries import Q
from .strings import indent
from .strings import outdent
from .files import File
DEBUG = False
MAX_BATCH_SIZE=100
all_db=[]
class DB(object):
    """
    Thin wrapper around a pymysql connection with nested-transaction
    counting and a backlog of write statements that are sent in batches.

    Use as a context manager: __enter__ begins a transaction, __exit__
    commits (or rolls back when an exception passed through) and closes.
    """

    def __init__(self, settings, schema=None):
        """OVERRIDE THE settings.schema WITH THE schema PARAMETER"""
        all_db.append(self)
        if isinstance(settings, DB):
            settings = settings.settings

        self.settings = settings.copy()
        self.settings.schema = nvl(schema, self.settings.schema)

        self.debug = nvl(self.settings.debug, DEBUG)
        self._open()

    def _open(self):
        """ DO NOT USE THIS UNLESS YOU close() FIRST"""
        try:
            self.db = connect(
                host=self.settings.host,
                port=self.settings.port,
                user=nvl(self.settings.username, self.settings.user),
                passwd=nvl(self.settings.password, self.settings.passwd),
                db=nvl(self.settings.schema, self.settings.db),
                charset=u"utf8",
                use_unicode=True
            )
        except Exception as e:
            Log.error(u"Failure to connect", e)
        self.cursor = None
        self.partial_rollback = False
        self.transaction_level = 0
        self.backlog = []     #accumulate the write commands so they are sent at once

    def __enter__(self):
        self.begin()
        return self

    def __exit__(self, type, value, traceback):
        if isinstance(value, BaseException):
            # AN EXCEPTION PASSED THROUGH THE with BLOCK: UNDO, THEN CLOSE
            try:
                if self.cursor:
                    self.cursor.close()
                self.cursor = None
                self.rollback()
            except Exception as e:
                Log.warning(u"can not rollback()", e)
            finally:
                self.close()
            return

        try:
            self.commit()
        except Exception as e:
            Log.warning(u"can not commit()", e)
        finally:
            self.close()

    def begin(self):
        """Enter one more transaction level; the cursor is opened on the outermost one."""
        if self.transaction_level == 0:
            self.cursor = self.db.cursor()
        self.transaction_level += 1
        self.execute("SET TIME_ZONE='+00:00'")

    def close(self):
        """Close the connection and remove this instance from all_db."""
        if self.transaction_level > 0:
            Log.error(u"expecting commit() or rollback() before close")
        self.cursor = None  #NOT NEEDED
        try:
            self.db.close()
        except Exception as e:
            if e.message.find("Already closed") >= 0:
                return

            Log.warning(u"can not close()", e)
        finally:
            all_db.remove(self)

    def commit(self):
        """Send the backlog, then commit when this is the outermost level."""
        try:
            self._execute_backlog()
        except Exception as e:
            # BEST EFFORT CLEANUP BEFORE REPORTING THE ORIGINAL PROBLEM
            try:
                self.rollback()
            except Exception:
                pass
            Log.error(u"Error while processing backlog", e)

        if self.transaction_level == 0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level == 1:
            if self.partial_rollback:
                try:
                    self.rollback()
                except Exception:
                    pass
                Log.error(u"Commit after nested rollback is not allowed")
            else:
                if self.cursor:
                    self.cursor.close()
                self.cursor = None
                self.db.commit()

        self.transaction_level -= 1

    def flush(self):
        """commit() followed by begin()."""
        try:
            self.commit()
        except Exception as e:
            Log.error(u"Can not flush", e)

        try:
            self.begin()
        except Exception as e:
            Log.error(u"Can not flush", e)

    def rollback(self):
        """Discard the backlog; roll the connection back on the outermost level."""
        self.backlog = []     #YAY! FREE!
        if self.transaction_level == 0:
            Log.error(u"No transaction has begun")
        elif self.transaction_level == 1:
            self.transaction_level -= 1
            if self.cursor != None:
                self.cursor.close()
            self.cursor = None
            self.db.rollback()
        else:
            # NESTED LEVEL: ONLY REMEMBER IT, commit() REFUSES LATER
            self.transaction_level -= 1
            self.partial_rollback = True
            Log.warning(u"Can not perform partial rollback!")

    def call(self, proc_name, params):
        """Flush the backlog and call a stored procedure on a cursor that is then replaced."""
        self._execute_backlog()
        try:
            self.cursor.callproc(proc_name, params)
            self.cursor.close()
            self.cursor = self.db.cursor()
        except Exception as e:
            Log.error(u"Problem calling procedure " + proc_name, e)

    def query(self, sql, param=None):
        """
        Flush the backlog, run sql and return all rows via CNV.table2list.

        param - optional dict; values are quoted with quote_param and
                substituted into sql by expand_template
        """
        self._execute_backlog()
        try:
            old_cursor = self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, self.quote_param(param))
            sql = outdent(sql)
            if self.debug:
                Log.note(u"Execute SQL:\n{{sql}}", {u"sql": indent(sql)})

            self.cursor.execute(sql)

            columns = [utf8_to_unicode(d[0]) for d in nvl(self.cursor.description, [])]
            fixed = [[utf8_to_unicode(c) for c in row] for row in self.cursor]
            result = CNV.table2list(columns, fixed)

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

            return result
        except Exception as e:
            if e.message.find("InterfaceError") >= 0:
                Log.error(u"Did you close the db connection?", e)
            Log.error(u"Problem executing SQL:\n" + indent(sql.strip()), e, offset=1)

    # EXECUTE GIVEN METHOD FOR ALL ROWS RETURNED
    def forall(self, sql, param=None, execute=None):
        """
        Run sql and call execute(row) for every row; return the row count.

        This method was originally also named ``execute`` and was silently
        replaced by the statement-queuing ``execute`` defined further down,
        which made it unreachable.  It is given its own name so that both
        behaviours are available; the effective ``execute`` is unchanged.
        """
        assert execute
        num = 0

        self._execute_backlog()
        try:
            old_cursor = self.cursor
            if not old_cursor:  #ALLOW NON-TRANSACTIONAL READS
                self.cursor = self.db.cursor()

            if param:
                sql = expand_template(sql, self.quote_param(param))
            sql = outdent(sql)
            if self.debug:
                Log.note(u"Execute SQL:\n{{sql}}", {u"sql": indent(sql)})

            self.cursor.execute(sql)

            columns = tuple([utf8_to_unicode(d[0]) for d in self.cursor.description])
            for r in self.cursor:
                num += 1
                execute(struct.wrap(dict(zip(columns, [utf8_to_unicode(c) for c in r]))))

            if not old_cursor:  #CLEANUP AFTER NON-TRANSACTIONAL READS
                self.cursor.close()
                self.cursor = None

        except Exception as e:
            Log.error(u"Problem executing SQL:\n" + indent(sql.strip()), e, offset=1)

        return num

    def execute(self, sql, param=None):
        """Queue sql on the backlog; the backlog is sent once it holds MAX_BATCH_SIZE statements."""
        if self.transaction_level == 0:
            Log.error(u"Expecting transaction to be started before issuing queries")

        if param:
            sql = expand_template(sql, self.quote_param(param))
        sql = outdent(sql)
        self.backlog.append(sql)
        if len(self.backlog) >= MAX_BATCH_SIZE:
            self._execute_backlog()

    # NOTE(review): THIS INSTANCE METHOD IS SHADOWED BY THE STATIC execute_file
    # DEFINED BELOW (LAST DEFINITION WINS), SO IT IS CURRENTLY UNREACHABLE.
    # KEPT IN PLACE TO PRESERVE THE CLASS AS WRITTEN; RENAME OR REMOVE ONE.
    def execute_file(self, filename, param=None):
        content = File(filename).read()
        self.execute(content, param)

    @staticmethod
    def execute_sql(settings, sql, param=None):
        """EXECUTE MANY LINES OF SQL (FROM SQLDUMP FILE, MAYBE?"""

        if param:
            # A TEMPORARY CONNECTION IS OPENED ONLY TO QUOTE THE PARAMETERS
            with DB(settings) as temp:
                sql = expand_template(sql, temp.quote_param(param))

        # We have no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        # NOTE(review): THE PASSWORD IS PASSED AS A COMMAND LINE ARGUMENT,
        # WHICH CAN EXPOSE IT TO OTHER USERS OF THE MACHINE VIA THE PROCESS LIST
        args = [
            u"mysql",
            u"-h{0}".format(settings.host),
            u"-u{0}".format(settings.username),
            u"-p{0}".format(settings.password),
            u"{0}".format(settings.schema)
        ]

        proc = subprocess.Popen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1
        )
        (output, _) = proc.communicate(sql)

        if proc.returncode:
            if len(sql) > 10000:
                sql = u"<" + unicode(len(sql)) + u" bytes of sql>"
            Log.error(u"Unable to execute sql: return code {{return_code}}, {{output}}:\n {{sql}}\n", {
                u"sql": indent(sql),
                u"return_code": proc.returncode,
                u"output": output
            })

    @staticmethod
    def execute_file(settings, filename, param=None):
        # MySQLdb provides no way to execute an entire SQL file in bulk, so we
        # have to shell out to the commandline client.
        sql = File(filename).read()
        DB.execute_sql(settings, sql, param)

    def _execute_backlog(self):
        """Send every queued statement, leaving the backlog empty and a fresh cursor in place."""
        if not self.backlog:
            return

        (backlog, self.backlog) = (self.backlog, [])
        if self.db.__module__.startswith(u"pymysql"):
            # BUG IN PYMYSQL: CAN NOT HANDLE MULTIPLE STATEMENTS
            # https://github.com/PyMySQL/PyMySQL/issues/157
            for b in backlog:
                try:
                    self.cursor.execute(b)
                except Exception as e:
                    Log.error(u"Can not execute sql:\n{{sql}}", {u"sql": b}, e)

            self.cursor.close()
            self.cursor = self.db.cursor()
        else:
            for i, g in Q.groupby(backlog, size=MAX_BATCH_SIZE):
                sql = u";\n".join(g)
                try:
                    if self.debug:
                        Log.note(u"Execute block of SQL:\n" + indent(sql))
                    self.cursor.execute(sql)
                    self.cursor.close()
                    self.cursor = self.db.cursor()
                except Exception as e:
                    Log.error(u"Problem executing SQL:\n{{sql}}", {u"sql": indent(sql.strip())}, e, offset=1)

    ## Insert dictionary of values into table
    def insert(self, table_name, record):
        keys = record.keys()

        try:
            command = u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                      u",".join([self.quote_column(k) for k in keys]) + \
                      u") VALUES (" + \
                      u",".join([self.quote_value(record[k]) for k in keys]) + \
                      u")"

            self.execute(command)
        except Exception as e:
            Log.error(u"problem with record: {{record}}", {u"record": record}, e)

    # candidate_key IS LIST OF COLUMNS THAT CAN BE USED AS UID (USUALLY PRIMARY KEY)
    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_new(self, table_name, candidate_key, new_record):
        candidate_key = struct.listwrap(candidate_key)

        condition = u" AND\n".join([
            self.quote_column(k) + u"=" + self.quote_value(new_record[k]) if new_record[k] != None else self.quote_column(k) + u" IS Null"
            for k in candidate_key
        ])
        command = u"INSERT INTO " + self.quote_column(table_name) + u" (" + \
                  u",".join([self.quote_column(k) for k in new_record.keys()]) + \
                  u")\n" + \
                  u"SELECT a.* FROM (SELECT " + u",".join([self.quote_value(v) + u" " + self.quote_column(k) for k, v in new_record.items()]) + u" FROM DUAL) a\n" + \
                  u"LEFT JOIN " + \
                  u"(SELECT 'dummy' exist FROM " + self.quote_column(table_name) + u" WHERE " + condition + u" LIMIT 1) b ON 1=1 WHERE exist IS Null"
        self.execute(command, {})

    # ONLY INSERT IF THE candidate_key DOES NOT EXIST YET
    def insert_newlist(self, table_name, candidate_key, new_records):
        for r in new_records:
            self.insert_new(table_name, candidate_key, r)

    def insert_list(self, table_name, records):
        """Queue one multi-row INSERT covering the union of all record keys."""
        if not records:
            # NOTHING TO INSERT; AN EMPTY VALUES LIST WOULD BE INVALID SQL
            return

        keys = set()
        for r in records:
            keys |= set(r.keys())
        keys = Q.sort(keys)

        try:
            command = \
                u"INSERT INTO " + self.quote_column(table_name) + u"(" + \
                u",".join([self.quote_column(k) for k in keys]) + \
                u") VALUES " + ",".join([
                    "(" + u",".join([self.quote_value(r[k]) for k in keys]) + u")"
                    for r in records
                ])
            self.execute(command)
        except Exception as e:
            Log.error(u"problem with record: {{record}}", {u"record": records}, e)

    def update(self, table_name, where_slice, new_values):
        """
        where_slice IS A Struct WHICH WILL BE USED TO MATCH ALL IN table
        """
        new_values = self.quote_param(new_values)

        where_clause = u" AND\n".join([
            self.quote_column(k) + u"=" + self.quote_value(v) if v != None else self.quote_column(k) + " IS NULL"
            for k, v in where_slice.items()
        ])

        command = u"UPDATE " + self.quote_column(table_name) + u"\n" + \
                  u"SET " + \
                  u",\n".join([self.quote_column(k) + u"=" + v for k, v in new_values.items()]) + u"\n" + \
                  u"WHERE " + \
                  where_clause
        self.execute(command, {})

    def quote_param(self, param):
        """Return a dict with every value of param passed through quote_value."""
        return {k: self.quote_value(v) for k, v in param.items()}

    def quote_value(self, value):
        """
        convert values to mysql code for the same
        mostly delegate directly to the mysql lib, but some exceptions exist
        """
        try:
            if value == None:
                return "NULL"
            elif isinstance(value, SQL):
                if not value.param:
                    #value.template CAN BE MORE THAN A TEMPLATE STRING
                    return self.quote_sql(value.template)
                param = {k: self.quote_sql(v) for k, v in value.param.items()}
                return expand_template(value.template, param)
            elif isinstance(value, basestring):
                return self.db.literal(value)
            elif isinstance(value, datetime):
                return u"str_to_date('" + value.strftime(u"%Y%m%d%H%M%S") + u"', '%Y%m%d%H%i%s')"
            elif isinstance(value, dict):
                # CHECKED BEFORE __iter__ (dicts ARE ITERABLE TOO), AS IN quote_sql
                return self.db.literal(CNV.object2JSON(value))
            elif hasattr(value, '__iter__'):
                return self.db.literal(CNV.object2JSON(value))
            elif Math.is_number(value):
                return unicode(value)
            else:
                return self.db.literal(value)

        except Exception as e:
            Log.error(u"problem quoting SQL", e)

    def quote_sql(self, value, param=None):
        """
        USED TO EXPAND THE PARAMETERS TO THE SQL() OBJECT
        """
        try:
            if isinstance(value, SQL):
                if not param:
                    return value
                param = {k: self.quote_sql(v) for k, v in param.items()}
                return expand_template(value, param)
            elif isinstance(value, basestring):
                return value
            elif isinstance(value, dict):
                return self.db.literal(CNV.object2JSON(value))
            elif hasattr(value, '__iter__'):
                return u"(" + u",".join([self.quote_sql(vv) for vv in value]) + u")"
            else:
                return unicode(value)
        except Exception as e:
            Log.error(u"problem quoting SQL", e)

    def quote_column(self, column_name, table=None):
        """Back-quote a column name, a list of names, or a {name, value} pair as SQL."""
        if isinstance(column_name, basestring):
            if table:
                column_name = table + "." + column_name
            return SQL(u"`" + column_name.replace(u".", u"`.`") + u"`")    #MY SQL QUOTE OF COLUMN NAMES
        elif isinstance(column_name, list):
            if table:
                return SQL(u", ".join([self.quote_column(table + "." + c) for c in column_name]))
            return SQL(u", ".join([self.quote_column(c) for c in column_name]))
        else:
            #ASSUME {u"name":name, u"value":value} FORM
            return SQL(column_name.value + u" AS " + self.quote_column(column_name.name))

    def sort2sqlorderby(self, sort):
        sort = Q.normalize_sort(sort)
        return u",\n".join([self.quote_column(s.field) + (" DESC" if s.sort == -1 else " ASC") for s in sort])

    def esfilter2sqlwhere(self, esfilter):
        return SQL(self._filter2where(esfilter))

    def _filter2where(self, esfilter):
        """Translate an ElasticSearch-style filter into a SQL condition string."""
        esfilter = struct.wrap(esfilter)

        if esfilter[u"and"] != None:
            return u"(" + u" AND ".join([self._filter2where(a) for a in esfilter[u"and"]]) + u")"
        elif esfilter[u"or"] != None:
            return u"(" + u" OR ".join([self._filter2where(a) for a in esfilter[u"or"]]) + u")"
        elif esfilter[u"not"]:
            return u"NOT (" + self._filter2where(esfilter[u"not"]) + u")"
        elif esfilter.term != None:
            return u"(" + u" AND ".join([self.quote_column(col) + u"=" + self.quote_value(val) for col, val in esfilter.term.items()]) + u")"
        elif esfilter.terms:
            # NOTE(review): RETURNS ON THE FIRST (col, v) PAIR, SO ONLY ONE
            # COLUMN OF A terms FILTER IS EVER TRANSLATED
            for col, v in esfilter.terms.items():
                try:
                    int_list = CNV.value2intlist(v)
                    packed = int_list_packer(col, int_list)
                    return self._filter2where(packed)
                except Exception:
                    # NOT PACKABLE AS INTEGERS: FALL BACK TO A PLAIN in (...) LIST
                    return self.quote_column(col) + u" in (" + ", ".join([self.quote_value(val) for val in v]) + ")"
        elif esfilter.script != None:
            return u"(" + esfilter.script + u")"
        elif esfilter.range != None:
            name2sign = {
                u"gt": u">",
                u"gte": u">=",
                u"lte": u"<=",
                u"lt": u"<"
            }
            return u"(" + u" AND ".join([
                " AND ".join([
                    self.quote_column(col) + name2sign[sign] + self.quote_value(value)
                    for sign, value in ranges.items()
                ])
                for col, ranges in esfilter.range.items()
            ]) + u")"
        elif esfilter.exists != None:
            if isinstance(esfilter.exists, basestring):
                return u"(" + self.quote_column(esfilter.exists) + u" IS NOT Null)"
            else:
                return u"(" + self.quote_column(esfilter.exists.field) + u" IS NOT Null)"
        else:
            Log.error(u"Can not convert esfilter to SQL: {{esfilter}}", {u"esfilter": esfilter})
def utf8_to_unicode(v):
    """
    DECODE str VALUES AS utf8; EVERY OTHER VALUE IS RETURNED UNCHANGED.
    DECODING PROBLEMS ARE REPORTED THROUGH Log.error
    """
    if not isinstance(v, str):
        return v

    try:
        return v.decode(u"utf8")
    except Exception as e:
        Log.error(u"not expected", e)
#ACTUAL SQL, DO NOT QUOTE THIS STRING
class SQL(unicode):
    """
    A unicode STRING MARKED AS ACTUAL SQL (NOT TO BE QUOTED AGAIN).
    template AND param ARE KEPT AS ATTRIBUTES FOR LATER EXPANSION
    """

    def __init__(self, template='', param=None):
        super(SQL, self).__init__()
        self.param = param
        self.template = template

    def __str__(self):
        # CONVERSION TO A BYTE STRING IS DELIBERATELY REPORTED AS AN ERROR
        Log.error(u"do not do this")
def int_list_packer(term, values):
    """
    return singletons, ranges and exclusions

    Packs a list of integers into a filter made of
      * {"terms": {term: [...]}} for isolated values,
      * {"range": {term: {"gte": a, "lte": b}}} for dense runs, and
      * a {"not": {"terms": ...}} clause for the holes inside those runs.

    Raises Except("no packing possible") when no range could be formed
    (including empty input), so the caller can fall back to a plain list.
    """
    singletons = set()
    ranges = []      # ALWAYS BARE {"gte":.., "lte":..} DICTS; WRAPPED ONCE AT THE END
    exclude = set()

    ordered = Q.sort(values)
    if not ordered:
        raise Except("no packing possible")

    last = ordered[0]
    curr_start = last
    curr_excl = set()

    for v in ordered[1:]:
        if v <= last + 1:
            # DUPLICATE OR ADJACENT VALUE: THE CURRENT RUN SIMPLY CONTINUES
            pass
        elif v - last > 3:
            # BIG GAP: CLOSE THE CURRENT RUN
            if last == curr_start:
                singletons.add(last)
            elif last - curr_start - len(curr_excl) < 6 or ((last - curr_start) < len(curr_excl) * 3):
                #small ranges are singletons, sparse ranges are singletons
                singletons |= set(range(curr_start, last + 1))
                singletons -= curr_excl
            else:
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
            curr_start = v
            curr_excl = set()
        else:
            # SMALL GAP: BRIDGE IT WITH EXCLUSIONS WHILE THE RUN STAYS DENSE
            if v - curr_start >= len(curr_excl) * 3:
                add_me = set(range(last + 1, v))
                curr_excl |= add_me
            else:
                # STORED BARE; PREVIOUSLY THIS ENTRY WAS ALREADY WRAPPED IN
                # {"range":{term:..}} AND THEN WRAPPED A SECOND TIME BELOW
                ranges.append({"gte": curr_start, "lte": last})
                exclude |= curr_excl
                curr_start = v
                curr_excl = set()
        last = v

    if last > curr_start + 1:
        ranges.append({"gte": curr_start, "lte": last})
        # THE HOLES OF THE FINAL RUN MUST BE EXCLUDED TOO, OTHERWISE THE
        # RANGE MATCHES VALUES THAT WERE NEVER IN THE INPUT
        exclude |= curr_excl
    else:
        singletons.add(curr_start)
        singletons.add(last)

    if not ranges:
        raise Except("no packing possible")

    packed = {"or": [{"range": {term: bounds}} for bounds in ranges]}
    if exclude:
        packed = {"and": [packed, {"not": {"terms": {term: Q.sort(exclude)}}}]}
    if singletons:
        return {"or": [
            {"terms": {term: Q.sort(singletons)}},
            packed
        ]}
    return packed

Просмотреть файл

@ -1,314 +0,0 @@
# encoding: utf-8
#
from datetime import datetime
import re
import sha
import time
import requests
import struct
from .maths import Math
from .queries import Q
from .cnv import CNV
from .logs import Log
from .struct import nvl
from .struct import Struct, StructList
DEBUG=False
class ElasticSearch(object):
def __init__(self, settings):
assert settings.host
assert settings.index
assert settings.type
self.metadata = None
if not settings.port: settings.port=9200
self.debug=nvl(settings.debug, DEBUG)
globals()["DEBUG"]=DEBUG or self.debug
self.settings=settings
self.path=settings.host+":"+unicode(settings.port)+"/"+settings.index+"/"+settings.type
@staticmethod
def create_index(settings, schema):
if isinstance(schema, basestring):
schema=CNV.JSON2object(schema)
ElasticSearch.post(
settings.host+":"+unicode(settings.port)+"/"+settings.index,
data=CNV.object2JSON(schema),
headers={"Content-Type":"application/json"}
)
time.sleep(2)
es=ElasticSearch(settings)
return es
@staticmethod
def delete_index(settings, index=None):
index=nvl(index, settings.index)
ElasticSearch.delete(
settings.host+":"+unicode(settings.port)+"/"+index,
)
def get_aliases(self):
"""
RETURN LIST OF {"alias":a, "index":i} PAIRS
ALL INDEXES INCLUDED, EVEN IF NO ALIAS {"alias":Null}
"""
data=self.get_metadata().indices
output=[]
for index, desc in data.items():
if not desc["aliases"]:
output.append({"index":index, "alias":None})
else:
for a in desc["aliases"]:
output.append({"index":index, "alias":a})
return struct.wrap(output)
def get_metadata(self):
if not self.metadata:
response=self.get(self.settings.host+":"+unicode(self.settings.port)+"/_cluster/state")
self.metadata=response.metadata
return self.metadata
def get_schema(self):
return self.get_metadata().indicies[self.settings.index]
#DELETE ALL INDEXES WITH GIVEN PREFIX, EXCEPT name
def delete_all_but(self, prefix, name):
for a in self.get_aliases():
# MATCH <prefix>YYMMDD_HHMMSS FORMAT
if re.match(re.escape(prefix)+"\\d{8}_\\d{6}", a.index) and a.index!=name:
ElasticSearch.delete_index(self.settings, a.index)
@staticmethod
def proto_name(prefix, timestamp=None):
if not timestamp:
timestamp = datetime.utcnow()
return prefix + CNV.datetime2string(timestamp, "%Y%m%d_%H%M%S")
def add_alias(self, alias):
self.metadata = None
requests.post(
self.settings.host+":"+unicode(self.settings.port)+"/_aliases",
CNV.object2JSON({
"actions":[
{"add":{"index":self.settings.index, "alias":alias}}
]
})
)
def get_proto(self, alias):
output=Q.sort([
a.index
for a in self.get_aliases()
if re.match(re.escape(alias)+"\\d{8}_\\d{6}", a.index) and not a.alias
])
return output
def is_proto(self, index):
for a in self.get_aliases():
if a.index==index and a.alias:
return False
return True
def delete_record(self, query):
if isinstance(query, dict):
ElasticSearch.delete(
self.path+"/_query",
data=CNV.object2JSON(query)
)
else:
ElasticSearch.delete(
self.path+"/"+query
)
def extend(self, records):
self.add(records)
# RECORDS MUST HAVE id AND json AS A STRING OR
# HAVE id AND value AS AN OBJECT
def add(self, records):
# ADD LINE WITH COMMAND
lines=[]
for r in records:
id=r["id"]
if "json" in r:
json=r["json"]
elif "value" in r:
json=CNV.object2JSON(r["value"])
else:
Log.error("Expecting every record given to have \"value\" or \"json\" property")
if id == None:
id = sha.new(json).hexdigest()
lines.append('{"index":{"_id":'+CNV.object2JSON(id)+'}}')
lines.append(json)
if not lines: return
response=ElasticSearch.post(
self.path+"/_bulk",
data="\n".join(lines).encode("utf8")+"\n",
headers={"Content-Type":"text"}
)
items=response["items"]
for i, item in enumerate(items):
if not item.index.ok:
Log.error("{{error}} while loading line:\n{{line}}", {
"error":item.index.error,
"line":lines[i*2+1]
})
if self.debug:
Log.note("{{num}} items added", {"num":len(lines)/2})
# -1 FOR NO REFRESH
def set_refresh_interval(self, seconds):
if seconds <= 0:
interval = "-1"
else:
interval = unicode(seconds) + "s"
response=ElasticSearch.put(
self.settings.host + ":" + unicode(
self.settings.port) + "/" + self.settings.index + "/_settings",
data="{\"index.refresh_interval\":\"" + interval + "\"}"
)
if response.content != '{"ok":true}':
Log.error("Can not set refresh interval ({{error}})", {
"error": response.content
})
def search(self, query):
try:
return ElasticSearch.post(self.path+"/_search", data=CNV.object2JSON(query))
except Exception, e:
Log.error("Problem with search", e)
def threaded_queue(self, size):
    """
    RETURN A Threaded_Queue BUILT FROM THIS OBJECT AND size
    """
    return Threaded_Queue(self, size)
@staticmethod
def post(*list, **args):
    """
    requests.post() WITH THE SAME ARGUMENTS; RETURN THE JSON-DECODED BODY
    RAISES (VIA Log.error) WHEN THE BODY HAS AN error PROPERTY, OR ON ANY
    OTHER FAILURE (THE FIRST POSITIONAL ARGUMENT IS REPORTED AS THE url)
    """
    try:
        response=requests.post(*list, **args)
        if DEBUG:
            # ONLY THE START OF THE BODY IS LOGGED
            Log.note(response.content[:130])
        details=CNV.JSON2object(response.content)
        if details.error:
            Log.error(details.error)
        return details
    except Exception as e:
        Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def get(*list, **args):
    """
    requests.get() WITH THE SAME ARGUMENTS; RETURN THE JSON-DECODED BODY
    RAISES (VIA Log.error) WHEN THE BODY HAS AN error PROPERTY, OR ON ANY
    OTHER FAILURE (THE FIRST POSITIONAL ARGUMENT IS REPORTED AS THE url)
    """
    try:
        response=requests.get(*list, **args)
        if DEBUG:
            # ONLY THE START OF THE BODY IS LOGGED
            Log.note(response.content[:130])
        details=CNV.JSON2object(response.content)
        if details.error:
            Log.error(details.error)
        return details
    except Exception as e:
        Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def put(*list, **args):
    """
    requests.put() WITH THE SAME ARGUMENTS; RETURN THE RAW RESPONSE OBJECT
    (THE BODY IS NOT DECODED OR CHECKED FOR ERRORS HERE)
    """
    try:
        response=requests.put(*list, **args)
        if DEBUG:
            Log.note(response.content)
        return response
    except Exception as e:
        Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def delete(*list, **args):
    """
    requests.delete() WITH THE SAME ARGUMENTS; RETURN THE RAW RESPONSE OBJECT
    (THE BODY IS NOT DECODED OR CHECKED FOR ERRORS HERE)
    """
    try:
        response=requests.delete(*list, **args)
        if DEBUG:
            Log.note(response.content)
        return response
    except Exception as e:
        Log.error("Problem with call to {{url}}", {"url":list[0]}, e)
@staticmethod
def scrub(r):
    """
    REMOVE KEYS OF DEGENERATE VALUES (EMPTY STRINGS, EMPTY LISTS, AND NULLS)
    TO LOWER CASE
    CONVERT STRINGS OF NUMBERS TO NUMBERS
    RETURNS **COPY**, DOES NOT CHANGE ORIGINAL
    """
    cleaned = _scrub(r)
    return struct.wrap(cleaned)
def _scrub(r):
    """
    RECURSIVELY BUILD A CLEANED COPY OF r
    * None AND EMPTY STRINGS BECOME None
    * OTHER STRINGS ARE LOWER-CASED (CHECKED BEFORE THE NUMBER TEST, SO
      STRINGS ARE NEVER CONVERTED TO NUMBERS HERE)
    * NUMBERS GO THROUGH CNV.value2number()
    * dict: KEYS ARE LOWER-CASED, None VALUES ARE DROPPED, EMPTY RESULT BECOMES None
    * ITERABLES: None ITEMS ARE DROPPED, EMPTY RESULT BECOMES None, THE REST
      IS SORTED WHEN Q.sort() CAN SORT IT
    RETURNS None (AFTER LOGGING A WARNING) IF ANYTHING GOES WRONG
    """
    try:
        if r == None:
            return None
        elif isinstance(r, basestring):
            if r == "":
                return None
            return r.lower()
        elif Math.is_number(r):
            return CNV.value2number(r)
        elif isinstance(r, dict):
            if isinstance(r, Struct):
                r = r.dict
            output = {}
            for k, v in r.items():
                v = _scrub(v)
                if v != None:
                    output[k.lower()] = v
            if len(output) == 0:
                return None
            return output
        elif hasattr(r, '__iter__'):
            if isinstance(r, StructList):
                r = r.list
            output = []
            for v in r:
                v = _scrub(v)
                if v != None:
                    output.append(v)
            if not output:
                return None
            try:
                return Q.sort(output)
            except Exception:
                # MIXED/UNSORTABLE CONTENT: KEEP THE ORDER WE HAVE
                return output
        else:
            return r
    except Exception as e:
        # KEEP THE EXCEPTION AS THE CAUSE, SO THE WARNING SAYS *WHY* THE SCRUB FAILED
        Log.warning("Can not scrub: {{json}}", {"json": r}, e)
        return None

Просмотреть файл

@ -1,101 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import sys
from .struct import nvl
class Emailer:
    """
    SEND EMAIL THROUGH THE SMTP SERVER DESCRIBED BY settings
    (host, port, use_ssl, username, password, from_address ARE READ)
    """

    def __init__(self, settings):
        self.settings=settings

    def send_email(self,
        from_address = None,
        to_addrs = None,
        subject='No Subject',
        text_data = None,
        html_data = None
    ):
        """Sends an email.

        from_address is an email address (defaults to settings.from_address);
        to_addrs is a list of email addresses.
        Addresses can be plain (e.g. "jsmith@example.com") or with real names
        (e.g. "John Smith <jsmith@example.com>").

        text_data and html_data are both strings.  You can specify one or both.
        If you specify both, the email will be sent as a MIME multipart
        alternative, i.e., the recipient will see the HTML content if his
        viewer supports it; otherwise he'll see the text content.

        Raises Exception when an address or both bodies are missing; smtplib
        errors from login/sendmail propagate after the connection is closed.
        """
        settings=self.settings
        from_address=nvl(from_address, settings.from_address)

        if not from_address or not to_addrs:
            raise Exception("Both from_addr and to_addrs must be specified")
        if not text_data and not html_data:
            raise Exception("Must specify either text_data or html_data")

        # BUILD THE MESSAGE BEFORE CONNECTING, SO A BAD MESSAGE NEVER OPENS A CONNECTION
        if not html_data:
            msg = MIMEText(text_data)
        elif not text_data:
            msg = MIMEMultipart()
            msg.preamble = subject
            msg.attach(MIMEText(html_data, 'html'))
        else:
            msg = MIMEMultipart('alternative')
            msg.attach(MIMEText(text_data, 'plain'))
            msg.attach(MIMEText(html_data, 'html'))

        msg['Subject'] = subject
        msg['From'] = from_address
        msg['To'] = ', '.join(to_addrs)

        if settings.use_ssl:
            server = smtplib.SMTP_SSL(settings.host, settings.port)
        else:
            server = smtplib.SMTP(settings.host, settings.port)

        try:
            if settings.username and settings.password:
                server.login(settings.username, settings.password)
            server.sendmail(from_address, to_addrs, msg.as_string())
        except Exception:
            # DO NOT LEAK THE SOCKET WHEN LOGIN OR SEND FAILS
            server.close()
            raise
        server.quit()
if sys.hexversion < 0x020603f0:
    # versions earlier than 2.6.3 have a bug in smtplib when sending over SSL:
    # http://bugs.python.org/issue4066
    # Unfortunately the stock version of Python in Snow Leopard is 2.6.1, so
    # we patch it here to avoid having to install an updated Python version.
    import socket
    import ssl

    def _get_socket_fixed(self, host, port, timeout):
        """Replacement for SMTP_SSL._get_socket: connect, wrap in SSL, keep a file wrapper."""
        if self.debuglevel > 0:
            sys.stderr.write("connect: " + str((host, port)) + "\n")
        plain_socket = socket.create_connection((host, port), timeout)
        secure_socket = ssl.wrap_socket(plain_socket, self.keyfile, self.certfile)
        self.file = smtplib.SSLFakeFile(secure_socket)
        return secure_socket

    smtplib.SMTP_SSL._get_socket = _get_socket_fixed

Просмотреть файл

@ -1,120 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import codecs
from datetime import datetime
import os
import shutil
from .struct import listwrap, nvl
from .cnv import CNV
class File(object):
    """
    THIN WRAPPER AROUND A FILE (OR DIRECTORY) PATH
    THE PATH IS KEPT INTERNALLY WITH "/" SEPARATORS
    """

    def __init__(self, filename):
        if filename == None:
            from .logs import Log
            Log.error("File must be given a filename")

        #USE UNIX STANDARD
        self._filename = "/".join(filename.split(os.sep))

    @property
    def filename(self):
        """THE PATH, USING THE SEPARATOR OF THE CURRENT OS"""
        return self._filename.replace("/", os.sep)

    @property
    def abspath(self):
        return os.path.abspath(self._filename)

    def backup_name(self, timestamp=None):
        """
        RETURN A FILENAME THAT CAN SERVE AS A BACKUP FOR THIS FILE
        THE TIMESTAMP (DEFAULT datetime.now()) IS INSERTED BEFORE THE
        EXTENSION, OR APPENDED WHEN THERE IS NO EXTENSION
        """
        suffix = CNV.datetime2string(nvl(timestamp, datetime.now()), "%Y%m%d_%H%M%S")
        parts = self._filename.split(".")
        if len(parts) == 1:
            output = self._filename + "." + suffix
        elif not parts[-2] or parts[-2].endswith("/"):
            # NAME STARTS WITH A DOT (".x", "dir/.x") OR HAS AN EMPTY PART ("a..b"):
            # NO EXTENSION TO PRESERVE (AND parts[-2][-1] WOULD RAISE ON AN EMPTY PART)
            output = self._filename + "." + suffix
        else:
            parts.insert(-1, suffix)
            output = ".".join(parts)
        return output

    def read(self, encoding="utf-8"):
        """RETURN THE WHOLE FILE, DECODED WITH encoding"""
        with codecs.open(self._filename, "r", encoding=encoding) as file:
            return file.read()

    def read_ascii(self):
        """RETURN THE WHOLE FILE WITHOUT DECODING; A READ NEVER CREATES DIRECTORIES"""
        with open(self._filename, "r") as file:
            return file.read()

    def write_ascii(self, content):
        """REPLACE THE FILE WITH content, CREATING THE PARENT DIRECTORY IF NEEDED"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            file.write(content)

    def write(self, data):
        """REPLACE THE FILE WITH data (ONE STRING, OR A LIST OF STRINGS WRITTEN IN ORDER)"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "w") as file:
            for d in listwrap(data):
                file.write(d)

    def iter(self):
        """RETURN AN OPEN FILE OBJECT FOR LINE ITERATION; THE CALLER MUST CLOSE IT"""
        return codecs.open(self._filename, "r")

    def append(self, content):
        """ADD content TO THE END OF THE FILE, CREATING THE PARENT DIRECTORY IF NEEDED"""
        if not self.parent.exists: self.parent.create()
        with open(self._filename, "a") as output_file:
            output_file.write(content)

    def delete(self):
        """
        REMOVE THE FILE, OR THE WHOLE DIRECTORY TREE; RETURNS self
        (RETURNS None WHEN THE OS REPORTS THE PATH COULD NOT BE FOUND)
        """
        try:
            if os.path.isdir(self._filename):
                shutil.rmtree(self._filename)
            elif os.path.isfile(self._filename):
                os.remove(self._filename)
            return self
        except Exception as e:
            # NOT EVERY EXCEPTION HAS A strerror ATTRIBUTE
            if getattr(e, "strerror", None)=="The system cannot find the path specified":
                return
            from .logs import Log
            Log.error("Could not remove file", e)

    def backup(self):
        """
        COPY THIS FILE TO THE NAME GIVEN BY backup_name()
        RETURNS THE File OF THE COPY, OR None IF THERE IS NO REGULAR FILE TO COPY
        """
        if not os.path.isfile(self._filename):
            return None
        target = File(self.backup_name())
        shutil.copy2(self._filename, target._filename)
        return target

    def create(self):
        """MAKE THIS PATH AS A DIRECTORY (INCLUDING MISSING PARENTS)"""
        try:
            os.makedirs(self._filename)
        except Exception as e:
            from .logs import Log
            Log.error("Could not make directory {{dir_name}}", {"dir_name":self._filename}, e)

    @property
    def parent(self):
        """File FOR THE CONTAINING DIRECTORY ("" WHEN THE PATH HAS NO DIRECTORY PART)"""
        return File("/".join(self._filename.split("/")[:-1]))

    @property
    def exists(self):
        # "" AND "." BOTH MEAN THE CURRENT DIRECTORY
        if self._filename in ["", "."]: return True
        try:
            return os.path.exists(self._filename)
        except Exception:
            return False

Просмотреть файл

@ -1,181 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, time
from decimal import Decimal
import json
import re
try:
    # StringBuilder IS ABOUT 2x FASTER THAN list()
    from __pypy__.builders import StringBuilder
    use_pypy = True
except Exception as e:
    use_pypy = False

    class StringBuilder(list):
        """
        list-BASED STAND-IN FOR THE pypy StringBuilder: append() PIECES, THEN build()
        """
        def __init__(self, length=None):
            # length IS ACCEPTED FOR SIGNATURE COMPATIBILITY ONLY
            super(StringBuilder, self).__init__()

        def build(self):
            pieces = self
            return "".join(pieces)
class PyPyJSONEncoder(object):
    """
    pypy DOES NOT OPTIMIZE GENERATOR CODE WELL
    """

    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        """
        RETURN value AS JSON
        pretty - USE json.dumps() (INDENTED, SORTED KEYS) INSTEAD OF THE APPENDER-BASED ENCODER
        """
        if not pretty:
            _buffer = StringBuilder(1024)
            _value2json(value, _buffer.append)
            return _buffer.build()
        return json.dumps(json_scrub(value), indent=4, sort_keys=True, separators=(',', ': '))
class cPythonJSONEncoder(object):
    """
    ENCODE WITH THE STANDARD json MODULE, AFTER PASSING THE VALUE THROUGH json_scrub()
    """

    def __init__(self):
        object.__init__(self)

    def encode(self, value, pretty=False):
        """
        RETURN value AS JSON
        pretty - INDENT BY 4 AND SORT KEYS
        """
        scrubbed = json_scrub(value)
        if pretty:
            return json.dumps(scrubbed, indent=4, sort_keys=True, separators=(',', ': '))
        return json.dumps(scrubbed)
# OH HUM, cPython with uJSON, OR pypy WITH BUILTIN JSON?
# http://liangnuren.wordpress.com/2012/08/13/python-json-performance/
# http://morepypy.blogspot.ca/2011/10/speeding-up-json-encoding-in-pypy.html
json_encoder = PyPyJSONEncoder() if use_pypy else cPythonJSONEncoder()
# THE DECODER IS THE SAME FOR BOTH INTERPRETERS
json_decoder = json._default_decoder
def _value2json(value, appender):
    """
    SEND THE JSON FOR value, PIECE BY PIECE, TO appender
    value    - STRING, None, BOOLEAN, NUMBER, datetime, dict OR ITERABLE
    appender - CALLABLE THAT ACCEPTS ONE STRING PIECE
    datetime IS ENCODED AS MILLISECONDS SINCE EPOCH (timetuple() READ AS LOCAL TIME)
    RAISES Exception FOR ANY OTHER TYPE
    """
    # THE MODULE-LEVEL NAME time IS datetime.time, WHICH HAS NO mktime()
    from time import mktime

    if isinstance(value, basestring):
        _string2json(value, appender)
    elif value == None:
        appender("null")
    elif value is True:
        appender('true')
    elif value is False:
        appender('false')
    elif isinstance(value, (int, long, Decimal)):
        appender(str(value))
    elif isinstance(value, float):
        appender(repr(value))
    elif isinstance(value, datetime):
        appender(unicode(long(mktime(value.timetuple())*1000)))
    elif isinstance(value, dict):
        _dict2json(value, appender)
    elif hasattr(value, '__iter__'):
        _list2json(value, appender)
    else:
        raise Exception(repr(value)+" is not JSON serializable")
def _list2json(value, appender):
appender("[")
first = True
for v in value:
if first:
first = False
else:
appender(", ")
_value2json(v, appender)
appender("]")
def _dict2json(value, appender):
items = value.iteritems()
appender("{")
first = True
for k, v in value.iteritems():
if first:
first = False
else:
appender(", ")
_string2json(unicode(k), appender)
appender(": ")
_value2json(v, appender)
appender("}")
special_find = u"\\\"\t\n\r".find
replacement = [u"\\\\", u"\\\"", u"\\t", u"\\n", u"\\r"]

# EVERY CHARACTER THAT MUST BE ESCAPED INSIDE A JSON STRING
ESCAPE = re.compile(r'[\x00-\x1f\\"\b\f\n\r\t]')

# START WITH THE GENERIC \uXXXX FORM FOR ALL CONTROL CHARACTERS...
ESCAPE_DCT = {}
for i in range(0x20):
    ESCAPE_DCT[chr(i)] = '\\u{0:04x}'.format(i)
# ...THEN OVERRIDE WITH THE SHORT ESCAPES
ESCAPE_DCT.update({
    '\\': '\\\\',
    '"': '\\"',
    '\b': '\\b',
    '\f': '\\f',
    '\n': '\\n',
    '\r': '\\r',
    '\t': '\\t',
})
def _string2json(value, appender):
    """
    SEND value TO appender AS A QUOTED JSON STRING: OPENING QUOTE, ESCAPED TEXT, CLOSING QUOTE
    """
    appender("\"")
    appender(ESCAPE.sub(lambda match: ESCAPE_DCT[match.group(0)], value))
    appender("\"")
#REMOVE VALUES THAT CAN NOT BE JSON-IZED
def json_scrub(r):
return _scrub(r)
def _scrub(r):
if r == None:
return None
elif isinstance(r, dict):
output = {}
for k, v in r.iteritems():
v = _scrub(v)
output[k] = v
return output
elif hasattr(r, '__iter__'):
output = []
for v in r:
v = _scrub(v)
output.append(v)
return output
elif isinstance(r, Decimal):
return float(r)
else:
return r

Просмотреть файл

@ -1,400 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import traceback
import logging
import sys
from .struct import listwrap, nvl
import struct, threads
from .strings import indent, expand_template
from .threads import Thread
ERROR="ERROR"
WARNING="WARNING"
NOTE="NOTE"
main_log = None
logging_multi = None
class Log(object):
    """
    FOR STRUCTURED LOGGING AND EXCEPTION CHAINING
    """

    @classmethod
    def new_instance(cls, settings):
        """
        BUILD ONE LOGGER FROM ONE settings ENTRY; THE FIRST OF THESE THAT IS SET WINS:
        class (logging.handlers.* IS WRAPPED IN Log_usingLogger), file, filename, stream
        RETURNS None WHEN NONE OF THEM IS SET
        """
        settings=struct.wrap(settings)
        if settings["class"]:
            if not settings["class"].startswith("logging.handlers."):
                return make_log_from_settings(settings)
            # elif settings["class"]=="sys.stdout":
            #CAN BE SUPER SLOW
            else:
                return Log_usingLogger(settings)
        # USE THE CONFIGURED VALUE (NOT THE BUILTIN file TYPE)
        if settings.file: return Log_usingFile(settings.file)
        if settings.filename: return Log_usingFile(settings.filename)
        if settings.stream: return Log_usingStream(settings.stream)

    @classmethod
    def add_log(cls, log):
        logging_multi.add_log(log)

    @staticmethod
    def debug(template=None, params=None):
        """
        USE THIS FOR DEBUGGING (AND EVENTUAL REMOVAL)
        """
        Log.note(nvl(template, ""), params)

    @staticmethod
    def println(template, params=None):
        Log.note(template, params)

    @staticmethod
    def note(template, params=None):
        """
        WRITE template (EXPANDED WITH params) TO THE MAIN LOG, PREFIXED WITH THE UTC TIME
        """
        template="{{log_timestamp}} - "+template
        # COPY, SO THE CALLER'S params ARE NOT CHANGED
        params = nvl(params, {}).copy()

        #NICE TO GATHER MANY MORE ITEMS FOR LOGGING (LIKE STACK TRACES AND LINE NUMBERS)
        params["log_timestamp"]=datetime.utcnow().strftime("%H:%M:%S")

        main_log.write(template, params)

    @staticmethod
    def warning(template, params=None, cause=None):
        """
        WRITE A WARNING (WITH STACK TRACE AND CAUSE) TO THE LOG; DOES NOT RAISE
        params MAY BE AN EXCEPTION, IN WHICH CASE IT IS TAKEN AS THE cause
        """
        if isinstance(params, BaseException):
            cause=params
            params = None

        # SAME AS Log.error(): THE CAUSE IS ALWAYS HANDED TO Except AS A LIST
        if cause and not isinstance(cause, Except):
            cause=[Except(WARNING, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), 0))]
        else:
            cause=listwrap(cause)

        e = Except(WARNING, template, params, cause, format_trace(traceback.extract_stack(), 1))
        Log.note(unicode(e))

    #raise an exception with a trace for the cause too
    @staticmethod
    def error(
        template,       #human readable template
        params=None,    #parameters for template
        cause=None,     #pausible cause
        offset=0        #stack trace offset (==1 if you do not want to report self)
    ):
        """
        ALWAYS RAISES AN Except BUILT FROM THE ARGUMENTS
        params MAY BE AN EXCEPTION (OR LIST STARTING WITH ONE), IN WHICH CASE IT IS TAKEN AS THE cause
        """
        if params and isinstance(struct.listwrap(params)[0], BaseException):
            cause=params
            params = None

        if cause and not isinstance(cause, Except):
            cause=[Except(ERROR, unicode(cause), trace=format_trace(traceback.extract_tb(sys.exc_info()[2]), offset))]
        else:
            cause=listwrap(cause)

        trace=format_trace(traceback.extract_stack(), 1+offset)
        e=Except(ERROR, template, params, cause, trace)
        raise e

    #RUN ME FIRST TO SETUP THE THREADED LOGGING
    @staticmethod
    def start(settings=None):
        """
        REPLACE THE MAIN LOG WITH A Log_usingMulti HOLDING ONE LOGGER PER ENTRY OF settings.log
        DOES NOTHING WHEN settings OR settings.log IS MISSING
        """
        ##http://victorlin.me/2012/08/good-logging-practice-in-python/
        if not settings: return
        if not settings.log: return

        globals()["logging_multi"]=Log_usingMulti()
        globals()["main_log"]=logging_multi

        for log in listwrap(settings.log):
            Log.add_log(Log.new_instance(log))

    @staticmethod
    def stop():
        main_log.stop()

    def write(self):
        Log.error("not implemented")
def format_trace(tbs, trim=0):
    """
    RETURN ONE "at file:line (function)" LINE PER FRAME, MOST RECENT FIRST
    tbs  - LIST OF (filename, lineno, name, line) TUPLES; REVERSED IN PLACE
    trim - NUMBER OF LEADING FRAMES (AFTER REVERSAL) TO LEAVE OUT
    """
    tbs.reverse()
    return "".join(
        'at %s:%d (%s)\n' % (filename, lineno, name)
        for filename, lineno, name, line in tbs[trim:]
    )
#def format_trace(tb, trim=0):
# list = []
# for filename, lineno, name, line in traceback.extract_tb(tb)[0:-trim]:
# item = 'File "%s", line %d, in %s\n' % (filename,lineno,name)
# if line:
# item = item + '\t%s\n' % line.strip()
# list.append(item)
# return "".join(list)
class Except(Exception):
    """
    EXCEPTION THAT CARRIES A TEMPLATE, ITS PARAMETERS, A FORMATTED TRACE AND ITS CAUSE(S)
    """

    def __init__(self, type=ERROR, template=None, params=None, cause=None, trace=None):
        # INITIALIZE THE BASE WITH THE TEMPLATE; PASSING self WOULD PUT THE
        # INSTANCE INSIDE ITS OWN args
        super(Except, self).__init__(template)
        self.type=type
        self.template=template
        self.params=params
        self.cause=cause
        self.trace=trace

    @property
    def message(self):
        return unicode(self)

    def __str__(self):
        """
        EXPANDED TEMPLATE, THEN THE INDENTED TRACE, THEN EVERY CAUSE
        """
        output=self.template
        if self.params: output=expand_template(output, self.params)

        if self.trace:
            output+="\n"+indent(self.trace)

        if self.cause:
            # ACCEPT EITHER A SINGLE CAUSE OR A LIST OF CAUSES
            causes = self.cause if hasattr(self.cause, "__iter__") else [self.cause]
            output+="\ncaused by\n\t"+"\nand caused by\n\t".join([c.__str__() for c in causes])

        return output+"\n"
class BaseLog(object):
    """
    INTERFACE FOR LOGGERS: BOTH METHODS DO NOTHING UNLESS OVERRIDDEN
    """

    def write(self, template, params):
        """ACCEPT ONE LOG ENTRY (TEMPLATE PLUS ITS PARAMETERS)"""
        return None

    def stop(self):
        """SHUT THE LOGGER DOWN"""
        return None
class Log_usingFile(BaseLog):
    """
    APPEND EVERY LOG ENTRY TO ONE FILE; AN EXISTING FILE IS BACKED UP AND
    DELETED WHEN THE LOGGER IS CREATED
    """

    def __init__(self, file):
        assert file
        from files import File
        self.file=File(file)
        if self.file.exists:
            self.file.backup()
            self.file.delete()

        self.file_lock=threads.Lock()

    def write(self, template, params):
        # ONE WRITER AT A TIME; self.file IS THE File MADE IN __init__
        with self.file_lock:
            self.file.append(expand_template(template, params))
#WRAP PYTHON CLASSIC logger OBJECTS
class Log_usingLogger(BaseLog):
    """
    SEND EVERY ENTRY, AT INFO LEVEL, TO A logging.Logger WITH THE HANDLER DESCRIBED BY settings
    """

    def __init__(self, settings):
        handler = make_log_from_settings(settings)
        self.logger=logging.Logger("unique name", level=logging.INFO)
        self.logger.addHandler(handler)

    def write(self, template, params):
        # http://docs.python.org/2/library/logging.html#logging.LogRecord
        message = expand_template(template, params)
        self.logger.info(message)
def make_log_from_settings(settings):
    """
    IMPORT THE CLASS NAMED BY settings["class"] AND CONSTRUCT IT WITH THE
    REMAINING SETTINGS AS KEYWORD ARGUMENTS
    THE DIRECTORY FOR settings.filename (IF ANY) IS CREATED FIRST
    settings ITSELF IS NOT CHANGED
    RAISES Exception WHEN THE MODULE CAN NOT BE IMPORTED
    """
    assert settings["class"]

    # IMPORT MODULE FOR HANDLER
    path = settings["class"].split(".")
    class_name = path[-1]
    path = ".".join(path[:-1])
    try:
        temp = __import__(path, globals(), locals(), [class_name], -1)
    except Exception as e:
        raise Exception("can not find path class: " + path)
    constructor = object.__getattribute__(temp, class_name)

    #IF WE NEED A FILE, MAKE SURE DIRECTORY EXISTS
    if settings.filename:
        from files import File

        f = File(settings.filename)
        if not f.parent.exists:
            f.parent.create()

    # WORK ON A COPY: REMOVING "class" FROM THE CALLER'S SETTINGS WOULD MAKE A
    # SECOND CALL WITH THE SAME SETTINGS BEHAVE DIFFERENTLY
    params = dict(settings.dict)
    del params['class']
    return constructor(**params)
class Log_usingStream(BaseLog):
    """
    QUEUE LOG ENTRIES AND WRITE THEM TO A STREAM FROM A BACKGROUND THREAD
    """

    #stream CAN BE AN OBJECT WITH write() METHOD, OR A STRING
    #WHICH WILL eval() TO ONE
    def __init__(self, stream):
        assert stream

        if isinstance(stream, basestring):
            # NOTE(review): eval() RUNS WHATEVER THE STRING CONTAINS; ONLY SAFE IF SETTINGS ARE TRUSTED
            self.stream=eval(stream)
            name=stream
        else:
            self.stream=stream
            name="stream"

        #WRITE TO STREAMS CAN BE *REALLY* SLOW, WE WILL USE A THREAD
        from threads import Queue
        self.queue=Queue()

        def worker(please_stop):
            queue=self.queue

            while not please_stop:
                # AT MOST ONE BATCH EVERY 0.3 SECONDS
                next_run = datetime.utcnow() + timedelta(seconds=0.3)
                batch = queue.pop_all()
                if batch:
                    rendered=[]
                    for entry in batch:
                        try:
                            if entry==Thread.STOP:
                                please_stop.go()
                                next_run = datetime.utcnow()
                                break
                            rendered.append(expand_template(entry.get("template", None), entry.get("params", None)))
                        except Exception as e:
                            # BEST EFFORT: AN ENTRY THAT CAN NOT BE RENDERED IS SKIPPED
                            pass
                    try:
                        self.stream.write("\n".join(rendered))
                        self.stream.write("\n")
                    except Exception as e:
                        # BEST EFFORT: A FAILED WRITE MUST NOT KILL THE LOGGING THREAD
                        pass
                Thread.sleep(till=next_run)
        self.thread=Thread("log to "+name, worker)
        self.thread.start()

    def write(self, template, params):
        try:
            self.queue.add({"template":template, "params":params})
            return self
        except Exception as e:
            raise e  #OH NO!

    def stop(self):
        try:
            self.queue.add(Thread.STOP)  #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
        except Exception as e:
            pass

        try:
            self.queue.close()
        except Exception as f:
            pass
class Log_usingThread(BaseLog):
    """
    QUEUE LOG ENTRIES AND HAND THEM TO ANOTHER LOGGER FROM A BACKGROUND THREAD
    """

    def __init__(self, logger):
        #DELAYED LOAD FOR THREADS MODULE
        from threads import Queue

        self.queue=Queue()

        # SAME WORKER CONTRACT AS Log_usingStream: THE THREAD HANDS IN A please_stop SIGNAL
        def worker(please_stop):
            while not please_stop:
                logs = self.queue.pop_all()
                for log in logs:
                    if log==Thread.STOP:
                        # LEAVE THE OUTER LOOP TOO, OTHERWISE stop() WAITS FOREVER IN join()
                        please_stop.go()
                        break
                    logger.write(**log)
                else:
                    # NO STOP SEEN IN THIS BATCH: WAIT BEFORE POLLING AGAIN
                    Thread.sleep(1)
        self.thread=Thread("log thread", worker)
        self.thread.start()

    def write(self, template, params):
        try:
            self.queue.add({"template":template, "params":params})
            return self
        except Exception as e:
            sys.stdout.write("IF YOU SEE THIS, IT IS LIKELY YOU FORGOT TO RUN Log.start() FIRST")
            raise e  #OH NO!

    def stop(self):
        try:
            self.queue.add(Thread.STOP)  #BE PATIENT, LET REST OF MESSAGE BE SENT
            self.thread.join()
        except Exception as e:
            pass

        try:
            self.queue.close()
        except Exception as f:
            pass
class Log_usingMulti(BaseLog):
    """
    FAN EVERY LOG ENTRY OUT TO A LIST OF LOGGERS; A FAILING LOGGER IS IGNORED
    """

    def __init__(self):
        self.many=[]

    def write(self, template, params):
        for target in self.many:
            try:
                target.write(template, params)
            except Exception as e:
                # ONE BROKEN LOGGER MUST NOT STOP THE OTHERS
                pass
        return self

    def add_log(self, logger):
        self.many.append(logger)
        return self

    def remove_log(self, logger):
        self.many.remove(logger)
        return self

    def clear_log(self):
        self.many=[]

    def stop(self):
        for target in self.many:
            try:
                target.stop()
            except Exception as e:
                pass
if not main_log:
main_log = Log_usingStream("sys.stdout")

Просмотреть файл

@ -1,115 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import math
from . import struct
from .struct import Null, nvl
from .logs import Log
from .strings import find_first
class Math(object):
    """
    SMALL NUMERIC HELPERS, ALL STATIC
    """

    @staticmethod
    def bayesian_add(a, b):
        """
        COMBINE TWO PROBABILITIES: a*b/(a*b+(1-a)*(1-b))
        RAISES (VIA Log.error) UNLESS BOTH ARE STRICTLY BETWEEN ZERO AND ONE
        """
        if a>=1 or b>=1 or a<=0 or b<=0: Log.error("Only allowed values *between* zero and one")
        return a*b/(a*b+(1-a)*(1-b))

    # FOR GOODNESS SAKE - IF YOU PROVIDE A METHOD abs(), PLEASE PROVIDE IT'S COMPLEMENT
    # x = abs(x)*sign(x)
    # FOUND IN numpy, BUT WE USUALLY DO NOT NEED TO BRING IN A BIG LIB FOR A SIMPLE DECISION
    @staticmethod
    def sign(v):
        if v<0: return -1
        if v>0: return +1
        return 0

    @staticmethod
    def is_number(s):
        """RETURN True IF float(s) SUCCEEDS"""
        try:
            float(s)
            return True
        except Exception:
            return False

    @staticmethod
    def is_integer(s):
        """RETURN True IF s CONVERTS TO A FLOAT WITH NO FRACTIONAL PART"""
        try:
            if float(s)==round(float(s), 0):
                return True
            return False
        except Exception:
            return False

    @staticmethod
    def round_sci(value, decimal=None, digits=None):
        """
        ROUND value
        digits  - WHEN GIVEN, value IS SCALED BY ITS OWN POWER OF TEN AND THE
                  MANTISSA IS ROUNDED TO THIS MANY DECIMAL PLACES
        decimal - OTHERWISE, PLAIN round(value, decimal)
        """
        if digits != None:
            if value == 0:
                # log10(0) IS UNDEFINED, AND ZERO NEEDS NO ROUNDING
                return value
            # THE SCALE COMES FROM THE MAGNITUDE OF value (NOT OF digits)
            m=pow(10, math.floor(math.log10(abs(value))))
            return round(value/m, digits)*m

        return round(value, decimal)

    @staticmethod
    def floor(value, mod=None):
        """
        x == floor(x, a) + mod(x, a)  FOR ALL a
        """
        mod = nvl(mod, 1)
        v = int(math.floor(value))
        return v - (v % mod)

    @staticmethod
    def approx_str(value):
        """
        RETURN A VALUE CLOSE TO value, BUT WITH A SHORTER unicode() REPRESENTATION:
        ROUND WHERE A RUN OF "9999" OR "0000" STARTS AFTER THE DECIMAL POINT
        """
        v=unicode(value)
        d=v.find(".")
        if d==-1: return value

        i=find_first(v, ["9999", "0000"], d)
        if i==-1: return value

        return Math.round_sci(value, decimal=i-d-1)

    @staticmethod
    def min(values):
        """SMALLEST OF values, IGNORING None AND NaN; Null WHEN NOTHING IS LEFT"""
        output = Null
        for v in values:
            if v == None:
                continue
            if math.isnan(v):
                continue
            if output == None:
                output = v
                continue
            output = min(output, v)
        return output

    @staticmethod
    def max(values):
        """LARGEST OF values, IGNORING None AND NaN; Null WHEN NOTHING IS LEFT"""
        output = Null
        for v in values:
            if v == None:
                continue
            if math.isnan(v):
                continue
            if output == None:
                output = v
                continue
            output = max(output, v)
        return output

Просмотреть файл

@ -1,95 +0,0 @@
# encoding: utf-8
#
from multiprocessing.queues import Queue
from .logs import Log
class worker(object):
    # NOTE(review): THERE IS NO self PARAMETER, SO THE NEW INSTANCE IS BOUND TO func
    # NOTE(review): Multiprocess CALLS worker(name, f, inbound, outbound), WHICH DOES NOT
    #               MATCH THIS SIGNATURE - LOOKS UNFINISHED, CONFIRM INTENT
    def __init__(func, inbound, outbound, logging):
        # THE LOGGER IS BUILT BUT NOT KEPT
        logger = Log_usingInterProcessQueue(logging)
class Log_usingInterProcessQueue(Log):
    """
    LOGGER THAT PUTS EVERY ENTRY ON A QUEUE INSTEAD OF WRITING IT
    """

    def __init__(self, outbound):
        self.outbound = outbound

    def write(self, template, params):
        # NOTE(review): THE KEY IS "param", WHILE OTHER LOGGERS QUEUE "params" - CONFIRM WITH THE CONSUMER
        entry = {"template": template, "param": params}
        self.outbound.put(entry)
class Multiprocess(object):
    """
    RUN ONE worker PER FUNCTION, FED FROM A SHARED inbound QUEUE AND
    REPORTING ON A SHARED outbound QUEUE; USABLE AS A CONTEXT MANAGER
    NOTE(review): extend()/pop() ARE CALLED ON THE QUEUES AND join()/keep_running
                  ON THE WORKERS, BUT multiprocessing QUEUES OFFER put()/get() AND
                  worker DEFINES NEITHER - LOOKS UNFINISHED, CONFIRM BEFORE USE
    """

    def __init__(self, functions):
        self.outbound = Queue()
        # ONE inbound QUEUE ONLY; A SECOND ASSIGNMENT WOULD ABANDON THE FIRST QUEUE UNCLOSED
        self.inbound = Queue()

        #MAKE THREADS
        self.threads = []
        for t, f in enumerate(functions):
            thread = worker(
                "worker " + unicode(t),
                f,
                self.inbound,
                self.outbound,
            )
            self.threads.append(thread)

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, a, b, c):
        try:
            self.inbound.close() # SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        except Exception as e:
            Log.warning("Problem adding to inbound", e)

        self.join()

    #IF YOU SENT A stop(), OR STOP, YOU MAY WAIT FOR SHUTDOWN
    def join(self):
        try:
            #WAIT FOR FINISH
            for t in self.threads:
                t.join()
        except (KeyboardInterrupt, SystemExit):
            Log.note("Shutdow Started, please be patient")
        except Exception as e:
            Log.error("Unusual shutdown!", e)
        finally:
            # WHATEVER HAPPENED: TELL EVERY WORKER TO STOP, WAIT, THEN CLOSE THE QUEUES
            for t in self.threads:
                t.keep_running = False
            for t in self.threads:
                t.join()
            self.inbound.close()
            self.outbound.close()

    #RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
    def execute(self, parameters):
        #FILL QUEUE WITH WORK
        self.inbound.extend(parameters)

        num = len(parameters)

        def output():
            for i in xrange(num):
                result = self.outbound.pop()
                yield result
        return output()

    #EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
    def stop(self):
        self.inbound.close() #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        for t in self.threads:
            t.keep_running = False

Просмотреть файл

@ -1,86 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from .struct import Null
from .logs import Log
class Multiset(object):
    """
    A BAG: REMEMBERS HOW MANY TIMES EACH VALUE WAS ADDED (self.dic MAPS VALUE -> COUNT)
    """

    def __init__(self, list=Null, key_field=None, count_field=None):
        """
        list - THE VALUES TO ADD, OR (WHEN key_field/count_field ARE GIVEN)
               RECORDS HOLDING A VALUE AND ITS COUNT
        """
        if key_field or count_field:
            self.dic={i[key_field]:i[count_field] for i in list}
            return
        self.dic = dict()
        for i in list:
            self.add(i)

    def __iter__(self):
        # EVERY VALUE IS YIELDED AS MANY TIMES AS IT WAS COUNTED
        for k, m in self.dic.items():
            for _ in range(m):
                yield k

    def items(self):
        return self.dic.items()

    def keys(self):
        return self.dic.keys()

    def add(self, value):
        self.dic[value] = self.dic.get(value, 0) + 1

    def extend(self, values):
        for v in values:
            self.add(v)

    def remove(self, value):
        """REMOVE ONE OCCURRENCE; RAISES (VIA Log.error) IF value IS NOT PRESENT"""
        if value not in self.dic:
            Log.error("{{value}} is not in multiset", {"value":value})
        self._remove(value)

    def copy(self):
        output = Multiset()
        output.dic=self.dic.copy()
        return output

    def _remove(self, value):
        """REMOVE ONE OCCURRENCE, SILENTLY DOING NOTHING IF value IS NOT PRESENT"""
        count=self.dic.get(value, None)
        if count == None:
            return

        remaining = count-1
        if remaining==0:
            del(self.dic[value])
        else:
            self.dic[value]=remaining

    def __sub__(self, other):
        output=self.copy()
        for o in other:
            output._remove(o)
        return output

    def __set__(self, other):
        return set(self.dic.keys())

    def __len__(self):
        return sum(self.dic.values())

    def count(self, value):
        return self.dic.get(value, 0)

Просмотреть файл

@ -1,160 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import threading
from .struct import nvl
from .logs import Log
from .threads import Queue, Thread
DEBUG = True
class worker_thread(threading.Thread):
    """
    THREAD THAT POPS REQUESTS FROM in_queue, CALLS function(**request) AND
    ADDS {"response": ...} OR {"exception": ...} TO out_queue
    STARTS ITSELF ON CONSTRUCTION
    """

    #in_queue MUST CONTAIN HASH OF PARAMETERS FOR load()
    def __init__(self, name, in_queue, out_queue, function):
        threading.Thread.__init__(self)
        self.name=name
        self.in_queue=in_queue
        self.out_queue=out_queue
        self.function=function
        self.keep_running=True
        self.num_runs=0
        self.start()

    #REQUIRED TO DETECT KEYBOARD, AND OTHER, INTERRUPTS
    def join(self, timeout=None):
        # WAIT IN SHORT SLICES (DEFAULT 0.5s) RATHER THAN ONE BLOCKING join()
        while self.isAlive():
            Log.note("Waiting on thread {{thread}}", {"thread":self.name})
            threading.Thread.join(self, nvl(timeout, 0.5))

    def run(self):
        got_stop=False
        while self.keep_running:
            request = self.in_queue.pop()
            if request == Thread.STOP:
                got_stop=True
                if self.in_queue.queue:
                    Log.warning("programmer error")
                break
            if not self.keep_running:
                break

            try:
                if DEBUG and hasattr(self.function, "func_name"):
                    Log.note("run {{function}}", {"function": self.function.func_name})
                result = self.function(**request)
                if self.out_queue:
                    self.out_queue.add({"response": result})
            except Exception as e:
                Log.warning("Can not execute with params={{params}}", {"params": request}, e)
                if self.out_queue:
                    # THE EXCEPTION IS PASSED ON TO WHOEVER READS THE RESULTS
                    self.out_queue.add({"exception": e})
            finally:
                self.num_runs += 1

        self.keep_running = False
        if self.num_runs==0:
            Log.warning("{{name}} thread did no work", {"name":self.name})
        if DEBUG and self.num_runs!=1:
            Log.note("{{name}} thread did {{num}} units of work", {
                "name":self.name,
                "num":self.num_runs
            })
        if got_stop and self.in_queue.queue:
            Log.warning("multithread programmer error")
        if DEBUG:
            Log.note("{{thread}} DONE", {"thread":self.name})

    def stop(self):
        self.keep_running=False
#PASS A SET OF FUNCTIONS TO BE EXECUTED (ONE PER THREAD)
#PASS AN (ITERATOR/LIST) OF PARAMETERS TO BE ISSUED TO NEXT AVAILABLE THREAD
class Multithread(object):
    """
    POOL OF worker_thread, ONE PER FUNCTION, SHARING AN inbound AND AN outbound QUEUE
    USABLE AS A CONTEXT MANAGER
    """

    def __init__(self, functions):
        self.outbound=Queue()
        self.inbound=Queue()

        #MAKE THREADS
        self.threads=[
            worker_thread("worker "+unicode(t), self.inbound, self.outbound, f)
            for t, f in enumerate(functions)
        ]

    def __enter__(self):
        return self

    #WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
    def __exit__(self, type, value, traceback):
        try:
            if isinstance(value, Exception):
                self.inbound.close()
            self.inbound.add(Thread.STOP)
            self.join()
        except Exception as e:
            Log.warning("Problem sending stops", e)

    #IF YOU SENT A stop(), OR Thread.STOP, YOU MAY WAIT FOR SHUTDOWN
    def join(self):
        try:
            #WAIT FOR FINISH
            for t in self.threads:
                t.join()
        except (KeyboardInterrupt, SystemExit):
            Log.note("Shutdow Started, please be patient")
        except Exception as e:
            Log.error("Unusual shutdown!", e)
        finally:
            # WHATEVER HAPPENED: FLAG THE WORKERS, CLOSE BOTH QUEUES, THEN WAIT FOR THE WORKERS
            for t in self.threads:
                t.keep_running=False
            self.inbound.close()
            self.outbound.close()
            for t in self.threads:
                t.join()

    #RETURN A GENERATOR THAT HAS len(parameters) RESULTS (ANY ORDER)
    def execute(self, request):
        #FILL QUEUE WITH WORK
        self.inbound.extend(request)

        num=len(request)

        def output():
            for i in xrange(num):
                result=self.outbound.pop()
                if "exception" in result:
                    # A WORKER FAILED: RE-RAISE ITS EXCEPTION IN THE CONSUMER
                    raise result["exception"]
                yield result["response"]
        return output()

    #EXTERNAL COMMAND THAT RETURNS IMMEDIATELY
    def stop(self):
        self.inbound.close() #SEND STOPS TO WAKE UP THE WORKERS WAITING ON inbound.pop()
        for t in self.threads:
            t.keep_running=False

Просмотреть файл

@ -1,596 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import sys
from ..logs import Log
from ..struct import nvl, listwrap
from .. import struct
from ..strings import indent, expand_template
from ..struct import StructList, Struct, Null
from ..multiset import Multiset
# A COLLECTION OF DATABASE OPERATORS (RELATIONAL ALGEBRA OPERATORS)
def run(query):
    """
    EXECUTE A QUERY STRUCTURE: RESOLVE "from" (A LIST, OR A NESTED QUERY),
    THEN APPLY window, where, sort AND select IN THAT ORDER
    edges AND filter ARE NOT IMPLEMENTED AND RAISE (VIA Log.error)
    """
    query = struct.wrap(query)
    if isinstance(query["from"], list):
        _from = query["from"]
    else:
        # "from" IS ITSELF A QUERY
        _from = run(query["from"])

    if query.edges != None:
        Log.error("not implemented yet")

    if query.filter != None:
        Log.error("not implemented yet")

    for window_spec in listwrap(query.window):
        window(_from, window_spec)

    if query.where != None:
        _from = filter(_from, query.where)

    if query.sort != None:
        _from = sort(_from, query.sort)

    if query.select != None:
        _from = select(_from, query.select)

    return _from
def groupby(data, keys=None, size=None, min_size=None, max_size=None):
    """
    RETURN LIST OF (keys, values) PAIRS WHERE
    keys IS A STRUCT OF THE GROUPING FIELDS AND
    values IS THE LIST OF ALL data THAT HAS THOSE KEYS
    WHEN ANY OF size/min_size/max_size IS GIVEN, GROUP BY SIZE INSTEAD
    (size IS USED AS max_size)
    """
    if size != None or min_size != None or max_size != None:
        if size != None: max_size = size
        return groupby_min_max_size(data, min_size=min_size, max_size=max_size)

    try:
        agg = {}
        for d in data:
            #REACH INTO dict TO GET PROPERTY VALUE
            key = "|".join([unicode(d[k]) for k in keys])
            if key not in agg:
                agg[key] = (struct.wrap({k: d[k] for k in keys}), StructList())
            agg[key][1].append(d)

        return agg.values()
    except Exception as e:
        Log.error("Problem grouping", e)
def index(data, keys=None):
    """
    RETURN NESTED dicts THAT USE keys TO INDEX data: ONE LEVEL PER KEY,
    WITH A LIST OF THE MATCHING RECORDS AT THE DEEPEST LEVEL
    """
    keys = struct.unwrap(listwrap(keys))

    output = dict()
    for d in data:
        o = output
        for k in keys[:-1]:
            # setdefault() STORES THE NEW LEVEL; get() WOULD THROW IT AWAY
            o = o.setdefault(d[k], dict())
        o.setdefault(d[keys[-1]], list()).append(d)
    return output
def unique_index(data, keys=None):
    """
    RETURN AN Index THAT USES keys TO INDEX data
    ONLY ONE VALUE ALLOWED PER UNIQUE KEY; A FAILED add() IS REPORTED
    (VIA Log.error) AS A NON-UNIQUE INDEX
    """
    lookup = Index(listwrap(keys))

    for record in data:
        try:
            lookup.add(record)
        except Exception as e:
            Log.error("index {{index}} is not unique {{key}} maps to both {{value1}} and {{value2}}", {
                "index": keys,
                "key": select([record], keys)[0],
                "value1": lookup[record],
                "value2": record
            }, e)
    return lookup
def map(data, relation):
    """
    RETURN [relation[d] FOR EVERY d IN data]; Null WHEN data IS None
    ONLY dict RELATIONS ARE SUPPORTED, ANYTHING ELSE IS REPORTED VIA Log.error
    """
    if data == None:
        return Null

    if not isinstance(relation, dict):
        Log.error("Do not know how to handle relation of type {{type}}", {
            "type":type(relation)
        })
        return Null

    return [relation[d] for d in data]
def select(data, field_name):
    """
    field_name IS A STRING  - RETURN LIST WITH THE VALUE OF THAT FIELD FOR EVERY ROW
    field_name IS ANYTHING ELSE - RETURN LIST OF dicts KEEPING ONLY THE KEYS FOUND IN field_name
    """
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    if isinstance(field_name, basestring):
        return [d[field_name] for d in data]

    return [{k: v for k, v in x.items() if k in field_name} for x in data]
def get_columns(data):
    """
    RETURN ONE {"name": key} ENTRY FOR EVERY DISTINCT KEY FOUND IN THE ROWS OF data
    """
    seen = {}
    for row in data:
        for k, v in row.items():
            if k in seen:
                continue
            seen[k] = {"name": k, "domain": Null}

            # IT WOULD BE NICE TO ADD DOMAIN ANALYSIS HERE

    # ONLY THE NAMES ARE REPORTED; THE domain PLACEHOLDER IS NOT RETURNED
    return [{"name": n} for n in seen]
def stack(data, name=None, value_column=None, columns=None):
    """
    STACK ALL CUBE DATA TO A SINGLE COLUMN, WITH ONE COLUMN PER DIMENSION

    >>> s
          a   b
     one  1   2
     two  3   4

    >>> stack(s)
     one a    1
     one b    2
     two a    3
     two b    4

    STACK LIST OF HASHES, OR 'MERGE' SEPARATE CUBES
    data - expected to be a list of dicts
    name - give a name to the new column
    value_column - Name given to the new, single value column
    columns - explicitly list the value columns (USE SELECT INSTEAD)

    NOTE(review): NOTHING IS RETURNED - output AND edge ARE BUILT AND DROPPED; LOOKS UNFINISHED
    """
    assert value_column != None
    if isinstance(data, Cube): Log.error("Do not know how to deal with cubes yet")

    # NOTE(review): INDENTATION WAS LOST IN THIS COPY; ASSUMES ONLY get_columns() IS CONDITIONAL - CONFIRM
    if columns == None:
        columns = data.get_columns()
    data = data.select(columns)

    name = nvl(name, data.name)

    output = []
    parts = set()
    for row in data:
        for c in columns:
            v = row[c]
            parts.add(c)
            output.append({"name": c, "value": v})

    edge = struct.wrap({"domain": {"type": "set", "partitions": parts}})
#UNSTACKING CUBES WILL BE SIMPLER BECAUSE THE keys ARE IMPLIED (edges-column)
def unstack(data, keys=None, column=None, value=None):
    """
    Reverse of stacking: for every group of records sharing `keys`, spread the
    (column, value) pairs of the group into fields of the group key.

    keys - field(s) that identify a group (required)
    column - field whose content becomes the new field name (required)
    value - field whose content becomes the new field value (required)
    Returns a StructList with one (mutated) group key per group.
    """
    assert keys != None
    assert column != None
    assert value != None
    if isinstance(data, Cube):
        Log.error("Do not know how to deal with cubes yet")

    rows = []
    for group_key, members in groupby(data, keys):
        # THE GROUP KEY ITSELF IS EXTENDED WITH ONE FIELD PER MEMBER
        for member in members:
            group_key[member[column]] = member[value]
        rows.append(group_key)
    return StructList(rows)
def normalize_sort(fieldnames):
    """
    Convert sort parameters to the normal form: a list of
    {"field": name, "sort": direction} items.

    A plain string becomes an ascending sort on that field; anything else is
    passed through untouched.  None (or Null) gives an empty list.
    """
    if fieldnames == None:
        return []

    return [
        {"field": f, "sort": 1} if isinstance(f, basestring) else f
        for f in listwrap(fieldnames)
    ]
def sort(data, fieldnames=None):
    """
    Return a sorted list of `data`.

    fieldnames - a field name, or a {"field": field_name, "sort": direction}
                 struct, or a list of either.  When omitted the natural
                 ordering of the values is used.
    Any failure is reported through Log.error("Problem sorting ...").
    """
    try:
        if data == None:
            return Null

        if fieldnames == None:
            return sorted(data)

        if not isinstance(fieldnames, list):
            # SPECIAL CASE, ONLY ONE FIELD TO SORT BY
            if isinstance(fieldnames, basestring):
                def by_name(left, right):
                    return cmp(nvl(left, Struct())[fieldnames], nvl(right, Struct())[fieldnames])

                return sorted(data, cmp=by_name)

            # EXPECTING {"field":f, "sort":i} FORMAT
            def by_struct(left, right):
                direction = fieldnames["sort"]
                return direction * cmp(
                    nvl(left, Struct())[fieldnames["field"]],
                    nvl(right, Struct())[fieldnames["field"]]
                )

            return sorted(data, cmp=by_struct)

        formal = normalize_sort(fieldnames)

        def by_all(left, right):
            # COMPARE FIELD BY FIELD, THE FIRST DIFFERENCE DECIDES
            left = nvl(left, Struct())
            right = nvl(right, Struct())
            for f in formal:
                try:
                    result = f["sort"] * cmp(left[f["field"]], right[f["field"]])
                    if result != 0:
                        return result
                except Exception as e:
                    Log.error("problem with compare", e)
            return 0

        if not isinstance(data, list) and not hasattr(data, "__iter__"):
            Log.error("Do not know how to handle")
        return sorted(data, cmp=by_all)
    except Exception as e:
        Log.error("Problem sorting\n{{data}}", {"data": data}, e)
def add(*values):
    """
    Sum all the non-null `values`; return Null when there are none.

    The running total is rebuilt with `+` (not `+=`) so that a mutable first
    argument (a list, for example) is never modified in place.
    """
    total = Null
    for v in values:
        if total == None:
            total = v
        elif v != None:
            total = total + v
    return total
def filter(data, where):
    """
    Return the list of records for which `where` is true.

    where - a function that accepts (record, rownum, rows), or fewer leading
            parameters, and returns a boolean
    """
    where = wrap_function(where)
    output = []
    for rownum, record in enumerate(data):
        if where(record, rownum, data):
            output.append(record)
    return output
def wrap_function(func):
    """
    Adapt `func` to the three-parameter window-function signature
    (row, rownum, rows).

    Functions declaring 0, 1 or 2 positional parameters are wrapped so the
    extra arguments are dropped; a 3-parameter function is returned as-is.
    Any other parameter count gives None.
    """
    numarg = func.__code__.co_argcount

    if numarg == 3:
        return func

    if numarg == 0:
        def no_args(row, rownum, rows):
            return func()
        return no_args

    if numarg == 1:
        def row_only(row, rownum, rows):
            return func(row)
        return row_only

    if numarg == 2:
        def row_and_num(row, rownum, rows):
            return func(row, rownum)
        return row_and_num

    return None
def window(data, param):
    """
    Apply a window function over `data`, writing the result into each record.

    data - list of records (modified in place)
    param.name - column to assign window function result
    param.edges - columns to group by
    param.sort - columns to sort by
    param.value - function that takes a record and returns a value (for aggregation)
    param.aggregate - WindowFunction class to apply
    param.range - of form {"min":-10, "max":0} to specify the size and
                  relative position of window
    Returns nothing.
    """
    name = param.name
    edges = param.edges
    # NOT CALLED sort: THAT NAME WOULD HIDE THE MODULE-LEVEL sort() USED BELOW
    sort_by = param.sort
    value = wrap_function(param.value)
    aggregate = param.aggregate
    _range = param.range

    if aggregate == None and sort_by == None and edges == None:
        # SIMPLE CALCULATED VALUE
        for rownum, r in enumerate(data):
            r[name] = value(r, rownum, data)
        return

    for rownum, r in enumerate(data):
        r["__temp__"] = value(r, rownum, data)

    for keys, values in groupby(data, edges):
        if not values:
            continue  # CAN DO NOTHING WITH THIS ZERO-SAMPLE

        sequence = struct.wrap(sort(values, sort_by))
        head = nvl(_range.max, _range.stop)
        tail = nvl(_range.min, _range.start)

        # PRELOAD total (AGGREGATES EXPOSE add()/sub(), NOT THE += OPERATOR)
        total = aggregate()
        for i in range(head):
            total.add(sequence[i].__temp__)

        # WINDOW FUNCTION APPLICATION
        for i, r in enumerate(sequence):
            r[name] = total.end()
            total.add(sequence[i + head].__temp__)
            total.sub(sequence[i + tail].__temp__)

    for r in data:
        r["__temp__"] = Null  # CLEANUP
def groupby_size(data, size):
    """
    Lazily split `data` into consecutive groups of at most `size` items.

    data - an iterator (Python 2 `next` or Python 3 `__next__` style) or any
           iterable
    Yields (group_number, list_of_items).  A group is produced every time
    the source is asked for `size` items, so an empty final group is yielded
    when the source length is an exact multiple of `size` (including the
    empty source).
    """
    if hasattr(data, "next") or hasattr(data, "__next__"):
        iterator = data
    elif hasattr(data, "__iter__"):
        iterator = iter(data)
    else:
        Log.error("do not know how to handle this type")

    # THIS IS LAZY
    group_num = 0
    exhausted = False
    while not exhausted:
        group = []
        for _ in range(size):
            try:
                group.append(next(iterator))
            except StopIteration:
                exhausted = True
                break
        yield (group_num, group)
        group_num += 1
def groupby_Multiset(data, min_size, max_size):
    """
    Group the keys of a multiset by the population of each key, trying to
    keep every group's total inside the min/max limits.

    data - object whose items() gives (key, count) pairs
    Yields (group_number, list_of_keys).  Log.error() is called when a group
    total reaches max_size.
    """
    if min_size == None:
        min_size = 0

    total = 0
    group_num = 0
    group = []
    for key, count in data.items():
        fits = total < min_size or total + count < max_size
        if fits:
            total += count
            group.append(key)
        elif total < max_size:
            # CLOSE THE CURRENT GROUP, START THE NEXT ONE WITH THIS KEY
            yield (group_num, group)
            group_num += 1
            total = count
            group = [key]

        if total >= max_size:
            Log.error("({{min}}, {{max}}) range is too strict given step of {{increment}}", {
                "min": min_size, "max": max_size, "increment": count
            })

    if group:
        yield (group_num, group)
def groupby_min_max_size(data, min_size=0, max_size=None, ):
    """
    Split `data` into numbered groups.

    Anything with __iter__ is cut into consecutive lists of max_size items;
    a Multiset is handed to groupby_Multiset(); everything else goes to
    groupby_size().  max_size defaults to sys.maxint.
    Returns an iterator of (group_number, group).
    """
    if max_size == None:
        max_size = sys.maxint

    if hasattr(data, "__iter__"):
        def _chunks():
            group_num = 0
            chunk = []
            for count, d in enumerate(data, 1):
                chunk.append(d)
                if count % max_size == 0:
                    yield group_num, chunk
                    group_num += 1
                    chunk = []
            if chunk:
                yield group_num, chunk

        return _chunks()

    if isinstance(data, Multiset):
        return groupby_Multiset(data, min_size, max_size)
    return groupby_size(data, max_size)
class Cube():
    """
    Container for a list of records plus the edges (dimensions) that
    describe it.
    """

    def __init__(self, data=None, edges=None, name=None):
        """
        data - the records (another Cube is not supported)
        edges - edge definitions; when omitted a single numeric "index" edge
                covering 0..len(data) is created and all columns are selected
        name - optional name for the cube
        """
        if isinstance(data, Cube):
            Log.error("do not know how to handle cubes yet")

        columns = get_columns(data)
        # KEEP THE COLUMNS SO get_columns() HAS SOMETHING TO RETURN
        self.columns = columns
        # ALWAYS DEFINED, SO READERS DO NOT HIT AttributeError ON EITHER PATH
        self.name = name
        self.data = data

        if edges == None:
            self.edges = [{"name": "index", "domain": {"type": "numeric", "min": 0, "max": len(data), "interval": 1}}]
            self.select = columns
            return

        self.edges = edges
        self.select = Null

    def get_columns(self):
        """Return the column descriptions found in the data at construction."""
        return self.columns
class Domain():
    """
    Placeholder for a domain (the set of parts an edge can take).
    None of the conversions are implemented; every method returns None.
    """

    def __init__(self):
        """Nothing to set up."""

    def part2key(self, part):
        """Not implemented: always None."""
        return None

    def part2label(self, part):
        """Not implemented: always None."""
        return None

    def part2value(self, part):
        """Not implemented: always None."""
        return None
# SIMPLE TUPLE-OF-STRINGS LOOKUP TO LIST
class Index(object):
    """
    Unique index: records are stored in nested dicts, one level per key
    field, with the record itself at the deepest level.
    """

    def __init__(self, keys):
        """
        keys - list of field names that, together, identify a record
        """
        self._data = {}
        self._keys = keys
        self.count = 0
        depth = len(self._keys)

        def lookup(node, remaining=depth):
            # WALK ONE dict LEVEL PER KEY FIELD, YIELDING THE STORED RECORDS
            if remaining == 0:
                yield node
                return
            for child in node.values():
                for leaf in lookup(child, remaining - 1):
                    yield leaf

        self.lookup = lookup

    def __getitem__(self, key):
        """
        Return the record stored under `key`, or Null when there is none.

        key - dict holding a value for every key field, or (for single-field
              indexes only) the bare key value
        """
        try:
            if not isinstance(key, dict):
                # WE WILL BE FORGIVING IF THE KEY IS NOT IN A LIST
                if len(self._keys) > 1:
                    Log.error("Must be given an array of keys")
                key = {self._keys[0]: key}

            d = self._data
            for k in self._keys:
                v = key[k]
                if v == None:
                    Log.error("can not handle when {{key}} == None", {"key": k})
                if v not in d:
                    return Null
                d = d[v]

            if len(key) != len(self._keys):
                # NOT A COMPLETE INDEXING, SO RETURN THE PARTIAL INDEX
                # NOTE(review): THIS IS ALSO REACHED WHEN key HAS EXTRA FIELDS
                # (A WHOLE RECORD); CONFIRM THAT IS INTENDED
                output = Index(self._keys[-len(key):])
                output._data = d
                return output

            # COMPLETE KEY: d IS THE STORED RECORD
            return d
        except Exception as e:
            Log.error("something went wrong", e)

    def __setitem__(self, key, value):
        Log.error("Not implemented")

    def add(self, val):
        """
        Store `val` under the key made from its key fields.  A non-dict value
        is treated as the key value of a single-field index.
        Log.error() is called when the key is already used.
        """
        if not isinstance(val, dict):
            val = {self._keys[0]: val}

        d = self._data
        for k in self._keys[0:-1]:
            v = val[k]
            if v == None:
                Log.error("can not handle when {{key}} == None", {"key": k})
            if v not in d:
                d[v] = {}
            d = d[v]

        v = val[self._keys[-1]]
        if v in d:
            Log.error("key already filled")
        d[v] = val
        self.count += 1

    def __contains__(self, key):
        return self[key] != None

    def __iter__(self):
        return self.lookup(self._data)

    def __sub__(self, other):
        """Records of self that are not in other."""
        output = Index(self._keys)
        for v in self:
            if v not in other:
                output.add(v)
        return output

    def __and__(self, other):
        """Records of self that are also in other."""
        output = Index(self._keys)
        for v in self:
            if v in other:
                output.add(v)
        return output

    def __or__(self, other):
        """All records of self, then all records of other."""
        output = Index(self._keys)
        for v in self:
            output.add(v)
        for v in other:
            output.add(v)
        return output

    def __len__(self):
        return self.count

    def subtract(self, other):
        return self.__sub__(other)

    def intersect(self, other):
        return self.__and__(other)

Просмотреть файл

@ -1,4 +0,0 @@
# encoding: utf-8
#
__author__ = 'klahnakoski'

Просмотреть файл

@ -1,162 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from ..logs import Log
from ..maths import Math
from ..multiset import Multiset
from ..stats import Z_moment, stats2z_moment, z_moment2stats
class AggregationFunction(object):
    """
    Interface for an aggregate: start from a zero state, add() values,
    merge() other aggregates, and read the result with end().
    """

    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")

    def add(self, value):
        """
        ADD value TO AGGREGATE
        """
        Log.error("not implemented yet")

    def merge(self, agg):
        """
        ADD TWO AGGREGATES TOGETHER
        """
        Log.error("not implemented yet")

    def end(self):
        """
        RETURN AGGREGATE (the base implementation returns None)
        """
        return None
class WindowFunction(AggregationFunction):
    """
    An aggregate that can also have values taken out of it, so it can slide
    along a sequence.
    """

    def __init__(self):
        """
        RETURN A ZERO-STATE AGGREGATE
        """
        Log.error("not implemented yet")

    def sub(self, value):
        """
        REMOVE value FROM AGGREGATE
        """
        Log.error("not implemented yet")
class Stats(WindowFunction):
    """
    Window function that accumulates statistics as zero-centered moments.
    Values given to add()/sub() are converted with stats2z_moment(); null
    values are ignored.
    """

    def __init__(self):
        object.__init__(self)
        self.total=Z_moment(0,0,0)

    def add(self, value):
        if value != None:
            self.total+=stats2z_moment(value)

    def sub(self, value):
        if value != None:
            self.total-=stats2z_moment(value)

    def merge(self, agg):
        self.total+=agg.total

    def end(self):
        """Return the accumulated moments converted by z_moment2stats()."""
        return z_moment2stats(self.total)
class Min(WindowFunction):
    """
    Window function giving the minimum of the values currently in the
    window.  The values are kept in a Multiset; nulls are ignored.
    """

    def __init__(self):
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        if value != None:
            self.total.add(value)

    def sub(self, value):
        if value != None:
            self.total.remove(value)

    def end(self):
        """Return Math.min() of the values held."""
        return Math.min(self.total)
class Max(WindowFunction):
    """
    Window function giving the maximum of the values currently in the
    window.  The values are kept in a Multiset; nulls are ignored.
    """

    def __init__(self):
        object.__init__(self)
        self.total = Multiset()

    def add(self, value):
        if value != None:
            self.total.add(value)

    def sub(self, value):
        if value != None:
            self.total.remove(value)

    def end(self):
        """Return Math.max() of the values held."""
        return Math.max(self.total)
class Count(WindowFunction):
    """
    Window function counting the non-null values currently in the window.
    """

    def __init__(self):
        object.__init__(self)
        self.total = 0

    def add(self, value):
        if value != None:
            self.total += 1

    def sub(self, value):
        if value != None:
            self.total -= 1

    def end(self):
        """Return the current count."""
        return self.total
class Sum(WindowFunction):
    """
    Window function summing the non-null values currently in the window.
    """

    def __init__(self):
        object.__init__(self)
        self.total = 0

    def add(self, value):
        if value != None:
            self.total += value

    def sub(self, value):
        if value != None:
            self.total -= value

    def end(self):
        """Return the current sum."""
        return self.total

Просмотреть файл

@ -1,31 +0,0 @@
# encoding: utf-8
#
import random
import string
# DEFAULT CHARACTERS FOR Random.string(), AND THE GENERATOR IT DRAWS FROM
SIMPLE_ALPHABET=string.ascii_letters + string.digits
SEED=random.Random()


class Random(object):
    """
    Small collection of random-data helpers.
    """

    @staticmethod
    def string(length, alphabet=SIMPLE_ALPHABET):
        """Return `length` characters, each picked from `alphabet`."""
        return ''.join(SEED.choice(alphabet) for _ in range(0, length))

    @staticmethod
    def hex(length):
        """Return `length` upper-case hexadecimal digits."""
        hex_digits = string.digits + 'ABCDEF'
        return Random.string(length, hex_digits)

    @staticmethod
    def int(*args):
        """Same parameters and result as random.randrange()."""
        return random.randrange(*args)

    @staticmethod
    def sample(data, count):
        """Return `count` items of `data`, picked with replacement."""
        num = len(data)
        picks = []
        for _ in range(count):
            picks.append(data[Random.int(num)])
        return picks

Просмотреть файл

@ -1,77 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import argparse
import struct
from .struct import listwrap
from .cnv import CNV
from .logs import Log
from .files import File
#PARAMETERS MATCH argparse.ArgumentParser.add_argument()
#http://docs.python.org/dev/library/argparse.html#the-add-argument-method
#name or flags - Either a name or a list of option strings, e.g. foo or -f, --foo.
#action - The basic type of action to be taken when this argument is encountered at the command line.
#nargs - The number of command-line arguments that should be consumed.
#const - A constant value required by some action and nargs selections.
#default - The value produced if the argument is absent from the command line.
#type - The type to which the command-line argument should be converted.
#choices - A container of the allowable values for the argument.
#required - Whether or not the command-line option may be omitted (optionals only).
#help - A brief description of what the argument does.
#metavar - A name for the argument in usage messages.
#dest - The name of the attribute to be added to the object returned by parse_args().
def _argparse(defs):
    """
    Parse the command line according to `defs`.

    defs - one, or a list of, structs whose properties match the parameters
           of argparse.ArgumentParser.add_argument(); `name` holds the
           name or flags
    Returns a struct with one property per parsed argument.
    """
    parser = argparse.ArgumentParser()
    for definition in listwrap(defs):
        options = definition.copy()
        flags = options.name
        options.name = None  # THE FLAGS ARE PASSED POSITIONALLY, NOT AS name=
        parser.add_argument(*listwrap(flags).list, **options.dict)

    namespace=parser.parse_args()
    output = {}
    for k in vars(namespace):
        output[k] = getattr(namespace, k)
    return struct.wrap(output)
def _load_settings_file(filename):
    """Read and decode the JSON settings file; Log.error() if it is missing."""
    settings_file = File(filename)
    if not settings_file.exists:
        Log.error("Can not find settings file {{filename}}", {
            "filename": settings_file.abspath
        })
    json = settings_file.read()
    return CNV.JSON2object(json, flexible=True)


def read_settings(filename=None, defs=None):
    """
    Load the application settings.

    filename - path to the JSON settings file.  When omitted, the path is
               taken from the --settings command line option
               (default ./settings.json)
    defs - extra command line argument definitions (see _argparse); the
           caller's list is never modified
    Returns the settings, with the parsed command line in `settings.args`
    (only set when defs were given, if filename is explicit).
    """
    if filename:
        settings = _load_settings_file(filename)
        if defs:
            settings.args = _argparse(defs)
        return settings

    # BUILD A NEW LIST, SO THE --settings DEFINITION IS NOT APPENDED TO THE CALLER'S defs
    all_defs = [d for d in listwrap(defs)]
    all_defs.append({
        "name": ["--settings", "--settings-file", "--settings_file"],
        "help": "path to JSON file with settings",
        "type": str,
        "dest": "filename",
        "default": "./settings.json",
        "required": False
    })
    args = _argparse(all_defs)
    settings = _load_settings_file(args.filename)
    settings.args = args
    return settings

Просмотреть файл

@ -1,162 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from math import sqrt
from .struct import nvl
from .logs import Log
DEBUG=True
EPSILON=0.000001
def stats2z_moment(stats):
    """
    Convert descriptive statistics (count, mean, variance, skew, kurtosis)
    to zero-centered moments: Z_moment(N, sum(x), sum(x^2), sum(x^3), sum(x^4)).

    MODIFIED FROM http://statsmodels.sourceforge.net/devel/_modules/statsmodels/stats/moment_helpers.html
    ADDED count

    When DEBUG is set, the result is converted back and the count, mean and
    variance are compared with the input; Log.error() on a mismatch.
    """
    mc0, mc1, mc2, skew, kurt = (stats.count, stats.mean, stats.variance, stats.skew, stats.kurtosis)

    mz0 = mc0
    mz1 = mc1 * mc0
    mz2 = (mc2 + mc1*mc1)*mc0
    mc3 = skew*(mc2**1.5)  # 3rd central moment
    # E[X^3] = mc3 + 3*mean*variance + mean^3  (THE LAST TERM IS ADDED, NOT SUBTRACTED)
    mz3 = (mc3 + 3*mc1*mc2 + mc1**3)*mc0  # 3rd non-central moment
    mc4 = (kurt+3.0)*(mc2**2.0)  # 4th central moment
    mz4 = (mc4 + 4*mc1*mc3 + 6*mc1*mc1*mc2 + mc1**4) * mc0

    m=Z_moment(mz0, mz1, mz2, mz3, mz4)
    if DEBUG:
        v = z_moment2stats(m, unbiased=False)
        if not closeEnough(v.count, stats.count): Log.error("convertion error")
        if not closeEnough(v.mean, stats.mean): Log.error("convertion error")
        if not closeEnough(v.variance, stats.variance):
            Log.error("convertion error")
    return m
def closeEnough(a, b):
    """
    True when a and b differ by no more than EPSILON, scaled by their
    magnitudes (plus one, so values near zero still compare).
    """
    tolerance = EPSILON*(abs(a)+abs(b)+1)
    return abs(a-b) <= tolerance
def z_moment2stats(z_moment, unbiased=True):
    """
    Convert zero-centered moments back to a Stats object holding count, mean
    and variance.

    unbiased - divide the variance by N-1 instead of N
    An empty sample (N == 0) gives a default Stats(); the variance is NaN
    when there are not enough samples for the chosen divisor.
    """
    free = 1 if unbiased else 0
    N=z_moment.S[0]
    if N == 0:
        return Stats()

    if N > 0:
        mean = z_moment.S[1] / N
    else:
        mean = float('nan')

    if N - free > 0:
        variance = (z_moment.S[2] - (z_moment.S[1] ** 2) / N) / (N - free)
    else:
        variance = float('nan')

    return Stats(count=N, mean=mean, variance=variance, unbiased=unbiased)
class Stats(object):
    """
    Descriptive statistics: count, mean, variance (or std), skew, kurtosis.

    The keyword arguments are honoured in that order, and only up to the
    first one that is missing: everything from the first missing statistic
    onward is zero.  `unbiased` (or its opposite `biased`) records how the
    variance was computed; it defaults to False.
    """

    def __init__(self, **args):
        self.count = 0
        self.mean = 0
        self.variance = 0
        self.skew = 0
        self.kurtosis = 0

        # EACH STATISTIC IS ACCEPTED ONLY WHEN ALL THE PREVIOUS ONES WERE GIVEN
        if "count" in args:
            self.count = args["count"]
            if "mean" in args:
                self.mean = args["mean"]
                if "variance" in args or "std" in args:
                    if "variance" in args:
                        self.variance = args["variance"]
                    else:
                        self.variance = args["std"]**2
                    if "skew" in args:
                        self.skew = args["skew"]
                        if "kurtosis" in args:
                            self.kurtosis = args["kurtosis"]

        if "unbiased" in args:
            self.unbiased = args["unbiased"]
        elif "biased" in args:
            self.unbiased = not args["biased"]
        else:
            self.unbiased = False

    @property
    def std(self):
        """Standard deviation: the square root of the variance."""
        return sqrt(self.variance)
class Z_moment(object):
    """
    ZERO-CENTERED MOMENTS: the tuple S holds the sums of the powers of the
    samples, S[0] being the count.
    """

    def __init__(self, *args):
        self.S=tuple(args)

    def __add__(self, other):
        """Element-wise sum of the two moment tuples."""
        return Z_moment(*map(add, self.S, other.S))

    def __sub__(self, other):
        """Element-wise difference of the two moment tuples."""
        return Z_moment(*map(sub, self.S, other.S))

    @property
    def tuple(self):
        # RETURN AS ORDERED TUPLE
        return self.S

    @property
    def dict(self):
        # RETURN HASH OF SUMS
        output = {}
        for i, m in enumerate(self.S):
            output["s"+unicode(i)] = m
        return output

    @staticmethod
    def new_instance(values=None):
        """
        Build the moments (count and the sums of powers 1 to 4) of `values`,
        ignoring nulls.  With no values at all, an empty Z_moment is returned.
        """
        if values == None:
            return Z_moment()

        samples = [float(v) for v in values if v != None]
        power_sums = [sum([pow(n, p) for n in samples]) for p in (1, 2, 3, 4)]
        return Z_moment(len(samples), *power_sums)
def add(a,b):
    """Return a + b, where a null on either side counts as zero."""
    left = nvl(a, 0)
    right = nvl(b, 0)
    return left + right
def sub(a,b):
    """Return a - b, where a null on either side counts as zero."""
    left = nvl(a, 0)
    right = nvl(b, 0)
    return left - right

Просмотреть файл

@ -1,133 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import re
from .jsons import json_encoder
import struct
from .struct import Struct
import sys
# THESE TWO STATEMENTS RUN WHEN THIS MODULE IS IMPORTED AND CHANGE AN
# INTERPRETER-WIDE SETTING, NOT JUST THIS MODULE.
# NOTE(review): presumably the Python 2 idiom of reloading sys to get
# setdefaultencoding() back before calling it - confirm before porting
reload(sys)
sys.setdefaultencoding("utf-8")
def indent(value, prefix=u"\t", indent=None):
    """
    Put `prefix` in front of every line of `value`.

    indent - when given, the prefix is repeated this many times
    Trailing whitespace of `value` is kept as it is, after the last line.
    """
    if indent != None:
        prefix=prefix*indent

    try:
        content = value.rstrip()
        suffix = value[len(content):]
        separator = u"\n"+prefix
        return prefix + separator.join(content.splitlines()) + suffix
    except Exception as e:
        raise Exception(u"Problem with indent of value ("+e.message+u")\n"+unicode(value))
def outdent(value):
    """
    Remove the common leading whitespace from the lines of `value`.

    The amount removed is the smallest indentation found on a non-blank
    line (100 characters at most).  Lines are re-joined with "\\n".
    """
    try:
        num = 100
        lines = value.splitlines()
        for line in lines:
            stripped = line.lstrip()
            if len(stripped) > 0:
                num = min(num, len(line) - len(stripped))
        return u"\n".join([line[num:] for line in lines])
    except Exception as e:
        from .logs import Log
        Log.error("can not outdent value", e)
def between(value, prefix, suffix):
    """
    Return the text found between `prefix` and the first `suffix` after it,
    or None when either is missing.  When the prefix occurs again before
    that suffix, the right-most occurrence is the one used.
    """
    start = value.find(prefix)
    if start == -1:
        return None

    end = value.find(suffix, start + len(prefix))
    if end == -1:
        return None

    # WE KNOW THIS EXISTS, BUT THERE MAY BE A RIGHT-MORE ONE
    start = value.rfind(prefix, 0, end) + len(prefix)
    return value[start:end]
def right(value, len):
    """Return the last `len` characters of value (empty when len <= 0)."""
    if len > 0:
        return value[-len:]
    return u""
def find_first(value, find_arr, start=0):
    """
    Return the lowest index, at or after `start`, where any of the strings
    in `find_arr` occurs in `value`; -1 when none is found.
    """
    end = len(value)
    positions = [value.find(f, start) for f in find_arr]
    # len(value) STANDS FOR "NOT FOUND", AS DOES ANY MATCH AT THAT POSITION
    best = min([p for p in positions if p != -1] + [end])
    if best == end:
        return -1
    return best
# MATCHES {{name}} AND {{dotted.name}} PLACEHOLDERS
pattern=re.compile(r"(\{\{[\w_\.]+\}\})")


def expand_template(template, values):
    """
    Replace every {{name}} placeholder of `template` with the string form
    (see toString) of values[name].

    values - dict, or struct, holding the placeholder values
    Raises Exception naming the placeholder when its value can not be found
    or converted to a string.
    """
    values=struct.wrap(values)

    def replacer(found):
        name = found.group(1)[2:-2]  # STRIP THE {{ AND }}
        try:
            return toString(values[name])
        except Exception as e:
            raise Exception(u"Can not find "+name+u" in template:\n"+indent(template), e)

    return pattern.sub(replacer, template)
def toString(val):
    """
    Return the text used for `val` in an expanded template: pretty JSON for
    structs, dicts, lists and sets; unicode() of the value otherwise.
    """
    if isinstance(val, Struct):
        return json_encoder.encode(val.dict, pretty=True)
    if isinstance(val, (dict, list, set)):
        return json_encoder.encode(val, pretty=True)
    return unicode(val)
def edit_distance(s1, s2):
    """
    Levenshtein distance between s1 and s2, divided by the length of the
    longer string: 0.0 for identical strings, 1.0 for completely different.
    Two empty strings are identical, so their distance is 0.0.

    FROM http://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Levenshtein_distance#Python
    LICENCE http://creativecommons.org/licenses/by-sa/3.0/
    """
    if len(s1) < len(s2):
        return edit_distance(s2, s1)

    # len(s1) >= len(s2)
    if len(s2) == 0:
        # EVERYTHING MUST BE INSERTED, UNLESS THERE IS NOTHING TO INSERT
        return 1.0 if len(s1) > 0 else 0.0

    previous_row = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        current_row = [i + 1]
        for j, c2 in enumerate(s2):
            # j+1 instead of j since previous_row and current_row are one character longer than s2
            insertions = previous_row[j + 1] + 1
            deletions = current_row[j] + 1
            substitutions = previous_row[j] + (c1 != c2)
            current_row.append(min(insertions, deletions, substitutions))
        previous_row = current_row

    return float(previous_row[-1])/len(s1)

Просмотреть файл

@ -1,343 +0,0 @@
# encoding: utf-8
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
SPECIAL=["keys", "values", "items", "iteritems", "dict", "copy"]
class Struct(dict):
    """
    Struct is an anonymous class with some properties good for manipulating JSON

    0) a.b==a["b"]
    1) the IDE does tab completion, so spelling mistakes get found early
    2) missing keys are handled gracefully (Null is returned), so it can be
       put through set operations (database operations) without choking
    2b) missing keys is important when dealing with JSON, which is often
        almost anything
    3) JSON paths can be stored in a variable: a["b.c"]==a.b.c

    MORE ON MISSING VALUES: http://www.numpy.org/NA-overview.html
    IT ONLY CONSIDERS THE LEGITIMATE-FIELD-WITH-MISSING-VALUE (Statistical Null)
    AND DOES NOT LOOK AT FIELD-DOES-NOT-EXIST-IN-THIS-CONTEXT (Database Null)

    The properties are kept in the instance __dict__; the dict base class
    itself stays empty.  Only the names listed in SPECIAL are served as
    methods, every other attribute is a property lookup.
    """

    def __init__(self, **kwargs):
        dict.__init__(self)
        object.__setattr__(self, "__dict__", kwargs)  # THE PARAMETERS BECOME THE PROPERTIES

    def __bool__(self):
        return True

    def __nonzero__(self):
        return True

    def __str__(self):
        return dict.__str__(object.__getattribute__(self, "__dict__"))

    def __getitem__(self, key):
        """Return the (wrapped) property; a dotted key walks nested dicts."""
        d = object.__getattribute__(self, "__dict__")

        if key.find(".") < 0:
            return wrap(d.get(key, Null))

        # AN ESCAPED DOT IS PART OF THE NAME, NOT A PATH SEPARATOR
        key = key.replace("\.", "\a")
        seq = [k.replace("\a", ".") for k in key.split(".")]
        for n in seq:
            d = d.get(n, None)
            if d == None:
                return Null
            d = unwrap(d)
        return wrap(d)

    def __setitem__(self, key, value):
        """Set the property (dotted keys walk nested dicts); a null value removes it."""
        try:
            d = object.__getattribute__(self, "__dict__")
            value = unwrap(value)

            if key.find(".") == -1:
                if value is None:
                    d.pop(key, None)
                else:
                    d[key] = value
                return self

            key = key.replace("\.", "\a")
            seq = [k.replace("\a", ".") for k in key.split(".")]
            for k in seq[:-1]:
                d = d[k]
            last = seq[-1]
            if value == None:
                d.pop(last, None)
            else:
                d[last] = value
            return self
        except Exception as e:
            raise e

    def __getattribute__(self, key):
        d = object.__getattribute__(self, "__dict__")
        if key not in SPECIAL:
            return wrap(d.get(key, Null))

        # SOME dict FUNCTIONS
        if key == "items":
            def wrapped_items():
                raw_items = dict.__getattribute__(d, "items")
                return [(k, wrap(v)) for k, v in raw_items()]
            return wrapped_items

        if key == "iteritems":
            # LOW LEVEL ITERATION
            return d.iteritems

        if key == "keys":
            def key_set():
                raw_keys = dict.__getattribute__(d, "keys")
                return set(raw_keys())
            return key_set

        if key == "values":
            def wrapped_values():
                raw_values = dict.__getattribute__(d, "values")
                return [wrap(v) for v in raw_values()]
            return wrapped_values

        if key == "dict":
            return d

        if key == "copy":
            # THE (SHALLOW) COPY IS MADE WHEN THE ATTRIBUTE IS READ
            snapshot = wrap(dict(d.items()))

            def output():
                return snapshot
            return output

    def __setattr__(self, key, value):
        Struct.__setitem__(self, key, value)

    def __delitem__(self, key):
        d = object.__getattribute__(self, "__dict__")

        if key.find(".") == -1:
            d.pop(key, None)

        key = key.replace("\.", "\a")
        seq = [k.replace("\a", ".") for k in key.split(".")]
        for k in seq[:-1]:
            d = d[k]
        d.pop(seq[-1], None)

    def keys(self):
        d = object.__getattribute__(self, "__dict__")
        return d.keys()
# KEEP TRACK OF WHAT ATTRIBUTES ARE REQUESTED, MAYBE SOME (BUILTIN) ARE STILL USEFUL
requested = set()
class NullStruct(object):
    """
    Structural Null provides closure under the dot (.) operator
        Null[x] == Null
        Null.x == Null

    Null is false, has zero length, iterates as empty, and is equal only to
    None and to itself.  All ordering comparisons are False.
    """

    def __init__(self):
        pass

    def __bool__(self):
        return False

    def __nonzero__(self):
        return False

    def __gt__(self, other):
        return False

    def __ge__(self, other):
        return False

    def __le__(self, other):
        return False

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return other is Null or other is None

    def __ne__(self, other):
        return other is not Null and other is not None

    def __getitem__(self, key):
        return self

    def __len__(self):
        return 0

    def __iter__(self):
        return ZeroList.__iter__()

    def __getattribute__(self, key):
        # THE NAMES HANDLED HERE ARE THE dict-LIKE NAMES OF THE SPECIAL LIST;
        # EVERY OTHER ATTRIBUTE OF Null IS Null
        if key in ("items", "keys", "values"):
            def empty_list():
                return ZeroList
            return empty_list

        if key == "iteritems":
            # LOW LEVEL ITERATION: A CALLABLE, LIKE dict.iteritems.
            # self.__iter__ CAN NOT BE USED HERE, IT WOULD RESOLVE TO Null
            def empty_iterator():
                return iter(ZeroList)
            return empty_iterator

        if key == "copy":
            # THE INTENT IS USUALLY PREPARE FOR UPDATES
            def output():
                return Struct()
            return output

        # INCLUDES key == "dict"
        return Null

    def keys(self):
        return set()

    def __str__(self):
        return "None"


# THE ONE NULL INSTANCE, AND THE EMPTY LIST IT HANDS OUT
Null = NullStruct()
ZeroList=[]
class StructList(list):
    """
    List whose items are wrapped (see wrap) when read, and unwrapped when
    written.  The items live in self.list; the list base class itself stays
    empty.  Indexes outside 0..len-1 give Null instead of raising.
    """

    def __init__(self, vals=None):
        """ USE THE vals, NOT A COPY """
        list.__init__(self)
        if vals == None:
            self.list=[]
        elif isinstance(vals, StructList):
            self.list=vals.list
        else:
            self.list=vals

    def __getitem__(self, index):
        if index < 0 or len(self.list) <= index:
            return Null
        return wrap(self.list[index])

    def __setitem__(self, i, y):
        self.list[i]=unwrap(y)

    def __iter__(self):
        # A PLAIN LOOP, SO THE END OF THE LIST DOES NOT DEPEND ON A
        # StopIteration ESCAPING FROM INSIDE THE GENERATOR
        for v in self.list:
            yield wrap(v)

    def append(self, val):
        self.list.append(unwrap(val))
        return self

    def __str__(self):
        return self.list.__str__()

    def __len__(self):
        return self.list.__len__()

    def __getslice__(self, i, j):
        return wrap(self.list[i:j])

    def remove(self, x):
        # STORED ITEMS ARE UNWRAPPED, SO COMPARE AGAINST THE UNWRAPPED VALUE
        self.list.remove(unwrap(x))
        return self

    def extend(self, values):
        for v in values:
            self.list.append(unwrap(v))
        return self
def wrap(v):
    """
    Return the struct form of `v`: Null for a null, a Struct sharing the
    given dict, a StructList sharing the given list.  Anything else
    (including values already wrapped) is returned as it is.
    """
    if v == None:
        return Null
    if isinstance(v, (Struct, StructList)):
        return v
    if isinstance(v, list):
        return StructList(v)
    if not isinstance(v, dict):
        return v

    struct_view = Struct()
    object.__setattr__(struct_view, "__dict__", v)  # INJECT __dict__=v SO THERE IS NO COPY
    return struct_view
def unwrap(v):
    """
    Return the plain form of `v`: the dict inside a Struct, the list inside
    a StructList, None for a null.  Anything else is returned as it is.
    """
    if isinstance(v, Struct):
        return object.__getattribute__(v, "__dict__")
    if isinstance(v, StructList):
        return v.list
    return None if v == None else v
def inverse(d):
    """
    Reverse the k:v pairs: return a struct that maps each value of `d` to
    the list of keys that hold it.
    """
    output = {}
    for k, v in unwrap(d).iteritems():
        output.setdefault(v, []).append(k)
    return wrap(output)
def nvl(*args):
    """
    Pick the first non-null value of the arguments; Null when there is none.
    """
    for candidate in args:
        if candidate == None:
            continue
        return candidate
    return Null
def listwrap(value):
    """
    OFTEN IT IS NICE TO ALLOW FUNCTION PARAMETERS TO BE ASSIGNED A VALUE,
    OR A list-OF-VALUES, OR NULL.  CHECKING FOR THIS IS TEDIOUS AND WE WANT
    TO CAST FROM THOSE THREE CASES TO THE SINGLE CASE OF A LIST

        Null  -> []
        value -> [value]
        [...] -> [...]  (unchanged list)

    #BEFORE
        if a is not None:
            if not isinstance(a, list):
                a=[a]
            for x in a:
                #do something

    #AFTER
        for x in listwrap(a):
            #do something
    """
    if value == None:
        return []
    if not isinstance(value, list):
        value = [value]
    return wrap(value)

Просмотреть файл

@ -1,365 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
from datetime import datetime, timedelta
import threading
import thread
import time
DEBUG = True
class Lock(object):
    """
    SIMPLE LOCK (ACTUALLY, A PYTHON threading.Condition() WITH notify() BEFORE EVERY RELEASE)

    Use it as a context manager; wait() and notify_all() must be called
    while the lock is held.
    """

    def __init__(self, name=""):
        self.monitor=threading.Condition()
        self.name=name

    def __enter__(self):
        self.monitor.acquire()
        return self

    def __exit__(self, a, b, c):
        self.monitor.notify()
        self.monitor.release()

    def wait(self, timeout=None, till=None):
        """
        Release the lock and wait to be notified.

        timeout - maximum number of seconds to wait
        till - utc datetime to wait until (overrides timeout); when it has
               already passed, return at once
        """
        if till:
            # TIME LEFT UNTIL THE DEADLINE
            timeout=(till-datetime.utcnow()).total_seconds()
            if timeout<0:
                return
        self.monitor.wait(timeout=timeout)

    def notify_all(self):
        self.monitor.notify_all()
# SIMPLE MESSAGE QUEUE, multiprocessing.Queue REQUIRES SERIALIZATION, WHICH IS HARD TO USE JUST BETWEEN THREADS
class Queue(object):
    """
    Simple message queue for passing values between threads.  Putting
    Thread.STOP in the queue, or calling close(), shuts it down.
    """

    def __init__(self):
        self.keep_running=True
        self.lock=Lock("lock for queue")
        self.queue=[]

    def __iter__(self):
        """Yield values as they arrive, until the queue is stopped."""
        while self.keep_running:
            try:
                item = self.pop()
                if item != Thread.STOP:
                    yield item
            except Exception as e:
                from .logs import Log
                Log.warning("Tell me about what happend here", e)

    def add(self, value):
        """Append value (ignored once the queue is stopped)."""
        with self.lock:
            if self.keep_running:
                self.queue.append(value)
        return self

    def extend(self, values):
        """Append all values (ignored once the queue is stopped)."""
        with self.lock:
            if self.keep_running:
                self.queue.extend(values)

    def __len__(self):
        with self.lock:
            return len(self.queue)

    def pop(self):
        """
        Block until a value is available and return it; Thread.STOP is
        returned when the queue has been stopped.
        """
        with self.lock:
            while self.keep_running:
                if not self.queue:
                    self.lock.wait()
                    continue
                item = self.queue.pop(0)
                if item == Thread.STOP:  # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
                    self.keep_running=False
                return item
            return Thread.STOP

    def pop_all(self):
        """
        NON-BLOCKING POP ALL IN QUEUE, IF ANY
        """
        with self.lock:
            if not self.keep_running:
                return [Thread.STOP]
            if not self.queue:
                return []

            for item in self.queue:
                if item == Thread.STOP:  # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
                    self.keep_running = False

            drained = list(self.queue)
            del self.queue[:]  # CLEAR
            return drained

    def close(self):
        with self.lock:
            self.keep_running=False
class AllThread(object):
    """
    RUN ALL ADDED FUNCTIONS IN PARALLEL, BE SURE TO HAVE JOINED BEFORE EXIT
    """

    def __init__(self):
        self.threads=[]

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # WAIT FOR ALL QUEUED WORK TO BE DONE BEFORE RETURNING
        self.join()

    def join(self):
        """
        Wait for every added thread.  A failure while joining is logged as a
        warning; exceptions raised inside the threads are collected and
        reported together through Log.error().
        """
        failures = []
        try:
            for t in self.threads:
                response = t.join()
                if "exception" in response:
                    failures.append(response["exception"])
        except Exception as e:
            from .logs import Log
            Log.warning("Problem joining", e)

        if failures:
            from .logs import Log
            Log.error("Problem in child threads", failures)

    def add(self, target, *args, **kwargs):
        """
        target IS THE FUNCTION TO EXECUTE IN THE THREAD
        """
        self.threads.append(Thread.run(target, *args, **kwargs))
class Thread(object):
    """
    join() ENHANCED TO ALLOW CAPTURE OF CTRL-C, AND RETURN POSSIBLE THREAD EXCEPTIONS
    run() ENHANCED TO CAPTURE EXCEPTIONS

    The target function receives a `please_stop` Signal among its keyword
    arguments; `stopped` is signalled when the target has finished.
    """

    num_threads=0
    STOP="stop"
    TIMEOUT="TIMEOUT"

    def __init__(self, name, target, *args, **kwargs):
        self.name = name
        self.target = target
        self.response = None
        self.synch_lock=Lock()
        self.args = args

        # ENSURE THERE IS A SHARED please_stop SIGNAL
        self.kwargs = kwargs.copy()
        self.kwargs["please_stop"]=self.kwargs.get("please_stop", Signal())
        self.please_stop=self.kwargs["please_stop"]

        self.stopped=Signal()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # type IS THE EXCEPTION CLASS (None WHEN THE BLOCK ENDED NORMALLY)
        if type is not None:
            self.please_stop.go()

        # TODO: AFTER A WHILE START KILLING THREAD
        self.join()

    def start(self):
        try:
            self.thread=thread.start_new_thread(Thread._run, (self, ))
        except Exception as e:
            from .logs import Log
            Log.error("Can not start thread", e)

    def stop(self):
        self.please_stop.go()

    def _run(self):
        """Call the target, keeping its result (or exception) in self.response."""
        try:
            if self.target is not None:
                response=self.target(*self.args, **self.kwargs)
                with self.synch_lock:
                    self.response={"response":response}
        except Exception as e:
            with self.synch_lock:
                self.response={"exception":e}
            from .logs import Log
            Log.error("Problem in thread", e)
        finally:
            self.stopped.go()
            del self.target, self.args, self.kwargs

    def is_alive(self):
        return not self.stopped

    def join(self, timeout=None, till=None):
        """
        RETURN THE RESULT OF THE THREAD EXECUTION (INCLUDING EXCEPTION)

        timeout - seconds to wait at most
        till - utc datetime to wait until (used instead of timeout)
        Without a deadline, waits in short steps until the thread stops.
        With a deadline, raises Except(type=Thread.TIMEOUT) when the thread
        has not stopped after the wait.
        """
        if not till and timeout:
            till=datetime.utcnow()+timedelta(seconds=timeout)

        if till is None:
            while True:
                with self.synch_lock:
                    for i in range(10):
                        if self.stopped:
                            return self.response
                        self.synch_lock.wait(0.5)

                from .logs import Log
                if DEBUG:
                    Log.note("Waiting on thread {{thread}}", {"thread":self.name})
        else:
            self.stopped.wait_for_go(till=till)
            if self.stopped:
                return self.response
            else:
                from .logs import Except
                raise Except(type=Thread.TIMEOUT)

    @staticmethod
    def run(target, *args, **kwargs):
        """Create and start a Thread for target; returns the Thread."""
        # ENSURE target HAS please_stop ARGUMENT
        if "please_stop" not in target.__code__.co_varnames:
            from .logs import Log
            Log.error("function must have please_stop argument for signalling emergency shutdown")

        if hasattr(target, "func_name") and target.func_name != "<lambda>":
            name = "thread-" + str(Thread.num_threads) + " (" + target.func_name + ")"
        else:
            name = "thread-" + str(Thread.num_threads)

        Thread.num_threads += 1

        output = Thread(name, target, *args, **kwargs)
        output.start()
        return output

    @staticmethod
    def sleep(seconds=None, till=None):
        """Sleep for `seconds`, and/or until the utc datetime `till`."""
        if seconds is not None:
            time.sleep(seconds)
        if till is not None:
            duration = (till - datetime.utcnow()).total_seconds()
            if duration > 0:
                time.sleep(duration)
class Signal(object):
    """
    SINGLE-USE THREAD SAFE SIGNAL

    STARTS AS "NOT GO".  go() SWITCHES IT TO "GO" EXACTLY ONCE, WAKES EVERY WAITER AND
    RUNS THE CALLABLES REGISTERED WITH on_go().  THERE IS NO RESET
    """

    def __init__(self):
        self.lock = Lock()
        self._go = False
        self.job_queue = []  # CALLABLES REGISTERED BY on_go(), WAITING FOR go()

    def __bool__(self):
        with self.lock:
            return self._go

    # PYTHON 2 SPELLING OF __bool__
    __nonzero__ = __bool__

    def wait_for_go(self, timeout=None, till=None):
        """
        BLOCK UNTIL go() HAS BEEN CALLED, OR UNTIL THE DEADLINE PASSES

        timeout - MAXIMUM SECONDS TO WAIT (IGNORED WHEN till IS GIVEN)
        till - ABSOLUTE datetime TO STOP WAITING (COMPARABLE TO datetime.utcnow())
        RETURNS True IF SIGNALLED, False IF THE DEADLINE PASSED FIRST
        WITH NEITHER timeout NOR till THIS WAITS INDEFINITELY AND RETURNS True
        """
        if till is None and timeout is not None:
            till = datetime.utcnow() + timedelta(seconds=timeout)

        with self.lock:
            while not self._go:
                remaining = None
                if till is not None:
                    # RE-COMPUTE ON EVERY PASS: A WAKE-UP WITHOUT go() MUST NOT EXTEND
                    # THE DEADLINE (PREVIOUSLY THE LOOP NEVER ENDED WITHOUT go())
                    remaining = (till - datetime.utcnow()).total_seconds()
                    if remaining <= 0:
                        break
                self.lock.wait(timeout=remaining, till=None)
            return self._go

    def go(self):
        """
        SWITCH TO "GO", WAKE ALL WAITERS AND RUN THE REGISTERED JOBS; LATER CALLS DO NOTHING
        """
        with self.lock:
            if self._go:
                return
            self._go = True
            jobs = self.job_queue
            self.job_queue = []
            self.lock.notify_all()
        # JOBS RUN AFTER THE LOCK IS RELEASED, SO A JOB MAY USE THIS SIGNAL
        for j in jobs:
            j()

    def is_go(self):
        """
        RETURN True ONCE go() HAS BEEN CALLED
        """
        with self.lock:
            return self._go

    def on_go(self, target):
        """
        RUN target WHEN SIGNALED

        IF ALREADY SIGNALED, target RUNS IMMEDIATELY (WHILE THE LOCK IS HELD);
        OTHERWISE IT IS QUEUED FOR go()
        """
        with self.lock:
            if self._go:
                target()
            else:
                self.job_queue.append(target)
class ThreadedQueue(Queue):
    """
    TODO: Check that this queue is not dropping items at shutdown
    DISPATCH TO ANOTHER (SLOWER) queue IN BATCHES OF GIVEN size

    ITEMS ADDED HERE ARE FORWARDED BY A BACKGROUND Thread.  USE AS A CONTEXT MANAGER
    TO HAVE Thread.STOP ADDED AND THE BACKGROUND THREAD JOINED ON EXIT
    """

    def __init__(self, queue, size):
        """
        queue - DESTINATION; EACH BATCH IS HANDED TO ITS extend()
        size - BATCH SIZE GIVEN TO Q.groupby()
        """
        Queue.__init__(self)

        def push_to_queue(please_stop):
            # WHEN ASKED TO STOP, ADD Thread.STOP TO THIS QUEUE
            please_stop.on_go(lambda: self.add(Thread.STOP))

            #output_queue IS A MULTI-THREADED QUEUE, SO THIS WILL BLOCK UNTIL THE 5K ARE READY
            from .queries import Q

            for _, batch in Q.groupby(self, size=size):
                queue.extend(batch)
                if please_stop:
                    # EXPLICIT RELATIVE IMPORT, SAME FORM AS THE queries IMPORT ABOVE
                    from .logs import Log

                    Log.warning("ThreadedQueue stopped early, with {{num}} items left in queue", {
                        "num": len(self)
                    })
                    return

        self.thread = Thread.run(push_to_queue)

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        """
        a, b, c - EXCEPTION TYPE, VALUE AND TRACEBACK FROM THE with BODY (None IF IT ENDED NORMALLY)
        """
        self.add(Thread.STOP)
        if isinstance(b, BaseException):
            # THE with BODY FAILED: ALSO SIGNAL THE BACKGROUND THREAD TO STOP
            self.thread.please_stop.go()
        self.thread.join()

Просмотреть файл

@ -1,49 +0,0 @@
# encoding: utf-8
#
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Author: Kyle Lahnakoski (kyle@lahnakoski.com)
#
import time
from .strings import expand_template
from .logs import Log
class Timer:
    """
    LOG THE WALL-CLOCK TIME SPENT INSIDE A with BLOCK

    USAGE:
        with Timer("doing hard time"):
            something_that_takes_long()
    OUTPUT (TWO Log.note LINES):
        Timer start: doing hard time
        Timer end : doing hard time (took 45.468 sec)

    start, end AND interval (SECONDS) ARE SET BY __enter__/__exit__;
    duration IS ONLY AVAILABLE AFTER THE with BLOCK HAS ENDED
    """

    def __init__(self, description, param=None):
        """
        description - MESSAGE TEMPLATE, EXPANDED IMMEDIATELY WITH param
        param - VALUES FOR THE TEMPLATE (MAY BE None)
        """
        # time.clock() IS CPU TIME ON UNIX, SO SLEEPING OR I/O-BOUND WORK REPORTED NEAR ZERO;
        # timeit.default_timer IS WALL-CLOCK TIME ON EVERY PLATFORM
        from timeit import default_timer

        self._now = default_timer
        self.description = expand_template(description, param)  #WE WOULD LIKE TO KEEP THIS TEMPLATE, AND PASS IT TO THE LOGGER ON __exit__(), WE FAKE IT FOR NOW

    def __enter__(self):
        Log.note("Timer start: {{description}}", {
            "description": self.description
        })
        self.start = self._now()
        return self

    def __exit__(self, type, value, traceback):
        self.end = self._now()
        self.interval = self.end - self.start
        Log.note("Timer end  : {{description}} (took {{duration}} sec)", {
            "description": self.description,
            "duration": round(self.interval, 3)
        })

    @property
    def duration(self):
        """
        SECONDS SPENT INSIDE THE with BLOCK (UNROUNDED)
        """
        return self.interval

Просмотреть файл

@ -1,3 +0,0 @@
Flask==0.9
requests==1.2.3

32
setup.py Normal file
Просмотреть файл

@ -0,0 +1,32 @@
import os
from setuptools import setup
# DIRECTORY HOLDING THIS setup.py; ALL PATHS BELOW ARE RESOLVED AGAINST IT
root = os.path.abspath(os.path.dirname(__file__))
path = lambda *p: os.path.join(root, *p)

# THE LONG DESCRIPTION COMES FROM README.txt; A MISSING FILE IS NOT FATAL
try:
    # with CLOSES THE FILE HANDLE, WHICH THE BARE open().read() NEVER DID
    with open(path('README.txt')) as readme:
        long_desc = readme.read()
except Exception:
    long_desc = "<Missing README.txt>"
    # FUNCTION-CALL FORM PRINTS THE SAME TEXT ON PYTHON 2 AND IS VALID ON PYTHON 3
    print("Missing README.txt")

setup(
    name='esFrontLine',
    version="0.9.13316",
    description='Limit restful requests to backend ElasticSearch cluster:  Queries only.',
    long_description=long_desc,
    author='Kyle Lahnakoski',
    author_email='kyle@lahnakoski.com',
    url='https://github.com/klahnakoski/esFrontLine',
    license='MPL 2.0',
    packages=['esFrontLine'],
    install_requires=['Flask==0.9', 'requests==1.2.3'],
    include_package_data=True,
    zip_safe=False,
    classifiers=[  #https://pypi.python.org/pypi?%3Aaction=list_classifiers
        "Development Status :: 2 - Pre-Alpha",
        "Topic :: Internet :: Proxy Servers",
        "License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)",
    ]
)