pjs/webtools/new-graph/getdata.cgi

175 строки
4.5 KiB
Python
Executable File

#!/usr/bin/python
import cgitb; cgitb.enable()
import os
import sys
import cgi
import time
import re
import gzip
import cStringIO
from pysqlite2 import dbapi2 as sqlite
#
# All objects are returned in the form:
# {
# resultcode: n,
# ...
# }
#
# The ... is dependant on the result type.
#
# Result codes:
# 0 success
# -1 bad tinderbox
# -2 bad test name
#
# incoming query string:
# tbox=name
# tinderbox name
#
# If only tbox specified, returns array of test names for that tinderbox in data
# If invalid tbox specified, returns error -1
#
# test=testname
# test name
#
# Returns results for that test in .results, in array of [time0, value0, time1, value1, ...]
# Also reteurns .annotations, in array of [time0, string0, time1, string1, ...]
#
# raw=1
# Same as full results, but includes raw data for test, in form [time0, value0, rawresult0, ...]
#
# starttime=tval
# Start time to return results from, in seconds since GMT epoch
# endtime=tval
# End time, in seconds since GMT epoch
#
def doError(errCode):
errString = "unknown error"
if errCode == -1:
errString = "bad tinderbox"
elif errCode == -2:
errString = "bad test name"
print "{ resultcode: " + str(errCode) + ", error: '" + errString + "' }"
db = None
def doListTinderboxes(fo):
fo.write ("{ resultcode: 0, results: [")
for f in os.listdir("db"):
if f[-7:] == ".sqlite":
fo.write ("'" + f[:-7] + "',")
fo.write ("] }")
def doListTests(fo, tbox):
fo.write ("{ resultcode: 0, results: [")
cur = db.cursor()
cur.execute("SELECT DISTINCT(test_name) FROM test_results")
for row in cur:
fo.write ("'" + row[0] + "',")
cur.close()
fo.write ("] }")
def doSendResults(fo, tbox, test, starttime, endtime, raw):
raws = ""
if raw is not None:
raws = ", test_data"
s1 = ""
s2 = ""
if starttime is not None:
s1 = " AND test_time >= " + starttime
if endtime is not None:
s2 = " AND test_time <= " + endtime
stmt = "SELECT test_time, test_value" + raws + " FROM test_results WHERE test_name = '" + test + "' " + s1 + s2 + " ORDER BY test_time";
fo.write ("{ resultcode: 0, results: [")
cur = db.cursor()
cur.execute(stmt)
for row in cur:
if row[1] == 'nan':
continue
if raw:
fo.write ("%s,%s,'%s'," % (row[0], row[1], row[2]))
else:
fo.write ("%s,%s," % (row[0], row[1]))
cur.close()
fo.write ("],")
cur = db.cursor()
if starttime is not None and endtime is not None:
cur.execute("SELECT anno_time, anno_string FROM annotations WHERE anno_time >= " + starttime + " AND anno_time <= " + endtime + " ORDER BY anno_time")
elif starttime is not None:
cur.execute("SELECT anno_time, anno_string FROM annotations WHERE anno_time >= " + starttime + " ORDER BY anno_time")
elif endtime is not None:
cur.execute("SELECT anno_time, anno_string FROM annotations WHERE anno_time <= " + endtime + " ORDER BY anno_time")
else:
cur.execute("SELECT anno_time, anno_string FROM annotations ORDER BY anno_time")
fo.write ("annotations: [")
for row in cur:
fo.write("%s,'%s'," % (row[0], row[1]))
cur.close()
fo.write ("] }")
def main():
doGzip = 0
try:
if string.find(os.environ["HTTP_ACCEPT_ENCODING"], "gzip") != -1:
doGzip = 1
except:
pass
form = cgi.FieldStorage()
tbox = form.getfirst("tbox")
test = form.getfirst("test")
raw = form.getfirst("raw")
starttime = form.getfirst("starttime")
endtime = form.getfirst("endtime")
zbuf = cStringIO.StringIO()
zfile = zbuf
if doGzip == 1:
zfile = gzip.GzipFile(mode = 'wb', fileobj = zbuf, compresslevel = 5)
if tbox is None:
doListTinderboxes(zfile)
else:
if re.match(r"[^A-Za-z0-9]", tbox):
doError(-1)
return
dbfile = "db/" + tbox + ".sqlite"
if not os.path.isfile(dbfile):
doError(-1)
return
global db
db = sqlite.connect(dbfile)
db.text_factory = sqlite.OptimizedUnicode
if test is None:
doListTests(zfile, tbox)
else:
doSendResults(zfile, tbox, test, starttime, endtime, raw)
sys.stdout.write("Content-Type: text/plain\n")
if doGzip == 1:
zfile.close()
sys.stdout.write("Content-Encoding: gzip\n")
sys.stdout.write("\n")
sys.stdout.write(zbuf.getvalue())
main()