#!/usr/bin/env python
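"""
Analysis worker: fetches an analysis job bundle (locally or from S3),
extracts it, launches the bundle's ./processor executable, feeds it input
files from a queue, and reports the resulting output files for upload.
Run this module directly to debug a job bundle against local input files;
see main() below.
"""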

try:
    import simplejson as json
except ImportError:
    import json

from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from multiprocessing import Process, Queue
from traceback import print_exc
from subprocess import Popen, PIPE
from utils import mkdirp
from boto.s3.connection import S3Connection
from shutil import rmtree, copyfile, copytree
from pkg_resources import WorkingSet
from time import sleep
import tarfile
import os, sys


class AnalysisWorker(Process):
    """
    Analysis worker that processes files from input_queue and puts upload
    tasks on output_queue, followed by a final status message, once nb_files
    files have been received.
    """
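
    # Queue protocol, as used below: input_queue carries (virtual_name, path)
    # tuples, one per input file; output_queue receives (source, target)
    # upload tuples followed by True on success, or False on failure.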

    def __init__(self, job_bundle, nb_files, aws_cred, input_queue, output_queue,
                 work_dir):
        super(AnalysisWorker, self).__init__()
        self.job_bundle_bucket, self.job_bundle_prefix = job_bundle
        self.aws_cred = aws_cred
        self.work_folder = work_dir
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.output_folder = os.path.join(self.work_folder, "output")
        self.nb_files = nb_files

    def setup(self):
        # Remove any existing work folder; failures here must abort, hence
        # ignore_errors is False
        if os.path.exists(self.work_folder):
            rmtree(self.work_folder, ignore_errors = False)

        # Create work folders
        mkdirp(self.work_folder)
        mkdirp(self.output_folder)

        # Fetch the job bundle; if job_bundle_bucket is None the bundle is
        # stored locally, otherwise fetch it from S3
        job_bundle_target = os.path.join(self.work_folder, "job_bundle.tar.gz")
        if self.job_bundle_bucket is None:
            copyfile(self.job_bundle_prefix, job_bundle_target)
        else:
            s3 = S3Connection(**self.aws_cred)
            bucket = s3.get_bucket(self.job_bundle_bucket, validate = False)
            key = bucket.get_key(self.job_bundle_prefix)
            key.get_contents_to_filename(job_bundle_target)

        # Extract job bundle
        self.processor_path = os.path.join(self.work_folder, "code")
        mkdirp(self.processor_path)
        tar = tarfile.open(job_bundle_target)
        tar.extractall(path = self.processor_path)
        tar.close()

        # Launch the processor from the extracted bundle, handing it the
        # output folder relative to its working directory
        self.processor = Popen(
            ['./processor', os.path.relpath(self.output_folder, self.processor_path)],
            cwd = self.processor_path,
            bufsize = 1,
            stdin = PIPE,
            stdout = sys.stdout,
            stderr = sys.stderr
        )
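
        # The implied contract (a sketch, inferred from this file): the
        # bundle's ./processor executable is handed the output folder as its
        # first argument, reads "prefix<TAB>path" lines on stdin, and writes
        # its results under that output folder before exiting. A minimal
        # processor might look like this (all names hypothetical):
        #
        #   #!/usr/bin/env python
        #   import sys, os
        #   output_folder = sys.argv[1]
        #   for line in sys.stdin:
        #       prefix, path = line.rstrip('\n').split('\t')
        #       with open(os.path.join(output_folder, 'sizes.txt'), 'a') as f:
        #           f.write("%s\t%s\n" % (prefix, os.path.getsize(path)))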

    def run(self):
        try:
            self.setup()
            while self.nb_files > 0:
                virtual_name, path = self.input_queue.get()
                if path is not None:
                    self.process_file(virtual_name, path)
                self.nb_files -= 1
            self.finish()
        except:
            print >> sys.stderr, "Failed job, cleaning up after this:"
            print_exc(file = sys.stderr)
            self.output_queue.put(False)

    def finish(self):
        # Closing stdin gives the processor EOF, asking it to write its
        # output and exit
        self.processor.stdin.close()
        self.processor.wait()

        # Check return code
        if self.processor.returncode == 0:
            # Hand output files to the uploaders
            for path, folders, files in os.walk(self.output_folder):
                for f in files:
                    source = os.path.join(path, f)
                    target = os.path.relpath(source, self.output_folder)
                    self.output_queue.put((source, target))

            # Put finished message
            self.output_queue.put(True)
        else:
            print >> sys.stderr, "Processor exited non-zero, task failed"
            self.output_queue.put(False)

    def process_file(self, prefix, path):
        # Feed the file to the processor as a "prefix<TAB>path" line, with
        # the path relative to the processor's working directory
        path = os.path.relpath(path, self.processor_path)
        self.processor.stdin.write("%s\t%s\n" % (prefix, path))


def main():
    """ Run the worker with a job_bundle on a local input-file for debugging """
    p = ArgumentParser(
        description = 'Debug analysis script',
        formatter_class = ArgumentDefaultsHelpFormatter
    )
    p.add_argument(
        "job_bundle",
        help = "The analysis bundle to run"
    )
    p.add_argument(
        "-f", "--input",
        required = True,
        help = "File with 'prefix <TAB> path' lines for files to process"
    )
    p.add_argument(
        "-w", "--work-dir",
        required = True,
        help = "Location to put temporary work files"
    )
    cfg = p.parse_args()

    # Start from a clean work folder; on the first run it may not exist yet
    if os.path.exists(cfg.work_dir):
        rmtree(cfg.work_dir, ignore_errors = False)

    # Create work directories
    work_dir = os.path.join(cfg.work_dir, "work-folder")
    data_dir = os.path.join(cfg.work_dir, "data-folder")
    mkdirp(work_dir)
    mkdirp(data_dir)

    # Setup queues
    input_queue = Queue()
    output_queue = Queue()

    # Put input files in queue; each file is copied into data_dir so the
    # worker operates on its own copy
    nb_files = 0
    with open(cfg.input, 'r') as input_file:
        for line in input_file:
            prefix, path = line.strip().split('\t')
            source = os.path.join(data_dir, "file-%i" % nb_files)
            copyfile(path, source)
            input_queue.put((prefix, source))
            nb_files += 1
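
    # A line of the input file might look like this (hypothetical values,
    # the two fields separated by a literal tab):
    #
    #   saved_session/submission-1<TAB>/tmp/submissions/submission-1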

    # The empty set of AWS credentials; with these set to None, boto falls
    # back to its usual credential lookup (environment or config files)
    aws_cred = {
        'aws_access_key_id': None,
        'aws_secret_access_key': None
    }

    # Job bundle is stored locally, so the bucket is None
    job_bundle = (None, cfg.job_bundle)

    # Start analysis worker
    worker = AnalysisWorker(job_bundle, nb_files, aws_cred,
                            input_queue, output_queue,
                            work_dir)
    worker.start()

    # Print messages from the worker: (source, target) tuples are upload
    # tasks, True/False signals overall success or failure
    while True:
        msg = output_queue.get()
        if msg is True:
            print "Done with success"
            break
        elif msg is False:
            print "Done with failure"
            break
        else:
            print "Upload: %s => %s" % msg

    # Give the worker time to shut down correctly
    sleep(0.5)

    # Terminate worker and join
    worker.terminate()
    worker.join()
    print "Worker finished: %s" % worker.exitcode
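

# Example invocation, with hypothetical file names:
#
#   python analysis_worker.py my_bundle.tar.gz -f inputs.txt -w /tmp/analysis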

if __name__ == "__main__":
    sys.exit(main())