Added cmake rule for creating job_bundle.tar.gz

This commit is contained in:
Jonas Finnemann Jensen 2013-12-03 17:57:16 -08:00
Parent 7180d9522a
Commit 9a8890c68f
4 changed files: 82 additions, 2 deletions

CMakeLists.txt

@@ -34,4 +34,5 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-local-typedefs")
add_subdirectory(src)
add_subdirectory(dashboard)
add_subdirectory(tests)

dashboard/CMakeLists.txt Normal file (11 additions)

@@ -0,0 +1,11 @@
set(JOB_BUNDLE_SRC
  analysis.py
  auxiliary.py
  processor
)
add_custom_target(
  job_bundle ALL
  COMMAND tar "-C" "${CMAKE_CURRENT_SOURCE_DIR}" "-czf" "job_bundle.tar.gz" ${JOB_BUNDLE_SRC}
)
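Note: because the target is declared with ALL and has no declared OUTPUT or DEPENDS, the tar command re-runs on every build, and the archive lands in the current binary directory (tar -C only changes where the inputs are read from). A minimal alternative sketch, not part of this commit, that re-packs only when the listed sources change:

# Sketch only (assumption, not in this commit): declare the tarball as an
# OUTPUT so it is rebuilt only when analysis.py, auxiliary.py or processor change.
set(JOB_BUNDLE "${CMAKE_CURRENT_BINARY_DIR}/job_bundle.tar.gz")
add_custom_command(
  OUTPUT "${JOB_BUNDLE}"
  COMMAND tar -C "${CMAKE_CURRENT_SOURCE_DIR}" -czf "${JOB_BUNDLE}" ${JOB_BUNDLE_SRC}
  DEPENDS ${JOB_BUNDLE_SRC}  # relative paths resolve in the source dir, where these files live
  COMMENT "Packing job_bundle.tar.gz"
)
add_custom_target(job_bundle ALL DEPENDS "${JOB_BUNDLE}")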

dashboard/analysis.py

@@ -1,9 +1,13 @@
#!/usr/bin/env python
try:
    import simplejson as json
except ImportError:
    import json
from datetime import datetime
from traceback import print_exc
import math, sys, os
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from subprocess import Popen, PIPE
# Import histogram specs generated by the makefile using specgen.py
# This is imported from the zipfile that this module was loaded from
#HistogramAggregator = __loader__.load_module("auxiliary").HistogramAggregator
@@ -63,8 +67,6 @@ simple_measures_buckets = (
############################## End of ugly hacks for simple measures

class DashboardProcessor:
    INTERFACE = 'parsed-json'
    def __init__(self, output_folder):
        self.output_folder = output_folder
        self.cache = {}
@@ -177,3 +179,67 @@ class DashboardProcessor:
            for filterPath, aggregator in cacheSet.iteritems():
                output["/".join(filterPath)] = aggregator.dump()
            out.write("/".join(filePath) + "\t" + json.dumps(output) + "\n")
############################## Driver

def main():
    """ The processor, reading input file-names from stdin """
    p = ArgumentParser(
        description = 'Analyze files as they are given by stdin',
        formatter_class = ArgumentDefaultsHelpFormatter
    )
    p.add_argument(
        "-o", "--output-folder",
        help = "Location to put final output"
    )
    cfg = p.parse_args()

    processor = DashboardProcessor(cfg.output_folder)

    # Handle input as it is fetched
    for line in sys.stdin:
        # Find s3 prefix (key) and physical filepath
        prefix, filepath = line.split("\t")
        filepath = filepath.strip()

        # Find dimensions
        dims = prefix.split('/')
        dims += dims.pop().split('.')[:2]

        # Open a decompressor (the input files are xz-compressed)
        raw_handle = open(filepath, "rb")
        decompressor = Popen(
            ['xz', '-d', '-c'],
            bufsize = 65536,
            stdin = raw_handle,
            stdout = PIPE,
            stderr = sys.stderr
        )

        # Process each line
        line_nb = 0
        errors = 0
        for line in decompressor.stdout:
            line_nb += 1
            try:
                uid, payload = line.split("\t", 1)
                payload = json.loads(payload)
                processor.process(uid, dims, payload)
            except:
                print >> sys.stderr, ("Bad input line: %i of %s" %
                                      (line_nb, prefix))
                print_exc(file = sys.stderr)
                errors += 1

        # Close decompressor
        decompressor.stdout.close()
        raw_handle.close()

        # Remove file after processing
        os.remove(filepath)

    # Write aggregates to disk
    processor.flush()

if __name__ == "__main__":
    sys.exit(main())

dashboard/processor Executable file (2 additions)

@@ -0,0 +1,2 @@
#!/bin/bash
python analysis.py -o "$1"
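
For context, a rough sketch of how the pieces fit together; the prefix and file paths below are made-up placeholders, not values from this commit. The bundle is unpacked on a worker node, and processor forwards its single argument as the output folder to analysis.py, which reads tab-separated prefix/filepath lines on stdin:

# Sketch with assumed placeholder paths (not from this commit):
tar -xzf job_bundle.tar.gz                  # unpacks analysis.py, auxiliary.py, processor
printf 'telemetry/Firefox.nightly.log.xz\t/tmp/chunk.xz\n' | \
    ./processor output/                     # runs: python analysis.py -o output/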