Aggregator hacks to handle data fast... on local laptops when we process a lot

Jonas Finnemann Jensen 2013-11-06 17:04:52 -08:00
Parent 342a331465
Commit c84798fb5f
7 changed files with 196 additions and 45 deletions

View file

@@ -6,20 +6,16 @@ External users should include `telemetry.js` and consume data through this
interface. Reading the raw data is hard and these data formats may change, but
the JavaScript interface is designed to be reasonably stable.
Processor Output Format
-----------------------
Analysis Tasks Output Format
----------------------------
The analysis task writes a single file to disk called `result.txt`.
/my/dim/../ JSON
JSON:
revision:
buildid:
histogram:
filterPath:
revision:
buildid:
histogram:
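For concreteness, a minimal sketch of parsing one line of `result.txt` as described above. The file path, filter path, and field values below are invented for illustration; the exact keys inside each per-filter blob are whatever the aggregator's HistogramAggregator accepts (the README lists revision, buildid, and histogram).

import json

# One hypothetical line of result.txt: a file path, a tab, then a JSON
# object keyed by filter path (all values below are made up)
line = 'my/dim/MY_MEASURE\t' + json.dumps({
    "saved_session/Firefox/nightly/26": {
        "revision": "<revision-url>",
        "buildid": "20131106030203",
        "histogram": [0, 5, 12, 3]
    }
})

filePath, blob = line.split('\t')
blob = json.loads(blob)
for filterPath, dump in blob.iteritems():
    print filePath, filterPath, dump.keys()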
@@ -56,28 +52,28 @@ v2/
revisions.json
Web Facing Format
Web Facing JSON Format:
-----------------
/<channel>/<version>
MEASURE.json
{
<filter_id>: [
bucket0,
bucket1,
...,
bucketN,
sum, # -1, if missing
log_sum, # -1, if missing
log_sum_squares, # -1, if missing
sum_squares_lo, # -1, if missing
sum_squares_hi, # -1, if missing
count
],
<filter_id>...
}
[
[
bucket0,
bucket1,
...,
bucketN,
sum, # -1, if missing
log_sum, # -1, if missing
log_sum_squares, # -1, if missing
sum_squares_lo, # -1, if missing
sum_squares_hi, # -1, if missing
count,
filter_id,
],
]
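To make the new array layout concrete, a small sketch of splitting one row back into buckets and trailing statistics. The numbers are made up; only the ordering comes from the format above, which always places six statistics plus the filter_id after the buckets.

# one hypothetical row from the new MEASURE.json array layout
row = [0, 5, 12, 3,   # bucket0 .. bucketN (made-up counts)
       20.0,          # sum              (-1 if missing)
       4.2,           # log_sum          (-1 if missing)
       9.1,           # log_sum_squares  (-1 if missing)
       -1,            # sum_squares_lo   (-1 if missing)
       -1,            # sum_squares_hi   (-1 if missing)
       20,            # count
       42]            # filter_id

buckets = row[:-7]
sum_, log_sum, log_sum_squares, sq_lo, sq_hi, count, filter_id = row[-7:]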
filters.json
{

View file

@@ -5,7 +5,7 @@ from boto.sqs import connect_to_region as sqs_connect
from boto.s3 import connect_to_region as s3_connect
from boto.s3.key import Key
from boto.sqs.jsonmessage import JSONMessage
from multiprocessing import Process, Queue, cpu_count
from multiprocessing import Queue, cpu_count
from time import sleep
import os, sys, shutil, gzip
import json
@@ -16,6 +16,7 @@ from utils import mkdirp
from results2disk import results2disk
from s3put import s3put
from s3get import s3get, Downloader
from mergeresults import ResultContext, ResultMergingProcess
class Aggregator:
def __init__(self, input_queue, work_folder, bucket, prefix, region, aws_cred):
@@ -121,6 +122,7 @@ class Aggregator:
return (datetime.utcnow() - last_checkpoint).days
def publish_results(self):
print "Uploading Results"
# s3put compressed to current/...
date = datetime.utcnow().strftime("%Y%m%d%H%M%S")
current_prefix = self.prefix + 'current/%s/' % date
@@ -144,17 +146,19 @@
processed_msgblocks = []
last_flush = datetime.utcnow()
while True:
# get 10 new_messages from sqs
msgs = queue.get_messages(
num_messages = 10,
wait_time_seconds = 20
)
print "Fetched %i messages" % len(msgs)
# get new_messages from sqs
messages = []
for i in xrange(0, 2):
msgs = queue.get_messages(num_messages = 10)
messages += msgs
if len(msgs) > 0:
processed_msgblocks.append(msgs)
else:
break
print "Fetched %i messages" % len(messages)
# process msgs
self.process_messages(msgs)
if len(msgs) > 0:
processed_msgblocks.append(msgs)
else:
self.process_messages(messages)
if len(messages) == 0:
sleep(120)
# Publish if necessary
if (datetime.utcnow() - last_flush).seconds > 60 * 25:
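Taken on their own, the added lines above describe the new fetch behaviour: pull up to two batches of ten messages per iteration, remember each non-empty batch, and back off when the queue is empty. A self-contained sketch of that pattern; the region and queue name are hypothetical and AWS credentials are assumed to be configured.

from time import sleep
from boto.sqs import connect_to_region as sqs_connect

def fetch_batches(queue, processed_msgblocks, max_batches = 2, batch_size = 10):
    # mirror of the added loop body: pull up to max_batches batches and
    # keep track of each non-empty batch, as the surrounding loop does
    messages = []
    for i in xrange(0, max_batches):
        msgs = queue.get_messages(num_messages = batch_size)
        messages += msgs
        if len(msgs) > 0:
            processed_msgblocks.append(msgs)
        else:
            break
    return messages

# hypothetical region and queue name
queue = sqs_connect('us-west-2').get_queue('telemetry-analysis-output')
processed_msgblocks = []
messages = fetch_batches(queue, processed_msgblocks)
print "Fetched %i messages" % len(messages)
if len(messages) == 0:
    sleep(120)   # the aggregator backs off for two minutes when idle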
@@ -238,6 +242,79 @@ class Aggregator:
for f in msg['files']:
files_processed.write(f + "\n")
def process_messages_merging(self, msgs):
# Find results to download
results = []
for msg in msgs:
if msg['target-prefix'] != None:
results.append(msg['target-prefix'] + 'result.txt')
# Download results
if len(results) > 0:
target_paths = []
download_queue = Queue()
result_queue = Queue()
# Make a job queue
i = 0
for result in results:
i += 1
result_path = os.path.join(self.work_folder, "result-%i.txt" % i)
download_queue.put((result, result_path))
target_paths.append(result_path)
# Start downloaders
downloaders = []
for i in xrange(0, 16):
downloader = Downloader(download_queue, result_queue,
self.analysis_bucket_name, False, False,
self.region, self.aws_cred)
downloaders.append(downloader)
downloader.start()
download_queue.put(None)
# Wait and process results as they are downloaded
result_merged_path = os.path.join(self.work_folder, "result-merged.txt")
worker = ResultMergingProcess(result_queue, target_paths, result_merged_path)
worker.start()
#ctx = ResultContext()
#while len(target_paths) > 0:
# result_path = result_queue.get(timeout = 20 * 60)
# ctx.merge_result_file(result_path)
# os.remove(result_path)
# target_paths.remove(result_path)
# print " - Merged result, % i left" % len(target_paths)
# Check that downloaders downloaded correctly
for downloader in downloaders:
downloader.join()
if downloader.exitcode != 0:
sys.exit(1)
worker.join()
if worker.exitcode != 0:
sys.exit(1)
results2disk(result_merged_path, self.data_folder, False, False)
print " - Processed results"
# Update FILES_PROCESSED and FILES_MISSING
for msg in msgs:
# If there's no target-prefix the message failed
if msg['target-prefix'] is None:
# If target-prefix is None, then the message failed... we add the
# input files to list of missing files
files_missing_path = os.path.join(self.data_folder, 'FILES_MISSING')
with open(files_missing_path, 'a+') as files_missing:
for f in msg['files']:
files_missing.write(f + "\n")
else:
# Update FILES_PROCESSED
files_processed_path = os.path.join(self.data_folder, 'FILES_PROCESSED')
with open(files_processed_path, 'a+') as files_processed:
for f in msg['files']:
files_processed.write(f + "\n")
def main():
p = ArgumentParser(
description = 'Aggregate and upload dashboard results',

dashboard/mergeresults.py (Normal file, 59 additions)
View file

@@ -0,0 +1,59 @@
try:
    import simplejson as json
except ImportError:
    import json

from urllib2 import urlopen, HTTPError
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from boto.sqs import connect_to_region as sqs_connect
from multiprocessing import Process
import sys, os

from auxiliary import HistogramAggregator
from s3get import s3get, Downloader

class ResultContext:
    def __init__(self):
        self.cache = {}

    def merge_result_file(self, result_file):
        with open(result_file, 'r') as f:
            for line in f:
                filePath, blob = line.split('\t')
                blob = json.loads(blob)
                self.merge_blob(filePath, blob)

    def merge_blob(self, filePath, blob):
        existing_blob = self.cache.setdefault(filePath, {})
        for filterPath, dump in blob.iteritems():
            aggregator = existing_blob.get(filterPath, None)
            if aggregator is None:
                aggregator = HistogramAggregator(**dump)
                existing_blob[filterPath] = aggregator
            else:
                aggregator.merge(**dump)

    def output(self, result_file):
        with open(result_file, 'w') as f:
            for filePath, blob in self.cache.iteritems():
                for filterPath, aggregator in blob.iteritems():
                    blob[filterPath] = aggregator.dump()
                f.write(filePath + "\t")
                f.write(json.dumps(blob))
                f.write('\n')

class ResultMergingProcess(Process):
    def __init__(self, queue, path_set, result_path):
        super(ResultMergingProcess, self).__init__()
        self.queue = queue
        self.path_set = path_set
        self.result_path = result_path
        self.ctx = ResultContext()

    def run(self):
        while len(self.path_set) > 0:
            input_path = self.queue.get(timeout = 20 * 60)
            self.path_set.remove(input_path)
            self.ctx.merge_result_file(input_path)
            os.remove(input_path)
            print " - Merged result, % i left" % len(self.path_set)
        self.ctx.output(self.result_path)
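For local debugging the merge logic can be exercised without the queue or a separate process; a minimal usage sketch with hypothetical file names, assuming dashboard/ is on the import path and the result files were already downloaded.

from mergeresults import ResultContext

ctx = ResultContext()
ctx.merge_result_file('/tmp/result-1.txt')   # hypothetical paths
ctx.merge_result_file('/tmp/result-2.txt')
ctx.output('/tmp/result-merged.txt')         # writes one merged line per file path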

View file

@@ -40,12 +40,15 @@ def save_json_to_file(path, value, compress, pretty_print):
class ChannelVersionManager:
""" Manages data stored for a specific channel / version """
def __init__(self, root_folder, channel, version, compress, pretty_print):
def __init__(self, root_folder, channel, version, compress, pretty_print, cached = False):
self.data_folder = os.path.join(root_folder, channel, version)
mkdirp(self.data_folder)
self.compress = compress
self.pretty_print = pretty_print
self.max_filter_id = None
self.cached = cached
if cached:
self.cache = {}
# Load filter-tree
self.filter_tree = self.json_from_file(
@@ -61,13 +64,23 @@ class ChannelVersionManager:
def json_from_file(self, filename, fallback_value):
""" Load json from file, return fallback_value if no file exists """
if self.cached:
data = self.cache.get(filename, None)
if data is None:
path = os.path.join(self.data_folder, filename)
data = load_json_from_file(path, self.compress, fallback_value)
self.cache[filename] = data
return data
path = os.path.join(self.data_folder, filename)
return load_json_from_file(path, self.compress, fallback_value)
def json_to_file(self, filename, value):
""" Write JSON to file """
path = os.path.join(self.data_folder, filename)
save_json_to_file(path, value, self.compress, self.pretty_print)
if self.cached:
self.cache[filename] = value
else:
path = os.path.join(self.data_folder, filename)
save_json_to_file(path, value, self.compress, self.pretty_print)
def get_next_filter_id(self):
""" Get the next filter id available """
@@ -103,6 +116,12 @@ class ChannelVersionManager:
def flush(self):
""" Output cache values """
# Output all files
if self.cached:
for filename, value in self.cache.iteritems():
path = os.path.join(self.data_folder, filename)
save_json_to_file(path, value, self.compress, self.pretty_print)
# Output filter tree
self.json_to_file('filter-tree.json', self.filter_tree)
@@ -245,7 +264,7 @@ def results2disk(result_file, output_folder, compress, pretty_print):
if manager is None:
manager = ChannelVersionManager(output_folder,
channel, majorVersion,
compress, pretty_print)
compress, pretty_print, False)
cache[(channel, majorVersion)] = manager
manager.merge_in_blob(measure, byDateType, blob)
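A minimal sketch of what the new cached flag buys: with cached = True, json_to_file() only updates the in-memory dict and nothing touches disk until flush(), which is what makes repeated merges on a local laptop fast. The folder, channel, version, and filename below are made up; ChannelVersionManager is assumed to live in dashboard/results2disk.py as the diff suggests.

from results2disk import ChannelVersionManager

manager = ChannelVersionManager('/tmp/histograms', 'nightly', '28',
                                compress = False, pretty_print = False,
                                cached = True)
for i in xrange(0, 1000):
    # with cached = True this only updates the in-memory dict
    manager.json_to_file('MY_MEASURE-by-build-date.json', {'iteration': i})
# one disk write per cached file, instead of one write per call above
manager.flush()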

View file

@@ -1,6 +1,6 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import sys, os, gzip
from cStringIO import StringIO
from StringIO import StringIO
from utils import mkdirp
from multiprocessing import Process, Queue, cpu_count
from boto.s3 import connect_to_region as s3_connect

View file

@@ -1,6 +1,6 @@
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import sys, os, gzip
from cStringIO import StringIO
from StringIO import StringIO
from utils import mkdirp
from multiprocessing import Process, Queue, cpu_count
from boto.s3 import connect_to_region as s3_connect

View file

@@ -7,7 +7,7 @@ var Telemetry = {};
// Data folder from which data will be loaded, another level indicating current
// folder will be initialized by Telemetry.init()
var _data_folder = 'https://s3-us-west-2.amazonaws.com/telemetry-dashboard/v3';
var _data_folder = 'https://s3-us-west-2.amazonaws.com/telemetry-dashboard/v2';
// Boolean tracks if we've initialized
var _initialized = false;