Tested and ready for deployment, I think

Jonas Finnemann Jensen 2013-11-04 15:55:15 -08:00
Parent f01adc375c
Commit 342a331465
17 changed files with 1852 additions and 313 deletions

99
Formats.mkd Normal file
View file

@ -0,0 +1,99 @@
File Format used for Telemetry Dashboard
========================================
_All formats described here are internal, not for external consumption._
External users should include `telemetry.js` and consume data through this
interface. Reading the raw data is hard, and these data formats may change, but
the JavaScript interface is designed to be reasonably stable.
Processor Output Format
-----------------------
    /my/dim/../ JSON
    JSON:
        revision:
        buildid:
        histogram:
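For illustration, here is a hedged sketch (in Python, matching the rest of the dashboard
code) of parsing one line of processor output; the tab-separated layout and the
channel/version/measure/by-date-type path follow `results2disk.py` in this repository,
while the sample values themselves are made up:

```
import json

# Hypothetical sample line: a '/'-separated dimension path, a tab, then the JSON blob.
line = ('nightly/27/CYCLE_COLLECTOR/by-build-date\t'
        '{"revision": "http://hg.mozilla.org/releases/...", '
        '"buildid": "20131104030204", "histogram": {}}')

path, blob = line.split('\t', 1)
channel, version, measure, by_date_type = path.split('/')
payload = json.loads(blob)
print "%s/%s %s (%s): buildid %s" % (channel, version, measure, by_date_type,
                                     payload['buildid'])
```
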
Web Facing Bucket Layout
------------------------
    LATEST_VERSION = v2
    v1/
        data
    v2/
        check-points.json
        check-points/ (one for every week)
            YYYYMMDDhhmmss/
                versions.json
                FILES_PROCESSED
                FILES_MISSING
                <channel>/<version>/
                    MEASURE-by-build-date.json
                    MEASURE-by-submission-date.json
                    filter-tree.json
                    histograms.json
                    revisions.json
        latest-current.json = most recent current, contents of versions.json
        current/
            YYYYMMDDhhmmss/
                versions.json
                FILES_PROCESSED
                FILES_MISSING
                <channel>/<version>/
                    MEASURE-by-build-date.json
                    MEASURE-by-submission-date.json
                    filter-tree.json
                    histograms.json
                    revisions.json
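As a hedged sketch (the bucket name and `v2` prefix are placeholders), resolving the
most recent `current/` snapshot from this layout with boto, the way
`Aggregator.s3get_json` in this repository does, might look like:

```
import json, gzip
from cStringIO import StringIO
from boto.s3 import connect_to_region as s3_connect

s3 = s3_connect('us-west-2')                                      # credentials from env/IAM
bucket = s3.get_bucket('telemetry-dashboard', validate = False)   # placeholder bucket

# latest-current.json names the newest current/YYYYMMDDhhmmss/ folder and is
# stored gzip-compressed, so decompress it by hand as the aggregator does.
key = bucket.get_key('v2/latest-current.json')
data = key.get_contents_as_string()
latest = json.loads(gzip.GzipFile(fileobj = StringIO(data)).read())
print "current snapshot: v2/current/%s/" % latest['current']
print "channel/versions:", latest['versions']
```
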
Web Facing Format
-----------------
    /<channel>/<version>
        MEASURE.json
            {
              <filter_id>: [
                bucket0,
                bucket1,
                ...,
                bucketN,
                sum,               # -1, if missing
                log_sum,           # -1, if missing
                log_sum_squares,   # -1, if missing
                sum_squares_lo,    # -1, if missing
                sum_squares_hi,    # -1, if missing
                count
              ],
              <filter_id>...
            }
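For illustration, a hedged sketch of reading such a file from local disk (the filename
is hypothetical); the trailing statistics sit at fixed offsets from the end of each
array, per the layout above:

```
import json

with open('CYCLE_COLLECTOR.json') as f:   # hypothetical local copy of a MEASURE.json
    measure = json.load(f)

for filter_id, values in measure.iteritems():
    # Layout above: bucket counts first, then sum, log_sum, log_sum_squares,
    # sum_squares_lo, sum_squares_hi and count. A -1 means the entry is missing.
    buckets = values[:-6]
    total, count = values[-6], values[-1]
    print "filter %s: %i buckets, count = %s, sum = %s" % (
        filter_id, len(buckets), count, total)
```
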
        filters.json
            {
              _id: filter_id,
              name: "filter-name",
              <option>: {
                <subtree>
              }
            }
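A hedged sketch of walking this filter tree to collect the `_id` values under a node,
mirroring the `_listFilterIds` helper in `telemetry.js`; the sample tree is made up:

```
def list_filter_ids(node):
    # Every key other than 'name' and '_id' is a filter option leading to a subtree.
    ids = [node['_id']]
    for key, child in node.items():
        if key not in ('name', '_id'):
            ids.extend(list_filter_ids(child))
    return ids

# Made-up example tree in the shape described above.
tree = {'_id': 0, 'name': 'reason',
        'saved_session': {'_id': 1, 'name': 'appName',
                          'Firefox': {'_id': 2, 'name': 'OS'}}}
print list_filter_ids(tree)   # prints [0, 1, 2]
```
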
        histograms.json
            {
              MEASURE: {
                description: ...
                ...
              }
            }

View file

@ -1 +0,0 @@
html/*

View file

@ -1,17 +1,5 @@
FILES = histogram_tools.py Histograms.json specs.py dashboard.zip
all: $(FILES)
Histograms.json:
	wget -c http://hg.mozilla.org/mozilla-central/raw-file/tip/toolkit/components/telemetry/Histograms.json -O $@
histogram_tools.py:
	wget -c http://hg.mozilla.org/mozilla-central/raw-file/tip/toolkit/components/telemetry/histogram_tools.py -O $@
specs.py: Histograms.json
	python specgen.py $< > $@
dashboard.zip: specs.py processor.py auxiliary.py
	zip $@ $?
egg:
	python setup.py bdist_egg
clean:
	rm -f $(FILES) *.pyc
	rm -rf dist build telemetry_dashboard.egg-info

View file

@ -1,57 +1,42 @@
#Telemetry Dashboard
Telemetry Dashboard
===================
Telemetry dashboard is an analysis job that aggregates telemetry histograms and
simple measures, and offers a decent presentation. The default dashboard
developed in this repository is hosted at
[telemetry.mozilla.com](http://telemetry.mozilla.com). But the aggregated data
is also available for consumption by third-party applications, so you don't need
to do the aggregation on your own.
Generate static files for a telemetry dashboard.
Consuming Telemetry Aggregations
--------------------------------
Include `http://telemetry.mozilla.com/js/telemetry.js` in your code; feel free
to use the other modules too.
Don't go reading the raw JSON files; they are not designed for human
consumption!
#How to Run
Hacking Telemetry Dashboard
---------------------------
If you want to improve the user interface for the telemetry dashboard, clone this
repository, set up a static server that hosts the `html/` folder on your localhost,
and start hacking. This is easy!
You'll need to have `mango` set up in your .ssh_config to connect you to the hadoop node where you'll run jydoop from.
```
Run `script/bootstrap`
Serve the `html/` dir
```
##Histogram View
There are x fields to narrow the query by
Have a category table that stores the category tree:
Each node has a unique id
Level1 Product: Firefox|Fennec|Thunderbird
Level2 Platform: Windows|Linux
Level3 etc
The size of this table can be kept in check by reducing common video cards to a family name, etc.
We can also customize what shows up under different levels. For example, we could restrict tb to have fewer child nodes.
Store the tree in a table, but keep it read into memory for queries and inserting new records
Then have a histogram table where
columns: histogram_id | category_id | value
where histogram_id is an id like SHUTDOWN_OK, category_id is a key from the category table, and value is the sum of histograms in that category... can be represented with some binary value
##Misc
Evolution can be implemented by adding a build_date field to the histogram table
TODO:
How big would the category tree table be? Surely there is a finite size for that
The histogram table would be |category_table| * |number of histograms|, pretty compact
### Map + Reduce
Mapper should turn each submission into
<key> <data> which looks like
buildid/channel/reason/appName/appVersion/OS/osVersion/arch {histograms:{A11Y_CONSUMERS:{histogram_data}, ...} simpleMeasures:{firstPaint:[100,101,1000...]}}
Where key identifies where in the filter tree the data should live. Note, a single packet could produce more than one such entry if we want to get into detailed breakdowns of gfxCard vs some FX UI animation histogram
Reducer would then take the above data and sum up histograms + append to simple measure lists based on key
This should produce a fairly small file per day per channel (~200 records), which will then be quick to pull out and merge into the per-build-per-histogram JSON that can be rsynced to some webserver. This is basically a final iterative REDUCE on top of map-reduce for new data. Hadoop does not feel like the right option for that, but I could be wrong.
###todo:
* oneline local testing using Jython's FileDriver.py
If you want to add new aggregations, improve upon existing aggregations, or
change the storage format, take a look at `Formats.mkd` and talk to whoever is
maintaining the telemetry dashboard.
The basic flow is as follows (a minimal sketch of step 4 follows the list):
1. An `.egg` file is generated with `make egg`
2. Analysis tasks are created with telemetry-server
3. `DashboardProcessor` from `analysis.py` aggregates telemetry submissions;
   this process is driven by telemetry-server.
4. `Aggregator` from `aggregator.py` collects results from analysis tasks by:
    1. Downloading existing data from S3
    2. Fetching task-finished messages from SQS
    3. Downloading `result.txt` files in parallel
    4. Updating results on disk
    5. Publishing updated results in a new subfolder of `current/` on S3, every
       once in a while.
    6. Check-pointing all aggregated data to a subfolder of `check-points/` on S3,
       every once in a while.
    7. Repeating
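As a minimal sketch of driving step 4 (queue name, bucket, prefix, work folder and
credentials are placeholders; `None` credentials let boto fall back to the environment),
using `Aggregator` as defined in `aggregator.py`:

```
from aggregator import Aggregator

aws_cred = {
    'aws_access_key_id':     None,   # placeholder, boto falls back to env/IAM
    'aws_secret_access_key': None
}
aggregator = Aggregator('telemetry-dashboard-results',   # placeholder SQS queue
                        '/tmp/dashboard-work',           # scratch folder (wiped on start)
                        'telemetry-dashboard',           # placeholder S3 bucket
                        'v2', 'us-west-2', aws_cred)
# Downloads the latest published results, then loops over the SQS queue,
# publishing to current/ and check-pointing to check-points/ periodically.
aggregator.aggregate()
```

The same flow is exposed on the command line by `aggregator.py`'s `main()`.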

291
dashboard/aggregator.py Normal file
View file

@ -0,0 +1,291 @@
#!/usr/bin/env python
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from boto.sqs import connect_to_region as sqs_connect
from boto.s3 import connect_to_region as s3_connect
from boto.s3.key import Key
from boto.sqs.jsonmessage import JSONMessage
from multiprocessing import Process, Queue, cpu_count
from time import sleep
import os, sys, shutil, gzip
import json
from cStringIO import StringIO
from datetime import datetime
from utils import mkdirp
from results2disk import results2disk
from s3put import s3put
from s3get import s3get, Downloader
class Aggregator:
def __init__(self, input_queue, work_folder, bucket, prefix, region, aws_cred):
self.input_queue_name = input_queue
self.work_folder = work_folder
self.data_folder = os.path.join(work_folder, 'data')
self.bucket_name = bucket
self.prefix = prefix
self.region = region
self.aws_cred = aws_cred
self.analysis_bucket_name = "jonasfj-telemetry-analysis"
if self.prefix != '' and not self.prefix.endswith('/'):
self.prefix += '/'
# Clear the work folder
shutil.rmtree(self.work_folder, ignore_errors = True)
mkdirp(self.data_folder)
self.s3 = s3_connect(self.region, **self.aws_cred)
self.bucket = self.s3.get_bucket(self.bucket_name, validate = False)
self.analysis_bucket = self.s3.get_bucket(self.analysis_bucket_name,
validate = False)
def s3get_json(self, prefix, decompress, fallback_value = None):
k = self.bucket.get_key(self.prefix + prefix)
if k is None:
return fallback_value
data = k.get_contents_as_string()
if decompress:
fobj = StringIO(data)
with gzip.GzipFile(mode = 'rb', fileobj = fobj) as zobj:
data = zobj.read()
fobj.close()
return json.loads(data)
def s3put_json(self, prefix, compress, value):
k = Key(self.bucket)
k.key = self.prefix + prefix
data = json.dumps(value)
headers = {
'Content-Type': 'application/json'
}
if compress:
fobj = StringIO()
with gzip.GzipFile(mode = 'wb', fileobj = fobj) as zobj:
zobj.write(data)
data = fobj.getvalue()
fobj.close()
headers['Content-Encoding'] = 'gzip'
k.set_contents_from_string(data, headers = headers)
def download_latest(self):
# Get latest-current.json
latest_current = self.s3get_json('latest-current.json', True)
# Get checkpoints.json
self.checkpoints = self.s3get_json('check-points.json', True, [])
# Get files to work folder
if latest_current != None:
current_prefix = self.prefix + "current/" + latest_current['current']
retval = s3get(self.bucket_name, current_prefix, self.data_folder,
True, False, self.region, self.aws_cred)
if not retval:
raise Exception("Failed to download latest current version")
def process_message(self, msg):
""" Process a message """
if msg['target-prefix'] is None:
# If target-prefix is None, then the message failed... we add the
# input files to list of missing files
files_missing_path = os.path.join(self.data_folder, 'FILES_MISSING')
with open(files_missing_path, 'a+') as files_missing:
for f in msg['files']:
files_missing.write(f + "\n")
else:
# If we have a target-prefix, we fetch results.txt
results_path = os.path.join(self.work_folder, 'result.txt')
k = self.analysis_bucket.get_key(msg['target-prefix'] + 'result.txt')
k.get_contents_to_filename(results_path)
# Now put results to disk
results2disk(results_path, self.data_folder, False, False)
# Upload FILES_PROCESSED
files_processed_path = os.path.join(self.data_folder, 'FILES_PROCESSED')
with open(files_processed_path, 'a+') as files_processed:
for f in msg['files']:
files_processed.write(f + "\n")
print "Processed message: %s" % msg['id']
def create_checkpoint(self):
# Find date
date = datetime.utcnow().strftime("%Y%m%d%H%M%S")
# s3put data_folder to checkpoint folder
checkpoint_prefix = self.prefix + "check-points/%s/" % date
s3put(self.data_folder, self.bucket_name, checkpoint_prefix, False,
True, self.region, self.aws_cred)
# Update and upload checkpoints
self.checkpoints.append(date)
self.s3put_json('check-points.json', True, self.checkpoints)
def days_since_last_checkpoint(self):
last_checkpoint = datetime(1, 1, 1, 0, 0)
for checkpoint in self.checkpoints:
cp = datetime.strptime(checkpoint, "%Y%m%d%H%M%S")
if last_checkpoint < cp:
last_checkpoint = cp
return (datetime.utcnow() - last_checkpoint).days
def publish_results(self):
# s3put compressed to current/...
date = datetime.utcnow().strftime("%Y%m%d%H%M%S")
current_prefix = self.prefix + 'current/%s/' % date
s3put(self.data_folder, self.bucket_name, current_prefix, False, True,
self.region, self.aws_cred)
# update latest-current.json
with open(os.path.join(self.data_folder, 'versions.json'), 'r') as f:
versions = json.load(f)
self.s3put_json('latest-current.json', True, {
'current': date,
'versions': versions
})
print "Published results at %s" % current_prefix
def process_queue(self):
# connect to sqs
sqs = sqs_connect(self.region, **self.aws_cred)
queue = sqs.get_queue(self.input_queue_name)
queue.set_message_class(JSONMessage)
# messages processed since last flush
processed_msgblocks = []
last_flush = datetime.utcnow()
while True:
# get 10 new_messages from sqs
msgs = queue.get_messages(
num_messages = 10,
wait_time_seconds = 20
)
print "Fetched %i messages" % len(msgs)
# process msgs
self.process_messages(msgs)
if len(msgs) > 0:
processed_msgblocks.append(msgs)
else:
sleep(120)
# Publish if necessary
if (datetime.utcnow() - last_flush).seconds > 60 * 25:
last_flush = datetime.utcnow()
# Skip publishing if there are no new results
if len(processed_msgblocks) == 0:
continue
self.publish_results()
# delete messages
for block in processed_msgblocks:
queue.delete_message_batch(block)
processed_msgblocks = []
# check point if necessary
if self.days_since_last_checkpoint() >= 7:
self.create_checkpoint()
def aggregate(self):
# Download latest results
self.download_latest()
# Process queue while publishing results and making check points
self.process_queue()
def process_messages(self, msgs):
# Find results to download
results = []
for msg in msgs:
if msg['target-prefix'] != None:
results.append(msg['target-prefix'] + 'result.txt')
# Download results
if len(results) > 0:
target_paths = []
download_queue = Queue()
result_queue = Queue()
# Make a job queue
i = 0
for result in results:
i += 1
result_path = os.path.join(self.work_folder, "result-%i.txt" % i)
download_queue.put((result, result_path))
target_paths.append(result_path)
# Start downloaders
downloaders = []
for i in xrange(0, 2):
downloader = Downloader(download_queue, result_queue,
self.analysis_bucket_name, False, False,
self.region, self.aws_cred)
downloaders.append(downloader)
downloader.start()
download_queue.put(None)
# Wait and process results as they are downloaded
while len(target_paths) > 0:
result_path = result_queue.get(timeout = 20 * 60)
results2disk(result_path, self.data_folder, False, False)
print " - Processed results"
os.remove(result_path)
target_paths.remove(result_path)
# Check that downloaders downloaded correctly
for downloader in downloaders:
downloader.join()
if downloader.exitcode != 0:
sys.exit(1)
# Update FILES_PROCESSED and FILES_MISSING
for msg in msgs:
# If there's no target-prefix the message failed
if msg['target-prefix'] is None:
# If target-prefix is None, then the message failed... we add the
# input files to list of missing files
files_missing_path = os.path.join(self.data_folder, 'FILES_MISSING')
with open(files_missing_path, 'a+') as files_missing:
for f in msg['files']:
files_missing.write(f + "\n")
else:
# Update FILES_PROCESSED
files_processed_path = os.path.join(self.data_folder, 'FILES_PROCESSED')
with open(files_processed_path, 'a+') as files_processed:
for f in msg['files']:
files_processed.write(f + "\n")
def main():
p = ArgumentParser(
description = 'Aggregate and upload dashboard results',
formatter_class = ArgumentDefaultsHelpFormatter
)
p.add_argument(
"input_queue",
help = "Queue with results from analysis jobs"
)
p.add_argument(
"-k", "--aws-key",
help = "AWS Key"
)
p.add_argument(
"-s", "--aws-secret-key",
help = "AWS Secret Key"
)
p.add_argument(
"-w", "--work-folder",
help = "Folder to store temporary data in",
required = True
)
p.add_argument(
"-b", "--bucket",
help = "Bucket to update with data-files",
required = True
)
p.add_argument(
"-p", "--prefix",
help = "Prefix in bucket",
required = False
)
p.add_argument(
"-r", "--region",
help = "AWS region to connect to",
default = 'us-west-2'
)
cfg = p.parse_args()
aws_cred = {
'aws_access_key_id': cfg.aws_key,
'aws_secret_access_key': cfg.aws_secret_key
}
aggregator = Aggregator(cfg.input_queue, cfg.work_folder, cfg.bucket,
cfg.prefix, cfg.region, aws_cred)
aggregator.aggregate()
if __name__ == "__main__":
sys.exit(main())

View file

@ -2,13 +2,12 @@ try:
import simplejson as json
except ImportError:
import json
from helpers import parse_input
from datetime import datetime
import math, sys, os
# Import histogram specs generated by the makefile using specgen.py
# This is imported from the zipfile that this module was loaded from
HistogramAggregator = __loader__.load_module("auxiliary").HistogramAggregator
#HistogramAggregator = __loader__.load_module("auxiliary").HistogramAggregator
from auxiliary import HistogramAggregator
# Counts number of times we've printed a log message
logMsgCount = {}
@ -63,12 +62,13 @@ simple_measures_buckets = (
############################## End of ugly hacks for simple measures
class Processor:
class DashboardProcessor:
INTERFACE = 'parsed-json'
def __init__(self, output_folder):
self.output_folder = output_folder
self.cache = {}
@parse_input
def process(self, uid, dimensions, payload):
# Unpack dimensions
reason, appName, channel, version, buildId, submissionDate = dimensions

View file

@ -1,77 +0,0 @@
#!/usr/bin/env python
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from boto.sqs import connect_to_region as sqs_connect
from boto.s3.connection import S3Connection
from boto.sqs.jsonmessage import JSONMessage
from time import sleep
import os, sys
def upload_folder(data_folder, target_bucket):
os.system("aws s3 cp --recursive %s %s/" % (data_folder, target_bucket))
def process_message(msg, data_folder, bucket):
source_prefix = "output/" + msg["id"] + "/result.txt"
k = bucket.get_key(source_prefix)
k.get_contents_to_filename("result.txt")
os.system("python mr2disk.py %s < result.txt" % data_folder)
def main():
p = ArgumentParser(
description = 'Aggregated and upload dashboard results',
formatter_class = ArgumentDefaultsHelpFormatter
)
p.add_argument(
"input_queue",
help = "Queue with results from analysis jobs"
)
p.add_argument(
"-k", "--aws-key",
help = "AWS Key"
)
p.add_argument(
"-s", "--aws-secret-key",
help = "AWS Secret Key"
)
p.add_argument(
"-d", "--data-folder",
help = "Folder for data-files",
required = True
)
p.add_argument(
"-t", "--target-bucket",
help = "Bucket to upload the data-files to",
required = True
)
cfg = p.parse_args()
aws_cred = {
'aws_access_key_id': cfg.aws_key,
'aws_secret_access_key': cfg.aws_secret_key
}
s3 = S3Connection(**aws_cred)
sqs = sqs_connect("us-west-2", **aws_cred)
bucket = s3.get_bucket("jonasfj-telemetry-analysis")
input_queue = sqs.get_queue(cfg.input_queue)
input_queue.set_message_class(JSONMessage)
changed = False
while True:
msgs = input_queue.get_messages(num_messages = 2)
if len(msgs) > 0:
for msg in msgs:
process_message(msg, cfg.data_folder, bucket)
input_queue.delete_message_batch(msgs)
changed = True
else:
if changed:
upload_folder(cfg.data_folder, cfg.target_bucket)
changed = False
else:
sleep(120)
if __name__ == "__main__":
sys.exit(main())

61
dashboard/gzipclone.py Normal file
View file

@ -0,0 +1,61 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import sys, os, gzip
from utils import mkdirp
import shutil
def gzipclone(source_folder, target_folder, decompress, compress):
shutil.rmtree(target_folder, ignore_errors = True)
if decompress:
def read(path):
return gzip.open(path, 'r')
else:
def read(path):
return open(path, 'r')
if compress:
def write(path):
return gzip.open(path, 'w')
else:
def write(path):
return open(path, 'w')
# Walk source_folder
for path, folder, files in os.walk(source_folder):
for f in files:
source_file = os.path.join(path, f)
relpath = os.path.relpath(source_file, source_folder)
target_file = os.path.join(target_folder, relpath)
mkdirp(os.path.dirname(target_file))
with read(source_file) as i:
with write(target_file) as o:
shutil.copyfileobj(i, o)
def main():
p = ArgumentParser(
description = 'Clone folder tree while decompressing and/or compressing files',
formatter_class = ArgumentDefaultsHelpFormatter
)
p.add_argument(
"-i", "--input-folder",
help = "Input folder to clone into output folder",
required = True
)
p.add_argument(
"-o", "--output-folder",
help = "Output folder to clone data into",
required = True
)
p.add_argument(
"-z", "--gzip",
help = "gzip compressed output",
action = 'store_true'
)
p.add_argument(
"-d", "--gunzip",
help = "decompress input tree",
action = 'store_true'
)
cfg = p.parse_args()
gzipclone(cfg.input_folder, cfg.output_folder, cfg.gunzip, cfg.gzip)
if __name__ == "__main__":
sys.exit(main())

View file

@ -4,16 +4,8 @@ except ImportError:
import json
from urllib2 import urlopen, HTTPError
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import math, sys, os, errno
from auxiliary import HistogramAggregator
def mkdirp(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST or not os.path.isdir(path):
raise
import sys, os, gzip
from utils import mkdirp
def get_simple_measures_definition(measure):
return {
@ -25,14 +17,34 @@ def get_simple_measures_definition(measure):
"description": "Histogram aggregation for simple measure: %s" % measure[15:]
}
def load_json_from_file(path, compressed, fallback_value = None):
if os.path.isfile(path):
if compressed:
with gzip.open(path, 'r') as f:
return json.load(f)
else:
with open(path, 'r') as f:
return json.load(f)
return fallback_value
def save_json_to_file(path, value, compress, pretty_print):
print_indent = None
if pretty_print:
print_indent = 2
if compress:
with gzip.open(path, 'w') as f:
json.dump(value, f, indent = print_indent)
else:
with open(path, 'w') as f:
json.dump(value, f, indent = print_indent)
class ChannelVersionManager:
""" Manages data stored for a specific channel / version """
def __init__(self, root_folder, channel, version, pretty_print):
def __init__(self, root_folder, channel, version, compress, pretty_print):
self.data_folder = os.path.join(root_folder, channel, version)
mkdirp(self.data_folder)
self.print_indent = None
if pretty_print:
self.print_indent = 2
self.compress = compress
self.pretty_print = pretty_print
self.max_filter_id = None
# Load filter-tree
@ -50,16 +62,12 @@ class ChannelVersionManager:
def json_from_file(self, filename, fallback_value):
""" Load json from file, return fallback_value if no file exists """
path = os.path.join(self.data_folder, filename)
if os.path.isfile(path):
with open(path, 'r') as f:
return json.load(f)
return fallback_value
return load_json_from_file(path, self.compress, fallback_value)
def json_to_file(self, filename, value):
""" Write JSON to file """
path = os.path.join(self.data_folder, filename)
with open(path, 'w') as f:
json.dump(value, f, indent = self.print_indent)
save_json_to_file(path, value, self.compress, self.pretty_print)
def get_next_filter_id(self):
""" Get the next filter id available """
@ -172,6 +180,9 @@ class ChannelVersionManager:
os.remove(byBuildDateFileName)
self.json_to_file(byBuildDateFileName, {})
self.json_to_file(bySubmissionDateFileName, {})
oldlen = metadata.get('length', 0)
if oldlen != 0:
print >> sys.stderr, "Purging data for %s from length %i to %i" % (measure, oldlen, length)
# Filename to merge blob into
filename = measure + "-" + byDateType + ".json"
@ -191,28 +202,62 @@ class ChannelVersionManager:
continue
# Get data for a date
data = dataset.setdefault(date, {})
data = dataset.setdefault(date, [])
# Get filter id
fid = str(self.get_filter_id(filterPath))
fid = self.get_filter_id(filterPath)
new_values = dump['values']
# Get existing values, if any
values = data.get(fid, None)
values = None
for array in data:
if array[-1] == fid:
values = array
# Merge in the new values, if we already have values stored for this filter id
if values != None:
aggregator = HistogramAggregator(**dump)
aggregator.merge(values, buildId, revision)
values = aggregator.values
for i in xrange(0, len(new_values) - 6):
values[i] += new_values[i]
# Entries [-6:-1] may have -1 indicating missing entry
for i in xrange(len(new_values) - 6, len(new_values) - 1):
# Missing entries are indicated with -1,
# we shouldn't add these up
if values[i] == -1 and new_values[i] == -1:
continue
values[i] += new_values[i]
# Last entry (count) cannot be negative
values[-2] += new_values[-1]
else:
values = dump['values']
# Store values for filter id
data[fid] = values
data.append(new_values + [fid])
# Store dataset
self.json_to_file(filename, dataset)
def results2disk(result_file, output_folder, compress, pretty_print):
cache = {}
with open(result_file, 'r') as f:
for line in f:
filePath, blob = line.split('\t')
(channel, majorVersion, measure, byDateType) = filePath.split('/')
blob = json.loads(blob)
manager = cache.get((channel, majorVersion), None)
if manager is None:
manager = ChannelVersionManager(output_folder,
channel, majorVersion,
compress, pretty_print)
cache[(channel, majorVersion)] = manager
manager.merge_in_blob(measure, byDateType, blob)
# Update versions.json and flush managers
version_file = os.path.join(output_folder, 'versions.json')
versions = load_json_from_file(version_file, compress, [])
for channelVersion, manager in cache.iteritems():
manager.flush()
version = "/".join(channelVersion)
if version not in versions:
versions.append(version)
save_json_to_file(version_file, versions, compress, pretty_print)
def main():
p = ArgumentParser(
@ -229,28 +274,20 @@ def main():
help = "Output folder to merge data into",
required = True
)
p.add_argument(
"-z", "--gzip",
help = "gzip compressed output",
action = 'store_true'
)
p.add_argument(
"-p", "--pretty-print",
help = "Pretty print generated JSON",
action = 'store_true'
)
cfg = p.parse_args()
cache = {}
with open(cfg.input_file, 'r') as f:
for line in f:
filePath, blob = line.split('\t')
(channel, majorVersion, measure, byDateType) = filePath.split('/')
blob = json.loads(blob)
manager = cache.get((channel, majorVersion), None)
if manager is None:
manager = ChannelVersionManager(cfg.output_folder,
channel, majorVersion,
cfg.pretty_print)
cache[(channel, majorVersion)] = manager
manager.merge_in_blob(measure, byDateType, blob)
for manager in cache.itervalues():
manager.flush()
cfg = p.parse_args()
results2disk(cfg.input_file, cfg.output_folder, cfg.gzip, cfg.pretty_print)
if __name__ == "__main__":
sys.exit(main())

176
dashboard/s3get.py Normal file
View file

@ -0,0 +1,176 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import sys, os, gzip
from cStringIO import StringIO
from utils import mkdirp
from multiprocessing import Process, Queue, cpu_count
from boto.s3 import connect_to_region as s3_connect
from boto.s3.key import Key
from traceback import print_exc
import shutil
class Downloader(Process):
def __init__(self, queue, output_queue, input_bucket, decompress, compress,
region, aws_cred):
super(Downloader, self).__init__()
self.queue = queue
self.output_queue = output_queue
self.input_bucket = input_bucket
self.decompress = decompress
self.compress = compress
self.region = region
self.aws_cred = aws_cred
def run(self):
if self.compress:
def write(path):
return gzip.open(path, 'w')
else:
def write(path):
return open(path, 'w')
s3 = s3_connect(self.region, **self.aws_cred)
bucket = s3.get_bucket(self.input_bucket, validate = False)
while True:
msg = self.queue.get()
if msg == None:
break
source_prefix, target_path = msg
retries = 0
while retries < 3:
try:
retries += 1
k = Key(bucket)
k.key = source_prefix
data = k.get_contents_as_string()
if self.decompress:
fobj = StringIO(data)
with gzip.GzipFile(mode = 'rb', fileobj = fobj) as zobj:
data = zobj.read()
fobj.close()
# Create target folder
mkdirp(os.path.dirname(target_path))
with write(target_path) as f:
f.write(data)
break
except:
print >> sys.stderr, "Failed to download %s to %s" % msg
print_exc(file = sys.stderr)
if retries >= 3:
sys.exit(1)
if self.output_queue != None:
self.output_queue.put(target_path)
s3.close()
def s3get(input_bucket, prefix, output_folder, decompress, compress, region,
aws_cred, nb_workers = cpu_count() * 4):
# Clear output folder if necessary
shutil.rmtree(output_folder, ignore_errors = True)
# Sanitize prefix, we always work on folders here
if prefix != "" and not prefix.endswith('/'):
prefix += '/'
# Create queue of work to do
queue = Queue()
# Start workers
downloaders = []
for i in xrange(0, nb_workers):
downloader = Downloader(queue, None, input_bucket, decompress, compress,
region, aws_cred)
downloaders.append(downloader)
downloader.start()
s3 = s3_connect(region, **aws_cred)
bucket = s3.get_bucket(input_bucket, validate = False)
for k in bucket.list(prefix = prefix):
source_prefix = k.key
rel_prefix = source_prefix[len(prefix):]
target_path = os.path.join(output_folder, *rel_prefix.split('/'))
queue.put((source_prefix, target_path))
# Add end of queue marker for each worker
for i in xrange(0, nb_workers):
queue.put(None)
# Join workers
for downloader in downloaders:
downloader.join()
# If one of the worker failed, we've failed
for downloader in downloaders:
if downloader.exitcode != 0:
return False
return True
def main():
p = ArgumentParser(
description = 'Clone folder tree from s3 while decompressing and/or compressing files',
formatter_class = ArgumentDefaultsHelpFormatter
)
p.add_argument(
"-i", "--input-bucket",
help = "Input bucket to clone from s3",
required = True
)
p.add_argument(
"-p", "--prefix",
help = "Prefix in input bucket",
required = False
)
p.add_argument(
"-o", "--output-folder",
help = "Folder to download files to",
required = True
)
p.add_argument(
"-k", "--aws-key",
help = "AWS Key"
)
p.add_argument(
"-s", "--aws-secret-key",
help = "AWS Secret Key"
)
p.add_argument(
"-z", "--gzip",
help = "gzip compressed output",
action = 'store_true'
)
p.add_argument(
"-d", "--gunzip",
help = "decompress input tree",
action = 'store_true'
)
p.add_argument(
"-r", "--region",
help = "AWS region to connect to",
default = 'us-west-2'
)
p.add_argument(
"-j", "--nb-workers",
help = "Number of parallel workers",
default = "4 x cpu-count"
)
cfg = p.parse_args()
nb_workers = None
try:
nb_workers = int(cfg.nb_workers)
except ValueError:
nb_workers = cpu_count() * 4
cfg = p.parse_args()
aws_cred = {
'aws_access_key_id': cfg.aws_key,
'aws_secret_access_key': cfg.aws_secret_key
}
retval = s3get(cfg.input_bucket, cfg.prefix, cfg.output_folder, cfg.gunzip,
cfg.gzip, cfg.region, aws_cred, nb_workers)
if retval:
print "Successfully downloaded all"
else:
print "Failed download some"
if __name__ == "__main__":
sys.exit(main())

170
dashboard/s3put.py Normal file
View file

@ -0,0 +1,170 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import sys, os, gzip
from cStringIO import StringIO
from utils import mkdirp
from multiprocessing import Process, Queue, cpu_count
from boto.s3 import connect_to_region as s3_connect
from boto.s3.key import Key
from traceback import print_exc
import shutil
class Uploader(Process):
def __init__(self, queue, target_bucket, decompress, compress, region, aws_cred):
super(Uploader, self).__init__()
self.queue = queue
self.target_bucket = target_bucket
self.decompress = decompress
self.compress = compress
self.region = region
self.aws_cred = aws_cred
def run(self):
if self.decompress:
def read(path):
return gzip.open(path, 'r')
else:
def read(path):
return open(path, 'r')
s3 = s3_connect(self.region, **self.aws_cred)
bucket = s3.get_bucket(self.target_bucket, validate = False)
while True:
msg = self.queue.get()
if msg == None:
break
source_file, target_prefix = msg
retries = 0
while retries < 3:
try:
retries += 1
with read(source_file) as f:
data = f.read()
headers = {
'Content-Type': 'application/json'
}
if self.compress:
fobj = StringIO()
with gzip.GzipFile(mode = 'wb', fileobj = fobj) as zobj:
zobj.write(data)
data = fobj.getvalue()
fobj.close()
headers['Content-Encoding'] = 'gzip'
# Put to S3
k = Key(bucket)
k.key = target_prefix
k.set_contents_from_string(data, headers = headers)
break
except:
print >> sys.stderr, "Failed to upload %s to %s" % msg
print_exc(file = sys.stderr)
if retries >= 3:
sys.exit(1)
s3.close()
def s3put(input_folder, target_bucket, prefix, decompress, compress, region,
aws_cred, nb_workers = cpu_count() * 4):
if prefix != "" and not prefix.endswith('/'):
prefix += '/'
# Create queue of work to do
queue = Queue()
# Start workers
uploaders = []
for i in xrange(0, nb_workers):
uploader = Uploader(queue, target_bucket, decompress, compress, region, aws_cred)
uploaders.append(uploader)
uploader.start()
# Walk input_folder
for path, folder, files in os.walk(input_folder):
for f in files:
source_file = os.path.join(path, f)
relpath = os.path.relpath(source_file, input_folder)
queue.put((source_file, prefix + relpath))
# Add end of queue marker for each worker
for i in xrange(0, nb_workers):
queue.put(None)
# Join workers
for uploader in uploaders:
uploader.join()
# If one of the uploaders failed, we've failed
for uploader in uploaders:
if uploader.exitcode != 0:
return False
return True
def main():
p = ArgumentParser(
description = 'Clone folder tree to s3 while decompressing and/or compressing files',
formatter_class = ArgumentDefaultsHelpFormatter
)
p.add_argument(
"-i", "--input-folder",
help = "Input folder to clone to s3",
required = True
)
p.add_argument(
"-o", "--target-bucket",
help = "Bucket to upload files to",
required = True
)
p.add_argument(
"-p", "--prefix",
help = "Prefix in target bucket",
required = False
)
p.add_argument(
"-k", "--aws-key",
help = "AWS Key"
)
p.add_argument(
"-s", "--aws-secret-key",
help = "AWS Secret Key"
)
p.add_argument(
"-z", "--gzip",
help = "gzip compressed output",
action = 'store_true'
)
p.add_argument(
"-d", "--gunzip",
help = "decompress input tree",
action = 'store_true'
)
p.add_argument(
"-r", "--region",
help = "AWS region to connect to",
default = 'us-west-2'
)
p.add_argument(
"-j", "--nb-workers",
help = "Number of parallel workers",
default = "4 x cpu-count"
)
cfg = p.parse_args()
nb_workers = None
try:
nb_workers = int(cfg.nb_workers)
except ValueError:
nb_workers = cpu_count() * 4
cfg = p.parse_args()
aws_cred = {
'aws_access_key_id': cfg.aws_key,
'aws_secret_access_key': cfg.aws_secret_key
}
retval = s3put(cfg.input_folder, cfg.target_bucket, cfg.prefix, cfg.gunzip,
cfg.gzip, cfg.region, aws_cred, nb_workers)
if retval:
print "Successfully uploaded all"
else:
print "Failed to upload some"
if __name__ == "__main__":
sys.exit(main())

724
dashboard/telemetry.js Normal file
View file

@ -0,0 +1,724 @@
(function(exports){
"use strict";
/** Namespace for this module */
var Telemetry = {};
// Data folder from which data will be loaded, initialized by Telemetry.init()
var _data_folder = null;
// List of channel/version, loaded by Telemetry.init()
var _versions = null;
// Dictionary of histogram specifications, loaded by Telemetry.init()
var _specifications = null;
/** Auxiliary function to GET files from _data_folder */
function _get(path, cb) {
// Check that we've been initialized
if(_data_folder === null) {
throw new Error("Telemetry._get: Telemetry module haven't been " +
"initialized, please call Telemetry.init()");
}
// Create path from array, if that's what we're giving
if (path instanceof Array) {
path = path.join("/");
}
// Create HTTP request
var xhr = new XMLHttpRequest();
xhr.onload = function (e) {
if (e.target.status == 200) {
cb.apply(this, [JSON.parse(this.responseText)]);
} else {
console.log("Telemetry._get: Failed loading " + path + " with " +
e.target.status);
}
};
xhr.open("get", _data_folder + "/" + path, true);
xhr.send();
}
/**
* Initialize telemetry module by fetching meta-data from data_folder
* cb() will be invoked when Telemetry module is ready for use.
*/
Telemetry.init = function Telemetry_load(data_folder, cb) {
if (_data_folder !== null) {
throw new Error("Telemetry.init: Telemetry module is initialized!");
}
_data_folder = data_folder;
// Number of files to load
var load_count = 2;
// Count down files loaded
function count_down(){
load_count--;
if (load_count === 0) {
cb();
}
}
// Get list of channels/version in data folder from versions.json
_get("versions.json", function(data) {
_versions = data;
count_down();
});
// Get list of histogram specifications from Histograms.json
_get("Histograms.json", function(data) {
_specifications = data;
count_down();
});
};
/** Get list of channel/version */
Telemetry.versions = function Telemetry_versions() {
if (_data_folder === null) {
throw new Error("Telemetry.versions: Telemetry module isn't initialized!");
}
return _versions;
};
/**
* Request measures available for the given channel/version. Once fetched, the
* callback is invoked as cb(measures, measureInfo) where measures is a list of
* measure ids and measureInfo is a mapping from measure id to kind and
* description, i.e. a JSON object of the following form:
* {
* "A_TELEMETRY_MEASURE_ID": {
* kind: "linear|exponential|flag|enumerated|boolean",
* description: "A human readable description"
* },
* ...
* }
*
* Note, channel/version must be a string from Telemetry.versions()
*/
Telemetry.measures = function Telemetry_measures(channel_version, cb) {
_get([channel_version, "histograms.json"], function(data) {
var measures = [];
var measureInfo = {};
for(var key in data) {
// Add measure id
measures.push(key);
// Find specification
var spec = _specifications[key];
// Hack to provide specification of simple measures
if (spec === undefined) {
spec = {
kind: "exponential",
description: "Histogram of simple measure"
};
}
// Add measure info
measureInfo[key] = {
kind: spec.kind,
description: spec.description
};
}
// Sort measures alphabetically
measures.sort();
// Return measures by callback
cb(measures, measureInfo);
});
};
/**
* Request HistogramEvolution instance over builds for a given channel/version
* and measure, once fetched cb(histogramEvolution) will be invoked with the
* HistogramEvolution instance. The dates in the HistogramEvolution instance
* fetched will be build dates, not telemetry ping submission dates.
* Note, measure must be a valid measure identifier from Telemetry.measures()
*/
Telemetry.loadEvolutionOverBuilds =
function Telemetry_loadEvolutionOverBuilds(channel_version, measure, cb) {
// Unpack measure, if a dictionary from Telemetry.measures was provided
// instead of just a measure id.
if (measure instanceof Object && measure.measure !== undefined) {
measure = measure.measure;
}
// Number of files to load, and what to do when done
var load_count = 2;
var data, filter_tree;
function count_down() {
load_count--;
if (load_count === 0) {
var spec = _specifications[measure];
if (spec === undefined) {
spec = {
kind: "exponential",
description: "Histogram of simple measure"
};
}
cb(
new Telemetry.HistogramEvolution(
[measure],
data,
filter_tree,
spec
)
);
}
}
// Load data for measure
_get([channel_version, measure + ".json"], function(json) {
data = json;
count_down();
});
// Load filter data
_get([channel_version, "filter.json"], function(json) {
filter_tree = json;
count_down();
});
};
/** Place holder for when bug 916217 is implemented */
Telemetry.loadEvolutionOverTime =
function Telemetry_loadEvolutionOverTime(channel_version, measure, cb) {
throw new Error(
"Telemetry.loadEvolutionOverTime() is not implemented yet, " +
"server-side data aggregation is still missing! (See bug 916217)"
);
};
/** Auxiliary function to find all filter_ids in a filter_tree */
function _listFilterIds(filter_tree){
var ids = [];
function visitFilterNode(filter_node){
ids.push(filter_node._id);
for (var key in filter_node) {
if (key != "name" && key != "_id") {
visitFilterNode(filter_node[key]);
}
}
}
visitFilterNode(filter_tree);
return ids;
}
// Offset relative to length for special elements in arrays of raw data
var DataOffsets = {
SUM: -7, // The following keys are documented in StorageFormat.md
LOG_SUM: -6, // See the docs/ folder of the telemetry-server
LOG_SUM_SQ: -5, // Repository. They are essentially part of the
SUM_SQ_LO: -4, // validated telemetry histogram format
SUM_SQ_HI: -3,
SUBMISSIONS: -2, // Added in dashboard.py
FILTER_ID: -1 // Added in mr2disk.py
};
/** Representation of histogram under possible filter application */
Telemetry.Histogram = (function(){
/**
* Auxiliary function to aggregate values of index from histogram dataset
*/
function _aggregate(index, histogram) {
if (histogram._aggregated === undefined) {
histogram._aggregated = [];
}
var sum = histogram._aggregated[index];
if (sum === undefined) {
// Cache the list of filter ids
if (histogram._filterIds === undefined) {
histogram._filterIds = _listFilterIds(histogram._filter_tree);
}
// Aggregate index as sum over histogram
sum = 0;
var n = histogram._dataset.length;
for(var i = 0; i < n; i++) {
var data_array = histogram._dataset[i];
// Check if filter_id is filtered
var filter_id_offset = data_array.length + DataOffsets.FILTER_ID;
if (histogram._filterIds.indexOf(data_array[filter_id_offset]) != -1) {
sum += data_array[index >= 0 ? index : data_array.length + index];
}
}
histogram._aggregated[index] = sum;
}
return sum;
}
/** Auxiliary function for estimating the end of the last bucket */
function _estimateLastBucketEnd(histogram) {
// As there is no next bucket for the last bucket, we sometimes need to
// estimate one. First we estimate the sum of all data-points in buckets
// below the last bucket
var sum_before_last = 0;
var n = histogram._buckets.length;
for (var i = 0; i < n - 1; i++) {
var bucket_center = (histogram._buckets[i+1] - histogram._buckets[i]) / 2 +
histogram._buckets[i];
sum_before_last += _aggregate(i, histogram) * bucket_center;
}
// We estimate the sum of data-points in the last bucket by subtracting the
// estimate of sum of data-points before the last bucket...
var sum_last = _aggregate(DataOffsets.SUM, histogram) - sum_before_last;
// We estimate the mean of the last bucket as follows
var last_bucket_mean = sum_last / _aggregate(n - 1, histogram);
// We find the start of the last bucket
var last_bucket_start = histogram._buckets[n - 1];
// Now estimate the last bucket end
return last_bucket_start + (last_bucket_mean - last_bucket_start) * 2;
}
/**
* Create a new histogram, where
* - filter_path is a list of [name, date-range, filter1, filter2...]
* - buckets is a list of bucket start values,
* - dataset is a mapping from filter ids to arrays of raw data
* - filter_tree is a node in filter tree structure, and
* - spec is the histogram specification.
*/
function Histogram(filter_path, buckets, dataset, filter_tree, spec) {
this._filter_path = filter_path;
this._buckets = buckets;
this._dataset = dataset;
this._filter_tree = filter_tree;
this._spec = spec;
}
/** Get new histogram representation of this histogram filter for option */
Histogram.prototype.filter = function Histogram_filter(option) {
if (!(this._filter_tree[option] instanceof Object)) {
throw new Error("filter option: \"" + option +"\" is not available");
}
return new Histogram(
this._filter_path.concat(option),
this._buckets,
this._dataset,
this._filter_tree[option],
this._spec
);
};
/** Name of filter available, null if none */
Histogram.prototype.filterName = function Histogram_filterName() {
return this._filter_tree.name || null;
};
/** List of options available for current filter */
Histogram.prototype.filterOptions = function Histogram_filterOptions() {
var options = [];
for (var key in this._filter_tree) {
if (key != "name" && key != "_id") {
options.push(key);
}
}
return options.sort();
};
/** Get the histogram kind */
Histogram.prototype.kind = function Histogram_kind() {
return this._spec.kind;
};
/** Get a description of the measure in this histogram */
Histogram.prototype.description = function Histogram_description() {
return this._spec.description;
};
/** Get number of data points in this histogram */
Histogram.prototype.count = function Histogram_count() {
var count = 0;
var n = this._buckets.length;
for(var i = 0; i < n; i++) {
count += _aggregate(i, this);
}
return count;
};
/** Number of telemetry pings aggregated in this histogram */
Histogram.prototype.submissions = function Histogram_submissions() {
return _aggregate(DataOffsets.SUBMISSIONS, this);
};
/** Get the mean of all data points in this histogram, null if N/A */
Histogram.prototype.mean = function Histogram_mean() {
// if (this.kind() != "linear" && this.kind() != "exponential") {
// throw new Error("Histogram.geometricMean() is only available for " +
// "linear and exponential histograms");
// }
var sum = _aggregate(DataOffsets.SUM, this);
return sum / this.count();
};
/** Get the geometric mean of all data points in this histogram, null if N/A */
Histogram.prototype.geometricMean = function Histogram_geometricMean() {
if (this.kind() != "exponential") {
throw new Error("Histogram.geometricMean() is only available for " +
"exponential histograms");
}
var log_sum = _aggregate(DataOffsets.LOG_SUM, this);
return log_sum / this.count();
};
/**
* Get the standard deviation over all data points in this histogram,
* null if not applicable as this is only available for some histograms.
*/
Histogram.prototype.standardDeviation = function Histogram_standardDeviation() {
if (this.kind() != "linear") {
throw new Error("Histogram.standardDeviation() is only available for " +
"linear histograms");
}
var sum = new Big(_aggregate(DataOffsets.SUM, this));
var count = new Big(this.count());
var sum_sq_hi = new Big(_aggregate(DataOffsets.SUM_SQ_HI, this));
var sum_sq_lo = new Big(_aggregate(DataOffsets.SUM_SQ_LO, this));
var sum_sq = sum_sq_lo.plus(sum_sq_hi.times(new Big(2).pow(32)));
// std. dev. = sqrt(count * sum_squares - sum * sum) / count
// http://en.wikipedia.org/wiki/Standard_deviation#Rapid_calculation_methods
return count.times(sum_sq).minus(sum.pow(2)).divide(count).toFixed(3);
};
/**
* Get the geometric standard deviation over all data points in this histogram,
* null if not applicable as this is only available for some histograms.
*/
Histogram.prototype.geometricStandardDeviation =
function Histogram_geometricStandardDeviation() {
if (this.kind() != 'exponential') {
throw new Error(
"Histogram.geometricStandardDeviation() is only " +
"available for exponential histograms"
);
}
var count = this.count();
var log_sum = _aggregate(DataOffsets.LOG_SUM, this);
var log_sum_sq = _aggregate(DataOffsets.LOG_SUM_SQ, this);
// Deduced from http://en.wikipedia.org/wiki/Geometric_standard_deviation
// using wxmaxima... who knows maybe it's correct...
return Math.exp(
Math.sqrt(
(
count * Math.pow(Math.log(log_sum / count), 2) +
log_sum_sq -
2 * log_sum * Math.log(log_sum / count)
) / count
)
);
};
/** Estimate value of a percentile */
Histogram.prototype.percentile = function Histogram_percentile(percent) {
// if (this.kind() != "linear" && this.kind() != "exponential") {
// throw new Error("Histogram.percentile() is only available for linear " +
// "and exponential histograms");
// }
var frac = percent / 100;
var count = this.count();
// Count until we have the bucket containing the percentile
var to_count = count * frac;
var i, n = this._buckets.length;
for (i = 0; i < n; i++) {
var nb_points = _aggregate(i, this);
if (to_count - nb_points <= 0) {
break;
}
to_count -= nb_points;
}
// Bucket start and end
var start = this._buckets[i];
var end = this._buckets[i+1];
if(i >= n - 1) {
// If we're at the end bucket, then there's no next bucket, hence, no upper
// bound, so we estimate one.
end = _estimateLastBucketEnd(this);
}
// Fraction indicating where in bucket i the percentile is located
var bucket_fraction = to_count / (_aggregate(i, this) + 1);
if (this.kind() == "exponential") {
// Interpolate median assuming an exponential distribution
return start + Math.exp(Math.log(end - start) * bucket_fraction);
}
// Interpolate median assuming a uniform distribution between start and end.
return start + (end - start) * bucket_fraction;
};
/** Estimate the median, returns null, if not applicable */
Histogram.prototype.median = function Histogram_median() {
return this.percentile(50);
};
/**
* Invoke cb(count, start, end, index) for every bucket in this histogram, the
* cb is invoked for each bucket ordered from low to high.
* Note, if context is provided it will be given as this parameter to cb().
*/
Histogram.prototype.each = function Histogram_each(cb, context) {
// Set context if none is provided
if (context === undefined) {
context = this;
}
// For each bucket
var n = this._buckets.length;
for(var i = 0; i < n; i++) {
// Find count, start and end of bucket
var count = _aggregate(i, this),
start = this._buckets[i],
end = this._buckets[i+1];
// If we're at the last bucket, then there's no next upper bound so we
// estimate one
if (i >= n - 1) {
end = _estimateLastBucketEnd(this);
}
// Invoke callback as promised
cb.call(context, count, start, end, i);
}
};
/**
* Returns a bucket ordered array of results from invocation of
* cb(count, start, end, index) for each bucket, ordered low to high.
* Note, if context is provided it will be given as this parameter to cb().
*/
Histogram.prototype.map = function Histogram_map(cb, context) {
// Set context if none is provided
if (context === undefined) {
context = this;
}
// Array of return values
var results = [];
// For each, invoke cb and push the result
this.each(function(count, start, end, index) {
results.push(cb.call(context, count, start, end, index));
});
// Return values from cb
return results;
};
return Histogram;
})(); /* Histogram */
/** Representation of histogram changes over time */
Telemetry.HistogramEvolution = (function(){
/** Auxiliary function to parse a date string from JSON data format */
function _parseDateString(d) {
return new Date(d.substr(0,4) + "/" + d.substr(4,2) + "/"+ d.substr(6,2));
}
/**
* Create a histogram evolution, where
* - filter_path is a list of [name, date-range, filter1, filter2...]
* - data is the JSON data loaded from file,
* - filter_tree is the filter_tree root, and
* - spec is the histogram specification.
*/
function HistogramEvolution(filter_path, data, filter_tree, spec) {
this._filter_path = filter_path;
this._data = data;
this._filter_tree = filter_tree;
this._spec = spec;
}
/** Get the histogram kind */
HistogramEvolution.prototype.kind = function HistogramEvolution_kind() {
return this._spec.kind;
};
/** Get a description of the measure in this histogram */
HistogramEvolution.prototype.description =
function HistogramEvolution_description() {
return this._spec.description;
};
/** Get new HistogramEvolution representation filtered with option */
HistogramEvolution.prototype.filter = function histogramEvolution_filter(opt) {
if (!(this._filter_tree[opt] instanceof Object)) {
throw new Error("filter option: \"" + opt +"\" is not available");
}
return new HistogramEvolution(
this._filter_path.concat(opt),
this._data,
this._filter_tree[opt],
this._spec
);
};
/** Name of filter available, null if none */
HistogramEvolution.prototype.filterName =
function HistogramEvolution_filterName() {
return this._filter_tree.name || null;
};
/** List of options available for current filter */
HistogramEvolution.prototype.filterOptions =
function HistogramEvolution_filterOptions() {
var options = [];
for (var key in this._filter_tree) {
if (key != "name" && key != "_id") {
options.push(key);
}
}
return options.sort();
};
/**
* Get merged histogram for the interval [start; end], ie. start and end dates
* are inclusive. Omitting start and/or end will give you the merged histogram
* for the open-ended interval.
*/
HistogramEvolution.prototype.range =
function HistogramEvolution_range(start, end) {
// Construct a dataset by merging all datasets/histograms in the range
var merged_dataset = [];
// List of filter_ids we care about, instead of just merging all filters
var filter_ids = _listFilterIds(this._filter_tree);
// For each date we have to merge the filter_ids into merged_dataset
for (var datekey in this._data.values) {
// Check that date is between start and end (if start and end is defined)
var date = _parseDateString(datekey);
if((!start || start <= date) && (!end || date <= end)) {
// Find dataset of this datekey, merge filter_ids for this dataset into
// merged_dataset.
var dataset = this._data.values[datekey];
// Copy all data arrays over... we'll filter and aggregate later
merged_dataset = merged_dataset.concat(dataset);
}
}
// Create merged histogram
return new Telemetry.Histogram(
this._filter_path,
this._data.buckets,
merged_dataset,
this._filter_tree,
this._spec
);
};
/** Get the list of dates in the evolution sorted by date */
HistogramEvolution.prototype.dates = function HistogramEvolution_dates() {
var dates = [];
for(var date in this._data.values) {
dates.push(_parseDateString(date));
}
return dates.sort();
};
/**
* Invoke cb(date, histogram, index) with each date, histogram pair, ordered by
* date. Note, if provided cb() will be invoked with ctx as this argument.
*/
HistogramEvolution.prototype.each = function HistogramEvolution_each(cb, ctx) {
// Set this as context if none is provided
if (ctx === undefined) {
ctx = this;
}
// Find and sort all date strings
var dates = [];
for(var date in this._data.values) {
dates.push(date);
}
dates.sort();
// Find filter ids
var filterIds = _listFilterIds(this._filter_tree);
// Auxiliary function to filter data arrays by filter_id
function filterByFilterId(data_array) {
var filterId = data_array[data_array.length + DataOffsets.FILTER_ID];
return filterIds.indexOf(filterId) != -1;
}
// Pair index, this is not equal to i as we may have filtered something out
var index = 0;
// Now invoke cb with each histogram
var n = dates.length;
for(var i = 0; i < n; i++) {
// Get dataset for date
var dataset = this._data.values[dates[i]];
// Filter for data_arrays with relevant filterId
dataset = dataset.filter(filterByFilterId);
// Skip this date if there is no data_array left after filtering was applied
if (dataset.length === 0) {
continue;
}
// Invoke callback with date and histogram
cb.call(
ctx,
_parseDateString(dates[i]),
new Telemetry.Histogram(
this._filter_path,
this._data.buckets,
dataset,
this._filter_tree,
this._spec
),
index++
);
}
};
/**
* Returns a date ordered array of results from invocation of
* cb(date, histogram, index) for each date, histogram pair.
* Note, if provided cb() will be invoked with ctx as this argument.
*/
HistogramEvolution.prototype.map = function HistogramEvolution_map(cb, ctx) {
// Set this as context if none is provided
if (ctx === undefined) {
ctx = this;
}
// Return value array
var results = [];
// For each date, histogram pair invoke cb() and add result to results
this.each(function(date, histogram, index) {
results.push(cb.call(ctx, date, histogram, index));
});
// Return array of computed values
return results;
};
return HistogramEvolution;
})(); /* HistogramEvolution */
exports.Telemetry = Telemetry;
return exports.Telemetry;
})(this);

9
dashboard/utils.py Normal file
View file

@ -0,0 +1,9 @@
import os, errno
def mkdirp(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST or not os.path.isdir(path):
raise

View file

@ -39,8 +39,24 @@
<script src="js/jquery.telemetry.js"></script>
<script src="js/dashboard.js"></script>
<script>
Telemetry.init("https://s3.amazonaws.com/telemetry-dashboard-data/v1", function(){
Dashboard.init();
Telemetry.init(function(){
Dashboard.init();
/*$("#plot-area").html("<pre id='log' style='padding: 10px'></pre>");
function print(msg) { $("#log").append(msg + "\n"); }
print("Loaded");
var count = 0;
Telemetry.measures('nightly/27', function(measures) {
for(var measure in measures) {
(function(measure){
Telemetry.loadEvolutionOverBuilds('nightly/27', measure,
function(hgram) {
print("Loaded: " + measure);
print(hgram.());
});
})(measure);
}
});*/
});
</script>
<html>

View file

@ -49,9 +49,8 @@ $.widget("telemetry.histogramfilter", {
defaultVersion: null,
/**
* Default measure, or function that takes a list of measure ids and a
* measureInfo object as created by Telemetry.measures() and returns
* a measure id from that list.
* Default measure, or function that takes a JSON object of measure ids as
* created by Telemetry.measures() and returns a measure id.
*/
defaultMeasure: null,
@ -319,7 +318,7 @@ $.widget("telemetry.histogramfilter", {
this._triggerChange();
// Load measures for selected version
Telemetry.measures(version, $.proxy(function(measures, measureInfo) {
Telemetry.measures(version, $.proxy(function(measures) {
// Abort if another version have been selected while we loaded
if (this._versionSelector.val() != version) {
return;
@ -328,22 +327,23 @@ $.widget("telemetry.histogramfilter", {
// If there is a list of allowed histogram kinds, we limit the list of
// measures under consideration to these measures
if (this.options.allowedHistogramKinds !== null) {
measures = measures.filter(function(m) {
// Lookup measure kind
var kind = measureInfo[m].kind;
// Check if kind is allowed
return this.options.allowedHistogramKinds.indexOf(kind) != -1;
}, this);
var allowedMeasures = {};
for(var m in measures) {
var kind = measures[m].kind;
if (this.options.allowedHistogramKinds.indexOf(kind) != -1) {
allowedMeasures[m] = measures[m];
}
}
}
// Choose default measure if desired isn't available
if(measures.indexOf(measure) == -1) {
measure = this._defaultMeasure(measures, measureInfo);
if(measures[measure] === undefined) {
measure = this._defaultMeasure(measures);
}
// Populate measures selector while ignoring changes in event handlers
this._ignoreChanges = true;
this._populateSelect(measures, this._measureSelector);
this._populateSelect(Object.keys(measures).sort(), this._measureSelector);
this._ignoreChanges = false;
// Restore things at measure level
@ -472,19 +472,19 @@ $.widget("telemetry.histogramfilter", {
* a function.
*/
_defaultMeasure:
function histogramfilter__defaultMeasure(measures, measureInfo) {
function histogramfilter__defaultMeasure(measures) {
// Get default measure
var measure = this.options.defaultMeasure;
// If function, use it to choose a measure
if (measure instanceof Function) {
measure = measure.call(this.element, measures, measureInfo);
measure = measure.call(this.element, measures);
}
// Validate selected measure
if (measures.indexOf(measure) == -1) {
if (measures[measure] === undefined) {
// Otherwise fall back to the first measure available
measure = measures[0];
measure = Object.keys(measures).sort()[0];
}
return measure;

View file

@ -5,19 +5,20 @@
/** Namespace for this module */
var Telemetry = {};
// Data folder from which data will be loaded, initialized by Telemetry.init()
var _data_folder = null;
// Base data folder from which data will be loaded; Telemetry.init() appends the
// current snapshot folder to this path
var _data_folder = 'https://s3-us-west-2.amazonaws.com/telemetry-dashboard/v3';
// Boolean tracks if we've initialized
var _initialized = false;
// List of channel/version, loaded by Telemetry.init()
var _versions = null;
// Dictionary of histogram specifications, loaded by Telemetry.init()
var _specifications = null;
/** Auxiliary function to GET files from _data_folder */
function _get(path, cb) {
// Check that we've been initialized
if(_data_folder === null) {
if(!_initialized && path != "latest-current.json") {
throw new Error("Telemetry._get: Telemetry module haven't been " +
"initialized, please call Telemetry.init()");
}
@ -42,36 +43,16 @@ function _get(path, cb) {
}
/**
* Initialize the telemetry module by fetching meta-data from data_folder
* Initialize the telemetry module by fetching meta-data from the server
* cb() will be invoked when the Telemetry module is ready for use.
*/
Telemetry.init = function Telemetry_load(data_folder, cb) {
if (_data_folder !== null) {
throw new Error("Telemetry.init: Telemetry module is initialized!");
}
_data_folder = data_folder;
// Number of files to load
var load_count = 2;
// Count down files loaded
function count_down(){
load_count--;
if (load_count === 0) {
cb();
}
}
// Get list of channels/version in data folder from versions.json
_get("versions.json", function(data) {
_versions = data;
count_down();
});
// Get list of histogram specifications from histogram_descriptions.json
_get("Histograms.json", function(data) {
_specifications = data;
count_down();
Telemetry.init = function Telemetry_load(cb) {
// Get the channel/version list and the most recent data folder from latest-current.json
_get("latest-current.json", function(data) {
_data_folder += '/current/' + data.current;
_versions = data.versions.sort();
_initialized = true;
cb();
});
};
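
Under the new signature, initialization takes only a callback; a minimal sketch (the number of versions printed depends on what latest-current.json actually lists):

    Telemetry.init(function() {
      // Once initialized, the sorted channel/version list is available synchronously
      var versions = Telemetry.versions();
      console.log("Loaded " + versions.length + " channel/version combinations");
    });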
@ -85,9 +66,8 @@ Telemetry.versions = function Telemetry_versions() {
/**
* Request the measures available for the given channel/version. Once fetched, the
* callback will be invoked as cb(measures, measureInfo) where measures is a list of
* measure ids and measureInfo is a mapping from measure id to kind and
* description, i.e. a JSON object of the following form:
* callback will be invoked as cb(measures) where measures is a dictionary of the
* following form:
* {
* "A_TELEMETRY_MEASURE_ID": {
* kind: "linear|exponential|flag|enumerated|boolean",
@ -100,37 +80,19 @@ Telemetry.versions = function Telemetry_versions() {
*/
Telemetry.measures = function Telemetry_measures(channel_version, cb) {
_get([channel_version, "histograms.json"], function(data) {
var measures = [];
var measureInfo = {};
for(var key in data) {
var measures = {};
// For each measure fetched
for(var measure in data) {
// Add measure id
measures.push(key);
// Find specification
var spec = _specifications[key];
// Hack to provide specification of simple measures
if (spec === undefined) {
spec = {
kind: "exponential",
description: "Histogram of simple measure"
};
measures[measure] = {
kind: data[measure].kind,
description: data[measure].description
}
// Add measure info
measureInfo[key] = {
kind: spec.kind,
description: spec.description
};
}
// Sort measures alphabetically
measures.sort();
// Return measures by callback
cb(measures, measureInfo);
cb(measures);
});
};
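
A caller iterating the new dictionary form might look roughly like this ("nightly/27" is only an illustrative channel/version):

    Telemetry.measures("nightly/27", function(measures) {
      for (var id in measures) {
        // Each entry carries the histogram kind and a human readable description
        console.log(id + " (" + measures[id].kind + "): " + measures[id].description);
      }
    });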
@ -143,54 +105,81 @@ Telemetry.measures = function Telemetry_measures(channel_version, cb) {
*/
Telemetry.loadEvolutionOverBuilds =
function Telemetry_loadEvolutionOverBuilds(channel_version, measure, cb) {
// Unpack measure, if a dictionary from Telemetry.measures was provided
// instead of just a measure id.
if (measure instanceof Object && measure.measure !== undefined) {
measure = measure.measure;
}
// Number of files to load, and what to do when done
var load_count = 2;
var data, filter_tree;
var load_count = 3;
var data, filter_tree, specifications;
function count_down() {
load_count--;
if (load_count === 0) {
var spec = _specifications[measure];
if (spec === undefined) {
spec = {
kind: "exponential",
description: "Histogram of simple measure"
};
}
cb(
new Telemetry.HistogramEvolution(
measure,
[measure],
data,
filter_tree,
spec
specifications[measure]
)
);
}
}
// Load data for measure
_get([channel_version, measure + ".json"], function(json) {
_get([channel_version, measure + "-by-build-date.json"], function(json) {
data = json;
count_down();
});
// Load filter data
_get([channel_version, "filter.json"], function(json) {
_get([channel_version, "filter-tree.json"], function(json) {
filter_tree = json;
count_down();
});
// Load histogram specifications
_get([channel_version, "histograms.json"], function(json) {
specifications = json;
count_down();
});
};
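
A sketch of consuming the result; the identifiers are made up, and range() with no arguments aggregates across all build dates, as shown in the code further below:

    Telemetry.loadEvolutionOverBuilds("nightly/27", "CYCLE_COLLECTOR",
      function(evolution) {
        // Aggregate all build dates into a single histogram and inspect it
        var hgram = evolution.range();
        console.log(hgram.measure() + " is a " + hgram.kind() + " histogram");
        console.log("Filter options: " + hgram.filterOptions().join(", "));
      });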
/** Place holder for when bug 916217 is implemented */
Telemetry.loadEvolutionOverTime =
/**
* Request HistogramEvolution instance over time for a given channel/version
* and measure, once fetched cb(histogramEvolution) will be invoked with the
* HistogramEvolution instance. The dates in the HistogramEvolution instance
* fetched will be telemetry ping submission dates.
* Note, measure must be a valid measure identifier from Telemetry.measures()
*/
Telemetry.loadEvolutionOverTime =
function Telemetry_loadEvolutionOverTime(channel_version, measure, cb) {
throw new Error(
"Telemetry.loadEvolutionOverTime() is not implemented yet, " +
"server-side data aggregation is still missing! (See bug 916217)"
);
// Number of files to load, and what to do when done
var load_count = 3;
var data, filter_tree, specifications;
function count_down() {
load_count--;
if (load_count === 0) {
cb(
new Telemetry.HistogramEvolution(
measure,
[measure],
data,
filter_tree,
specifications[measure]
)
);
}
}
// Load data for measure
_get([channel_version, measure + "-by-submission-date.json"], function(json) {
data = json;
count_down();
});
// Load filter data
_get([channel_version, "filter-tree.json"], function(json) {
filter_tree = json;
count_down();
});
// Load histogram specifications
_get([channel_version, "histograms.json"], function(json) {
specifications = json;
count_down();
});
};
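
Usage mirrors loadEvolutionOverBuilds, except that the dates returned are submission dates; again the identifiers are purely illustrative:

    Telemetry.loadEvolutionOverTime("nightly/27", "CYCLE_COLLECTOR",
      function(evolution) {
        // dates() yields telemetry ping submission dates, sorted ascending
        console.log(evolution.dates());
      });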
/** Auxiliary function to find all filter_ids in a filter_tree */
@ -216,7 +205,7 @@ var DataOffsets = {
SUM_SQ_LO: -4, // validated telemetry histogram format
SUM_SQ_HI: -3,
SUBMISSIONS: -2, // Added in dashboard.py
FILTER_ID: -1 // Added in mr2disk.py
FILTER_ID: -1 // Added in results2disk.py
};
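
These offsets index from the end of each raw data array, so internal code reading the raw format applies them roughly as below; dataArray stands for one entry of a dataset and is not part of the public API:

    // Negative offsets are relative to the array length
    var submissions = dataArray[dataArray.length + DataOffsets.SUBMISSIONS];
    var filterId    = dataArray[dataArray.length + DataOffsets.FILTER_ID];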
/** Representation of histogram under possible filter application */
@ -277,13 +266,15 @@ function _estimateLastBucketEnd(histogram) {
/**
* Create a new histogram, where
* - measure is the name of the histogram,
* - filter_path is a list of [name, date-range, filter1, filter2...]
* - buckets is a list of bucket start values,
* - dataset is a mapping from filter ids to arrays of raw data
* - filter_tree is a node in filter tree structure, and
* - spec is the histogram specification.
*/
function Histogram(filter_path, buckets, dataset, filter_tree, spec) {
function Histogram(measure, filter_path, buckets, dataset, filter_tree, spec) {
this._measure = measure;
this._filter_path = filter_path;
this._buckets = buckets;
this._dataset = dataset;
@ -297,6 +288,7 @@ Histogram.prototype.filter = function Histogram_filter(option) {
throw new Error("filter option: \"" + option +"\" is not available");
}
return new Histogram(
this._measure,
this._filter_path.concat(option),
this._buckets,
this._dataset,
@ -321,6 +313,11 @@ Histogram.prototype.filterOptions = function Histogram_filterOptions() {
return options.sort();
};
/** Get the histogram measure */
Histogram.prototype.measure = function Histogram_measure() {
return this._measure;
};
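
Filtering is non-destructive; each filter() call returns a new Histogram narrowed by one more option. A sketch, assuming hgram is a Histogram instance and the option strings are hypothetical:

    var options = hgram.filterOptions();          // e.g. ["Fennec", "Firefox"]
    if (options.indexOf("Firefox") != -1) {
      var firefoxOnly = hgram.filter("Firefox");  // hgram itself is unchanged
      console.log(firefoxOnly.measure(), firefoxOnly.filterOptions());
    }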
/** Get the histogram kind */
Histogram.prototype.kind = function Histogram_kind() {
return this._spec.kind;
@ -528,20 +525,78 @@ function _parseDateString(d) {
return new Date(d.substr(0,4) + "/" + d.substr(4,2) + "/"+ d.substr(6,2));
}
/**
* Auxiliary function to compute all bucket ends from a specification.
* This returns a list [b0, b1, ..., bn] where b0 is the separator value between
* entries in bucket index 0 and bucket index 1, such that all values less than
* b0 were counted in bucket 0 and greater values in bucket 1.
*/
function _computeBuckets(spec){
// Find bounds from specification
var low = 1, high, nbuckets;
if(spec.kind == 'boolean' || spec.kind == 'flag') {
high = 2;
nbuckets = 3;
} else if (spec.kind == 'enumerated') {
high = eval(spec.n_values);
nbuckets = eval(spec.n_values) + 1;
} else if (spec.kind == 'linear' || spec.kind == 'exponential') {
low = eval(spec.low) || 1;
high = eval(spec.high);
nbuckets = eval(spec.n_buckets);
}
// Compute buckets
var buckets = null;
if(spec.kind == 'exponential') {
// Exponential buckets are a special case
var log_max = Math.log(high);
buckets = [0, low];
var current = low;
for(var i = 2; i < nbuckets; i++) {
var log_current = Math.log(current);
var log_ratio = (log_max - log_current) / (nbuckets - i);
var log_next = log_current + log_ratio;
var next_value = Math.floor(Math.exp(log_next) + 0.5);
if (next_value > current) {
current = next_value;
} else {
current = current + 1;
}
buckets.push(current);
}
} else {
// Linear buckets are computed as follows
buckets = [0];
for(var i = 1; i < nbuckets; i++) {
var range = (low * (nbuckets - 1 - i) + high * (i - 1));
buckets.push(Math.floor(range / (nbuckets - 2) + 0.5));
}
}
return buckets;
}
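
For instance, feeding a small linear specification through this helper (the values are chosen purely for illustration) gives the following bucket ends; the jump from 4 to 6 comes from the rounding step:

    // Hypothetical spec; real specifications come from histograms.json
    var buckets = _computeBuckets({kind: "linear", low: 1, high: 10, n_buckets: 10});
    // buckets is now [0, 1, 2, 3, 4, 6, 7, 8, 9, 10]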
/**
* Create a histogram evolution, where
* - measure is the name of this histogram,
* - filter_path is a list of [name, date-range, filter1, filter2...]
* - data is the JSON data loaded from file,
* - filter_tree is the filter_tree root, and
* - spec is the histogram specification.
*/
function HistogramEvolution(filter_path, data, filter_tree, spec) {
function HistogramEvolution(measure, filter_path, data, filter_tree, spec) {
this._measure = measure;
this._filter_path = filter_path;
this._data = data;
this._filter_tree = filter_tree;
this._spec = spec;
this._buckets = _computeBuckets(spec);
}
/** Get the histogram measure */
HistogramEvolution.prototype.measure = function HistogramEvolution_measure() {
return this._measure;
};
/** Get the histogram kind */
HistogramEvolution.prototype.kind = function HistogramEvolution_kind() {
return this._spec.kind;
@ -559,6 +614,7 @@ HistogramEvolution.prototype.filter = function histogramEvolution_filter(opt) {
throw new Error("filter option: \"" + opt +"\" is not available");
}
return new HistogramEvolution(
this._measure,
this._filter_path.concat(opt),
this._data,
this._filter_tree[opt],
@ -598,7 +654,7 @@ HistogramEvolution.prototype.range =
var filter_ids = _listFilterIds(this._filter_tree);
// For each date we have to merge the filter_ids into merged_dataset
for (var datekey in this._data.values) {
for (var datekey in this._data) {
// Check that the date is between start and end (if start and end are defined)
var date = _parseDateString(datekey);
@ -606,7 +662,7 @@ HistogramEvolution.prototype.range =
// Find dataset of this datekey, merge filter_ids for this dataset into
// merged_dataset.
var dataset = this._data.values[datekey];
var dataset = this._data[datekey];
// Copy all data arrays over... we'll filter and aggregate later
merged_dataset = merged_dataset.concat(dataset);
@ -615,8 +671,9 @@ HistogramEvolution.prototype.range =
// Create merged histogram
return new Telemetry.Histogram(
this._measure,
this._filter_path,
this._data.buckets,
this._buckets,
merged_dataset,
this._filter_tree,
this._spec
@ -626,7 +683,7 @@ HistogramEvolution.prototype.range =
/** Get the list of dates in the evolution sorted by date */
HistogramEvolution.prototype.dates = function HistogramEvolution_dates() {
var dates = [];
for(var date in this._data.values) {
for(var date in this._data) {
dates.push(_parseDateString(date));
}
return dates.sort();
@ -644,7 +701,7 @@ HistogramEvolution.prototype.each = function HistogramEvolution_each(cb, ctx) {
// Find and sort all date strings
var dates = [];
for(var date in this._data.values) {
for(var date in this._data) {
dates.push(date);
}
dates.sort();
@ -665,7 +722,7 @@ HistogramEvolution.prototype.each = function HistogramEvolution_each(cb, ctx) {
var n = dates.length;
for(var i = 0; i < n; i++) {
// Get dataset for date
var dataset = this._data.values[dates[i]];
var dataset = this._data[dates[i]];
// Filter for data_arrays with relevant filterId
dataset = dataset.filter(filterByFilterId);
@ -680,8 +737,9 @@ HistogramEvolution.prototype.each = function HistogramEvolution_each(cb, ctx) {
ctx,
_parseDateString(dates[i]),
new Telemetry.Histogram(
this._measure,
this._filter_path,
this._data.buckets,
this._buckets,
dataset,
this._filter_tree,
this._spec

View file

@ -16,8 +16,11 @@ setup(
'processor = dashboard.analysis:DashboardProcessor'
],
'console_scripts': [
'result2disk = dashboard.result2disk:main',
'aggregate_results = dashboard.aggregate_results:main'
'results2disk = dashboard.results2disk:main',
'aggregator = dashboard.aggregator:main',
'gzipclone = dashboard.gzipclone:main',
's3put = dashboard.s3put:main',
's3get = dashboard.s3get:main'
]
}
)