diff --git a/Formats.mkd b/Formats.mkd
index d498157..b34807f 100644
--- a/Formats.mkd
+++ b/Formats.mkd
@@ -6,20 +6,16 @@
 External users should include `telemetry.js` and consume data through this
 interface. Reading the raw data is hard and these data format may change, but
 the javascript interface is designed to be reasonably stable.
-
-
-
-
-
-Processor Output Format
------------------------
+Analysis Tasks Output Format
+----------------------------
+The analysis task writes a single file to disk called `result.txt`.
 
 /my/dim/../ JSON
 
-JSON:
-  revision:
-  buildid:
-  histogram:
+  filterPath:
+    revision:
+    buildid:
+    histogram:
@@ -56,28 +52,28 @@
 v2/
   revisions.json
 
-Web Facing Format
+Web Facing JSON Format:
 -----------------
 
 // MEASURE.json
-{
-  : [
-    bucket0,
-    bucket1,
-    ...,
-    bucketN,
-    sum,              # -1, if missing
-    log_sum,          # -1, if missing
-    log_sum_squares,  # -1, if missing
-    sum_squares_lo,   # -1, if missing
-    sum_squares_hi,   # -1, if missing
-    count
-  ],
-  ...
-}
+[
+  [
+    bucket0,
+    bucket1,
+    ...,
+    bucketN,
+    sum,              # -1, if missing
+    log_sum,          # -1, if missing
+    log_sum_squares,  # -1, if missing
+    sum_squares_lo,   # -1, if missing
+    sum_squares_hi,   # -1, if missing
+    count,
+    filter_id,
+  ],
+]
 
 filters.json
 {
diff --git a/dashboard/aggregator.py b/dashboard/aggregator.py
index dea3a02..20def68 100644
--- a/dashboard/aggregator.py
+++ b/dashboard/aggregator.py
@@ -5,7 +5,7 @@ from boto.sqs import connect_to_region as sqs_connect
 from boto.s3 import connect_to_region as s3_connect
 from boto.s3.key import Key
 from boto.sqs.jsonmessage import JSONMessage
-from multiprocessing import Process, Queue, cpu_count
+from multiprocessing import Queue, cpu_count
 from time import sleep
 import os, sys, shutil, gzip
 import json
@@ -16,6 +16,7 @@ from utils import mkdirp
 from results2disk import results2disk
 from s3put import s3put
 from s3get import s3get, Downloader
+from mergeresults import ResultContext, ResultMergingProcess
 
 class Aggregator:
     def __init__(self, input_queue, work_folder, bucket, prefix, region, aws_cred):
@@ -121,6 +122,7 @@ class Aggregator:
         return (datetime.utcnow() - last_checkpoint).days
 
     def publish_results(self):
+        print "Uploading Results"
         # s3put compressed to current/...
         date = datetime.utcnow().strftime("%Y%m%d%H%M%S")
         current_prefix = self.prefix + 'current/%s/' % date
@@ -144,17 +146,19 @@ class Aggregator:
         processed_msgblocks = []
         last_flush = datetime.utcnow()
         while True:
-            # get 10 new_messages from sqs
-            msgs = queue.get_messages(
-                num_messages = 10,
-                wait_time_seconds = 20
-            )
-            print "Fetched %i messages" % len(msgs)
+            # get new_messages from sqs
+            messages = []
+            for i in xrange(0, 2):
+                msgs = queue.get_messages(num_messages = 10)
+                messages += msgs
+                if len(msgs) > 0:
+                    processed_msgblocks.append(msgs)
+                else:
+                    break
+            print "Fetched %i messages" % len(messages)
             # process msgs
-            self.process_messages(msgs)
-            if len(msgs) > 0:
-                processed_msgblocks.append(msgs)
-            else:
+            self.process_messages(messages)
+            if len(messages) == 0:
                 sleep(120)
             # Publish if necessary
             if (datetime.utcnow() - last_flush).seconds > 60 * 25:
@@ -238,6 +242,79 @@ class Aggregator:
             for f in msg['files']:
                 files_processed.write(f + "\n")
 
+    def process_messages_merging(self, msgs):
+        # Find results to download
+        results = []
+        for msg in msgs:
+            if msg['target-prefix'] != None:
+                results.append(msg['target-prefix'] + 'result.txt')
+
+        # Download results
+        if len(results) > 0:
+            target_paths = []
+            download_queue = Queue()
+            result_queue = Queue()
+            # Make a job queue
+            i = 0
+            for result in results:
+                i += 1
+                result_path = os.path.join(self.work_folder, "result-%i.txt" % i)
+                download_queue.put((result, result_path))
+                target_paths.append(result_path)
+
+            # Start downloaders
+            downloaders = []
+            for i in xrange(0, 16):
+                downloader = Downloader(download_queue, result_queue,
+                                        self.analysis_bucket_name, False, False,
+                                        self.region, self.aws_cred)
+                downloaders.append(downloader)
+                downloader.start()
+                download_queue.put(None)
+
+            # Wait and process results as they are downloaded
+            result_merged_path = os.path.join(self.work_folder, "result-merged.txt")
+            worker = ResultMergingProcess(result_queue, target_paths, result_merged_path)
+            worker.start()
+            #ctx = ResultContext()
+            #while len(target_paths) > 0:
+            #    result_path = result_queue.get(timeout = 20 * 60)
+            #    ctx.merge_result_file(result_path)
+            #    os.remove(result_path)
+            #    target_paths.remove(result_path)
+            #    print " - Merged result, % i left" % len(target_paths)
+
+            # Check that downloaders downloaded correctly
+            for downloader in downloaders:
+                downloader.join()
+                if downloader.exitcode != 0:
+                    sys.exit(1)
+
+            worker.join()
+            if worker.exitcode != 0:
+                sys.exit(1)
+
+            results2disk(result_merged_path, self.data_folder, False, False)
+            print " - Processed results"
+
+
+        # Update FILES_PROCESSED and FILES_MISSING
+        for msg in msgs:
+            # If there's no target-prefix the message failed
+            if msg['target-prefix'] is None:
+                # If target-prefix is None, then the message failed... we add the
+                # input files to list of missing files
+                files_missing_path = os.path.join(self.data_folder, 'FILES_MISSING')
+                with open(files_missing_path, 'a+') as files_missing:
+                    for f in msg['files']:
+                        files_missing.write(f + "\n")
+            else:
+                # Update FILES_PROCESSED
+                files_processed_path = os.path.join(self.data_folder, 'FILES_PROCESSED')
+                with open(files_processed_path, 'a+') as files_processed:
+                    for f in msg['files']:
+                        files_processed.write(f + "\n")
+
 def main():
     p = ArgumentParser(
         description = 'Aggregated and upload dashboard results',
diff --git a/dashboard/mergeresults.py b/dashboard/mergeresults.py
new file mode 100644
index 0000000..b31ab3b
--- /dev/null
+++ b/dashboard/mergeresults.py
@@ -0,0 +1,59 @@
+try:
+    import simplejson as json
+except ImportError:
+    import json
+from urllib2 import urlopen, HTTPError
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+from boto.sqs import connect_to_region as sqs_connect
+from multiprocessing import Process
+import sys, os
+
+from auxiliary import HistogramAggregator
+from s3get import s3get, Downloader
+
+class ResultContext:
+    def __init__(self):
+        self.cache = {}
+
+    def merge_result_file(self, result_file):
+        with open(result_file, 'r') as f:
+            for line in f:
+                filePath, blob = line.split('\t')
+                blob = json.loads(blob)
+                self.merge_blob(filePath, blob)
+
+    def merge_blob(self, filePath, blob):
+        existing_blob = self.cache.setdefault(filePath, {})
+        for filterPath, dump in blob.iteritems():
+            aggregator = existing_blob.get(filterPath, None)
+            if aggregator is None:
+                aggregator = HistogramAggregator(**dump)
+                existing_blob[filterPath] = aggregator
+            else:
+                aggregator.merge(**dump)
+
+    def output(self, result_file):
+        with open(result_file, 'w') as f:
+            for filePath, blob in self.cache.iteritems():
+                for filterPath, aggregator in blob.iteritems():
+                    blob[filterPath] = aggregator.dump()
+                f.write(filePath + "\t")
+                f.write(json.dumps(blob))
+                f.write('\n')
+
+class ResultMergingProcess(Process):
+    def __init__(self, queue, path_set, result_path):
+        super(ResultMergingProcess, self).__init__()
+        self.queue = queue
+        self.path_set = path_set
+        self.result_path = result_path
+        self.ctx = ResultContext()
+
+    def run(self):
+        while len(self.path_set) > 0:
+            input_path = self.queue.get(timeout = 20 * 60)
+            self.path_set.remove(input_path)
+            self.ctx.merge_result_file(input_path)
+            os.remove(input_path)
+            print " - Merged result, % i left" % len(self.path_set)
+        self.ctx.output(self.result_path)
\ No newline at end of file
diff --git a/dashboard/results2disk.py b/dashboard/results2disk.py
index af2e1bb..45678bf 100644
--- a/dashboard/results2disk.py
+++ b/dashboard/results2disk.py
@@ -40,12 +40,15 @@ def save_json_to_file(path, value, compress, pretty_print):
 
 class ChannelVersionManager:
     """ Manages data stored for a specific channel / version """
-    def __init__(self, root_folder, channel, version, compress, pretty_print):
+    def __init__(self, root_folder, channel, version, compress, pretty_print, cached = False):
         self.data_folder = os.path.join(root_folder, channel, version)
         mkdirp(self.data_folder)
         self.compress = compress
         self.pretty_print = pretty_print
         self.max_filter_id = None
+        self.cached = cached
+        if cached:
+            self.cache = {}
 
         # Load filter-tree
         self.filter_tree = self.json_from_file(
@@ -61,13 +64,23 @@ class ChannelVersionManager:
 
     def json_from_file(self, filename, fallback_value):
         """ Load json from file, return fallback_value if no file exists """
+        if self.cached:
+            data = self.cache.get(filename, None)
+            if data is None:
+                path = os.path.join(self.data_folder, filename)
+                data = load_json_from_file(path, self.compress, fallback_value)
+                self.cache[filename] = data
+            return data
         path = os.path.join(self.data_folder, filename)
         return load_json_from_file(path, self.compress, fallback_value)
 
     def json_to_file(self, filename, value):
         """ Write JSON to file """
-        path = os.path.join(self.data_folder, filename)
-        save_json_to_file(path, value, self.compress, self.pretty_print)
+        if self.cached:
+            self.cache[filename] = value
+        else:
+            path = os.path.join(self.data_folder, filename)
+            save_json_to_file(path, value, self.compress, self.pretty_print)
 
     def get_next_filter_id(self):
         """ Get the next filter id available """
@@ -103,6 +116,12 @@ class ChannelVersionManager:
 
     def flush(self):
         """ Output cache values """
+        # Output all files
+        if self.cached:
+            for filename, value in self.cache.iteritems():
+                path = os.path.join(self.data_folder, filename)
+                save_json_to_file(path, value, self.compress, self.pretty_print)
+
         # Output filter tree
         self.json_to_file('filter-tree.json', self.filter_tree)
@@ -245,7 +264,7 @@ def results2disk(result_file, output_folder, compress, pretty_print):
         if manager is None:
             manager = ChannelVersionManager(output_folder, channel, majorVersion,
-                                            compress, pretty_print)
+                                            compress, pretty_print, False)
             cache[(channel, majorVersion)] = manager
 
         manager.merge_in_blob(measure, byDateType, blob)
diff --git a/dashboard/s3get.py b/dashboard/s3get.py
index d0419a6..75aa4a8 100644
--- a/dashboard/s3get.py
+++ b/dashboard/s3get.py
@@ -1,6 +1,6 @@
 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
 import sys, os, gzip
-from cStringIO import StringIO
+from StringIO import StringIO
 from utils import mkdirp
 from multiprocessing import Process, Queue, cpu_count
 from boto.s3 import connect_to_region as s3_connect
diff --git a/dashboard/s3put.py b/dashboard/s3put.py
index 6d5f25e..06682ac 100644
--- a/dashboard/s3put.py
+++ b/dashboard/s3put.py
@@ -1,6 +1,6 @@
 from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
 import sys, os, gzip
-from cStringIO import StringIO
+from StringIO import StringIO
 from utils import mkdirp
 from multiprocessing import Process, Queue, cpu_count
 from boto.s3 import connect_to_region as s3_connect
diff --git a/html/js/telemetry.js b/html/js/telemetry.js
index 9716160..0210a3e 100644
--- a/html/js/telemetry.js
+++ b/html/js/telemetry.js
@@ -7,7 +7,7 @@ var Telemetry = {};
 
 // Data folder from which data will be loaded, another level indicating current
 // folder will be initialized by Telemetry.init()
-var _data_folder = 'https://s3-us-west-2.amazonaws.com/telemetry-dashboard/v3';
+var _data_folder = 'https://s3-us-west-2.amazonaws.com/telemetry-dashboard/v2';
 
 // Boolean tracks if we've initialized
 var _initialized = false;
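
Reviewer note (not part of the patch): the sketch below is a minimal illustration of the kind of merge that `ResultContext` in `dashboard/mergeresults.py` performs on `result.txt` lines of the form `<filePath>\t<JSON blob>`, where the blob maps a filter path to a histogram dump. In the patch the real merging is delegated to `HistogramAggregator` from `auxiliary.py`, which this diff does not touch; the dump field names (`values`, `count`) and the example paths used here are assumptions made purely for illustration.

# Illustrative sketch only -- not part of the patch. Assumes result.txt lines
# of the form "<filePath>\t<JSON blob>", where the blob maps a filterPath to a
# histogram dump. The "values"/"count" dump fields and the example paths are
# hypothetical; the patch delegates the real merging to HistogramAggregator.
import json
from collections import defaultdict

def merge_result_lines(lines):
    """ Sum hypothetical histogram dumps grouped by (filePath, filterPath) """
    merged = defaultdict(lambda: {"values": None, "count": 0})
    for line in lines:
        filePath, blob = line.split("\t", 1)
        for filterPath, dump in json.loads(blob).iteritems():
            entry = merged[(filePath, filterPath)]
            if entry["values"] is None:
                entry["values"] = list(dump["values"])
            else:
                # element-wise sum of the bucket counts
                entry["values"] = [a + b for a, b in
                                   zip(entry["values"], dump["values"])]
            entry["count"] += dump["count"]
    return merged

if __name__ == "__main__":
    # Two result.txt-style lines for the same (made-up) file and filter path
    lines = [
        'some/output/file.json\t{"saved_session/Firefox": {"values": [1, 2, 3], "count": 1}}',
        'some/output/file.json\t{"saved_session/Firefox": {"values": [4, 0, 1], "count": 2}}',
    ]
    for key, dump in merge_result_lines(lines).iteritems():
        print key, dump

Running the sketch prints a single merged entry with values [5, 2, 4] and count 3, which is the behaviour the aggregator relies on when it folds many per-task result files into result-merged.txt.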