Refactored task into a class and added shuffle phase

This commit is contained in:
Scott Robertson 2013-04-16 22:09:28 -07:00
Parent f51849503e
Commit aedfb2bc9b
12 changed files with 655 additions and 52 deletions

View file

@ -13,7 +13,7 @@ We recommend using pip and virtualenv to install.
$ virtualenv leisure
$ cd leisure
$ . ./bin/activate
$ pip install -e git+git@github.com:trivio/leisure.git
$ pip install -e git+git@github.com:trivio/leisure.git#egg=leisure
$ pip install -r src/leisure/requirements.txt
```

View file

@ -29,10 +29,13 @@ from __future__ import absolute_import
import sys
from .disco import run_script
from . import shuffle
import tempfile
def main():
script = sys.argv[1]
run_script(script)
run_script(script, tempfile.mkdtemp())
if __name__ == "__main__":

View file

@ -5,25 +5,30 @@ import os
from StringIO import StringIO
from urlparse import urlparse, parse_qs
import threading
import re
from functools import partial
import hashlib
from datetime import datetime
import time
import disco
from disco.core import Disco
from .io import puts
from . import event_loop
from . import job_control
def run_script(script):
disco_url_regex = re.compile(".*?://.*?/disco/(.*)")
def run_script(script, data_root):
loop = start_event_loop()
job_control.set_event_loop(loop)
try:
patch_disco()
os.environ['DISCO_HOME'] = disco.__path__[0]
os.environ['DISCO_DATA'] = data_root
globals_ = {
"__name__" : "__main__",
"__file__" : script,
@ -100,4 +105,33 @@ def get_results():
(job.name, [job.status, job.results]) for job in jobs
])
def disco_url_path(url):
return disco_url_regex.match(url).group(1)
def job_home(job_name, root):
return os.path.join(root, hex_hash(job_name), job_name)
def job_url(host, job_name):
return os.path.join("disco", host, hex_hash(job_name), job_name)
def hex_hash(path):
"""
Return the first 2 hex digits of the md5 of the given path.
Suitable for creating sub dirs to break up a large directory
"""
return hashlib.md5(path).hexdigest()[:2]
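Taken together these helpers define leisure's on-disk layout: `job_url` builds the path segment embedded in `disco://` URLs, `job_home` builds the matching directory under a data root, and `disco_url_path` recovers the segment from a URL. A self-contained sketch of the round trip (not part of the diff; helper bodies duplicated from above, paths illustrative):

```python
import hashlib, os, re

disco_url_regex = re.compile(".*?://.*?/disco/(.*)")

def hex_hash(path):
    # first two hex digits of the md5, as defined above
    return hashlib.md5(path).hexdigest()[:2]

host, job_name, data_root = "localhost", "Job@123", "/tmp/leisure-data"

# segment embedded in URLs: disco/localhost/<2-digit hash>/Job@123
segment = os.path.join("disco", host, hex_hash(job_name), job_name)
url = "disco://" + host + "/" + segment + "/task-1"

# disco_url_path(url) yields everything after "/disco/", which joined
# onto data_root lands inside job_home(job_name, data_root + "/" + host)
assert disco_url_regex.match(url).group(1) == os.path.join(
    host, hex_hash(job_name), job_name, "task-1")
```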
def timestamp(dt=None):
"""
Return a timestamp in the format of hex(megaseconds)-hex(seconds)-hex(microseconds)
The timestamp should be monotonically increasing and hence usable as a uuid
"""
if dt is None:
dt = datetime.utcnow()
mega, seconds = map(int, divmod(time.mktime(dt.timetuple()), 10**6))
return "{:x}-{:x}-{:x}".format(mega, seconds, dt.microsecond)

View file

@ -12,8 +12,14 @@ def stop():
"""Stops the current event loop"""
current_event_loop().stop()
def call_later(when, method, *args):
return current_event_loop().call_later(when, method, *args)
def call_soon(method, *args):
return Future()
return current_event_loop().call_soon(method, *args)
def call_soon_threadsafe(method, *args):
return current_event_loop().call_soon_threadsafe(method, *args)
def current_event_loop():
if not hasattr(local, 'event_loop'):

View file

@ -10,6 +10,12 @@ def puts(msg, write=sys.stdout.write):
write(line)
write('\n')
def indent(msg, write=sys.stdout.write):
for line in msg.splitlines():
write(" ")
write(line)
write('\n')
def readuntil(stream,term):
b = bytearray()
while True:

View file

@ -4,22 +4,38 @@ from datetime import datetime
import os
import stat
import tempfile
import zipfile
from StringIO import StringIO
from disco.job import JobPack
import leisure
class Job(object):
def __init__(self, jobpack):
self.jobpack = JobPack.load(StringIO(jobpack))
self.job_dir = extract_jobhome(self.jobpack.jobhome)
self.host = "localhost"
self.data_root = os.environ['DISCO_DATA']
self.name = "{}@{}".format(self.prefix, leisure.disco.timestamp())
self.home = os.path.join(
self.host,
leisure.disco.hex_hash(self.name),
self.name
)
self.job_dir = extract_jobhome(
os.path.join(self.data_root, self.home),
self.jobpack.jobhome
)
self.save_jobfile(jobpack)
self.ensure_worker_executable()
self.results = []
self.status = "active"
self.name = self.prefix
@property
def prefix(self):
@ -41,6 +57,16 @@ class Job(object):
def jobfile_path(self):
return os.path.join(self.job_dir, "jobfile")
@property
def has_map_phase(self):
"""Return true if the job has a map phase"""
return self.jobpack.jobdict['map?']
@property
def has_reduce_phase(self):
"""Return true if the job has a map phase"""
return self.jobpack.jobdict['reduce?']
def info(self):
return dict(
timestamp = str(datetime.utcnow()),
@ -68,10 +94,9 @@ class Job(object):
os.chmod(worker_path, st.st_mode | stat.S_IEXEC)
def extract_jobhome(jobhome):
def extract_jobhome(path, jobhome):
"""Extract job to a tempporary directory and returns it's path"""
z = zipfile.ZipFile(StringIO(jobhome), 'r')
path = tempfile.mkdtemp()
z.extractall(path)
return path
return path

View file

@ -1,10 +1,30 @@
import os
import time
from collections import namedtuple
from .job import Job
from . import worker
from .task import Task
import leisure
jobs = {}
def get(name):
return jobs.get(name)
def all():
return jobs.values()
def active():
return filter(lambda j: j.status == "active", all())
def ready():
return filter(lambda j: j.status == "ready", all())
def dead():
return filter(lambda j: j.status == "dead", all())
event_loop = None
def set_event_loop(new_event_loop):
@ -15,13 +35,13 @@ def new(jobpack):
job = Job(jobpack)
store_with_unique_name(job)
handle = event_loop.call_soon(start, job)
handle = event_loop.call_soon(map_reduce, job)
return job
def get(name):
return jobs.get(name)
def store_with_unique_name(job):
jobs[job.name] = job
return job.name
while True:
name = "{}@{}".format(job.prefix, time.time())
if name not in jobs:
@ -29,23 +49,79 @@ def store_with_unique_name(job):
jobs[name] = job
return name
def start(job):
path = job.job_dir
for input in job.inputs:
worker.start(
job,
dict(
host = "localhost",
disco_data = os.path.join(path, "data"),
ddfs_data = os.path.join(path, "ddfs"),
master = "http://localhost:8989",
taskid = 0,
jobfile = job.jobfile_path,
mode = "map",
jobname = job.name,
disco_port = 8989,
put_port = 8990
),
input,
)
def map_reduce(job):
def _reduce(inputs):
return reduce(inputs, job, _finished)
def _finished(results):
job.status = "ready"
map(job.inputs, job, _reduce)
def map(inputs, job, cb):
if not job.has_map_phase:
return event_loop.call_soon(cb, inputs)
#return inputs
else:
return run_phase(map_inputs(inputs), "map", job, cb)
def map_inputs(inputs):
if not hasattr(inputs, '__iter__'):
inputs = [inputs]
return inputs
def reduce(inputs, job, cb):
if not job.has_reduce_phase:
return event_loop.call_soon(cb, inputs)
else:
return run_phase(map_inputs(inputs), "reduce", job, cb)
def results(job, mode, local_results, global_results, **state):
res = leisure.shuffle.combine_tasks(
job.data_root,
job.name, mode,
local_results
)
return sorted(set(global_results).union(res))
def run_phase(inputs, mode, job, cb):
if not inputs:
return cb(inputs)
path = job.job_dir
state = dict(
mode = mode,
job = job,
cb = cb,
outstanding = len(inputs),
local_results = [],
global_results = []
#task_results = {}
)
for id, input in enumerate(inputs):
task = Task(id, job, input, mode)
task.on('done', on_task_done, task, state)
worker.start(task)
def on_task_done(task, state):
local_result,global_results = task.results()
if local_result:
state['local_results'].append((task.host, local_result))
state["global_results"].extend(global_results)
state["outstanding"] -= 1
if state["outstanding"] == 0:
state["cb"](results(**state))

View file

@ -1,5 +1,19 @@
import os
import os, errno
def relative(path1, path2):
if os.path.isfile(path1):
path1 = os.path.dirname(path1)
return os.path.abspath(os.path.join(path1,path2))
def makedirs(path):
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise
return path
def ensure_dir(path):
return makedirs(os.path.dirname(path))
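`makedirs` is an idempotent `mkdir -p`: an existing directory is not an error, while any other `OSError` still propagates; `ensure_dir` applies it to a file path's parent. A hypothetical usage sketch:

```python
from leisure.path import makedirs, ensure_dir

makedirs("/tmp/leisure-demo/a/b")   # creates the whole chain
makedirs("/tmp/leisure-demo/a/b")   # second call is a no-op, not an error
# creates the parent .disco directory so the file can then be opened
ensure_dir("/tmp/leisure-demo/a/b/.disco/map-0.results")
```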

leisure/shuffle.py Normal file
View file

@ -0,0 +1,101 @@
import os
from itertools import groupby, chain
from . import disco
from .path import makedirs
def combine_tasks(data_root, job, mode, task_results):
key_func = lambda a: a[0]
for node, urls in groupby(sorted(task_results, key=key_func), key_func):
dir_urls = [url for node, url in urls]
#print node, dir_urls
yield combine_tasks_node(node, data_root, job, mode, dir_urls)
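`combine_tasks` sorts the `(host, url)` pairs and groups them by host, so each node's task results are merged into one index per node. The grouping in isolation, with illustrative values shaped like `state["local_results"]`:

```python
from itertools import groupby

task_results = [
    ("node1", "dir://node1/t1"),
    ("node2", "dir://node2/t1"),
    ("node1", "dir://node1/t2"),
]
key_func = lambda a: a[0]
for node, urls in groupby(sorted(task_results, key=key_func), key_func):
    print node, [url for _, url in urls]
# node1 ['dir://node1/t1', 'dir://node1/t2']
# node2 ['dir://node2/t1']
```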
def combine_tasks_node(host, data_root, job_name, mode, dir_urls):
job_home = disco.job_home(job_name, os.path.join(data_root, host))
results_home = os.path.join(job_home, ".disco")
job_link = disco.job_home(job_name, os.path.join(host, "disco", host))
results_link = os.path.join(job_link, ".disco")
part_dir = "partitions-{}".format(disco.timestamp())
part_path = os.path.join(results_home, part_dir)
part_url = os.path.join("disco://", results_link, part_dir)
index_file = "{}-index.txt.gz".format(mode)
index_path = os.path.join(results_home, index_file)
index_url = os.path.join("dir://", results_link, index_file)
makedirs(part_path)
index = merged_index(dir_urls, data_root, (part_path, part_url))
write_index(index_path, index)
return index_url
def merged_index(dir_urls, data_root, part_info):
return set(chain.from_iterable(
process_task(dir_url, data_root, part_info)
for dir_url in dir_urls
))
def process_task(dir_url, data_root, part_info):
task_path = disco.disco_url_path(dir_url)
for line in open(os.path.join(data_root, task_path)):
e = line.split()
yield process_url(e, data_root, part_info)
def process_url((label, url), data_root, (part_path, part_url)):
"""
Given a label and a url, merge the data in the url if
it is a local result (i.e. starts with "part://") and return
the url to the new part file. Otherwise return the label and
url unmodified.
"""
if not url.startswith('part://'):
return "{} {}\n".format(label, url)
else:
part_file = "part-{}".format(label)
part_src = os.path.join(data_root, disco.disco_url_path(url))
part_dst = os.path.join(part_path, part_file)
concat(part_src, part_dst)
return "{} {}/{}\n".format(label, part_url, part_file)
def write_index(filename, lines):
"""
Atomic write of index
Output lines to a temporary file renaming it when done.
Returns the name of the file written
"""
tmp_path = "{}-{}".format(filename, disco.timestamp())
output = open(tmp_path, 'w')
output.writelines(lines)
output.close()
os.rename(tmp_path, filename)
return filename
def concat(src_path, dst_path):
src = open(src_path, 'rb')
dst = open(dst_path, 'ab')
while True:
chunk = src.read(524288)
if chunk:
dst.write(chunk)
else:
break
src.close()
dst.close()

leisure/task.py Normal file
View file

@ -0,0 +1,101 @@
import os
import time
from collections import defaultdict
from .path import ensure_dir
import leisure
class Task(object):
def __init__(self, id, job, input, mode):
self.id = id
self.job = job
self.input = input
self.mode = mode
self.persisted_output = []
self.output_file_name = None
self.output_file = None
self.host ="localhost"
self.callbacks = defaultdict(list)
def on(self, event, callback, *args):
self.callbacks[event].append((callback, args))
def fire(self, event):
for callback, args in self.callbacks[event]:
callback(*args)
def done(self):
if self.output_file:
self.output_file.close()
self.fire('done')
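`on`/`fire` form a minimal synchronous event emitter: callbacks register with pre-bound arguments and run in registration order when the event fires; `job_control` uses it to observe `'done'`. The mechanism in isolation (hypothetical standalone sketch):

```python
from collections import defaultdict

class Emitter(object):
    # same on/fire mechanism as Task, extracted for illustration
    def __init__(self):
        self.callbacks = defaultdict(list)
    def on(self, event, callback, *args):
        self.callbacks[event].append((callback, args))
    def fire(self, event):
        for callback, args in self.callbacks[event]:
            callback(*args)

def report(name):
    print name, "finished"

e = Emitter()
e.on('done', report, "task-0")
e.fire('done')    # prints: task-0 finished
```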
def info(self):
path = self.job.job_dir
return dict(
host = self.host,
disco_data = os.path.join(path, "data"),
ddfs_data = os.path.join(path, "ddfs"),
master = "http://localhost:8989",
taskid = self.id,
jobfile = self.job.jobfile_path,
mode = self.mode,
jobname = self.job.name,
disco_port = 8989,
put_port = 8990
)
@property
def job_dir(self):
return self.job.job_dir
@property
def worker_path(self):
return self.job.worker_path
def add_output(self, path, type, label):
if self.output_file is None:
self.new_output_file()
self.output_file.write(self.format_output_line(path, type, label))
def new_output_file(self):
self.output_file_name = os.path.join(self.job_dir, self.results_filename())
ensure_dir(self.output_file_name)
self.output_file = open(self.output_file_name, 'w')
def results_filename(self):
time_stamp = int(time.time() * 1000)
return os.path.join('.disco', "{}-{}-{:d}.results".format(
self.mode,
self.id,
time_stamp
))
def results(self):
if self.output_file_name:
local_results = self.local_results(self.output_file_name)
else:
local_results = None
return local_results, self.persisted_output
def local_results(self, file_name):
return "dir://{}/{}".format(
self.host,
self.url_path(os.path.relpath(file_name, self.job_dir))
)
def format_output_line(self, local_path, type, label="0"):
return "{} {}://{}/{}\n".format(label, type, self.host, self.url_path(local_path))
def url_path(self, local_path):
prefix = leisure.disco.job_url(self.host, self.job.name)
return os.path.join(prefix, local_path)
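Output lines thus follow disco's index format, `<label> <type>://<host>/<job-url-prefix>/<local-path>`, where the prefix comes from `job_url`. A hypothetical call (hash digits shown as `a3`):

```python
# task on localhost for job "Job@123"; hex_hash("Job@123") shown as "a3"
task.format_output_line("map-0", "part", "0")
#   -> "0 part://localhost/disco/localhost/a3/Job@123/map-0\n"
```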

View file

@ -4,26 +4,27 @@ from subprocess import PIPE
import subprocess
import json
from .io import puts, readuntil, readbytes
from .io import puts, indent, readuntil, readbytes
from .event_loop import add_reader, remove_reader
from .path import relative
def start(job, task, input):
puts("Starting job in {}".format(job.job_dir))
def start(task):
puts("Starting job in {}".format(task.job_dir))
indent("inputs {}".format(task.input))
env = os.environ.copy()
proc = subprocess.Popen(
[job.worker_path],
[task.worker_path],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=job.job_dir,
cwd=task.job_dir,
env=env
)
add_reader(proc.stderr, worker_stream(proc, job, task, input), proc.stderr)
add_reader(proc.stderr, worker_stream(proc, task), proc.stderr)
def worker_stream(proc, job, task, input):
def worker_stream(proc, task):
"""
Returns a function bound to the supplied proc suitable for interpreting
the disco work protocol.
@ -35,7 +36,7 @@ def worker_stream(proc, job, task, input):
packet, packet_reader[0] = packet_reader[0](stream)
#print "<--" + str(packet),
r = response(proc, job, task, input, packet)
r = response(proc, task, packet)
#print "--> {}".format(r)
if r is not None:
proc.stdin.write(r)
@ -79,7 +80,7 @@ def done(proc):
remove_reader(proc.stderr)
return msg("OK", "ok")
def response(proc, job, task, input, packet):
def response(proc, task, packet):
type,size,payload = packet
payload = json.loads(payload)
@ -88,24 +89,28 @@ def response(proc, job, task, input, packet):
elif type == 'MSG':
puts(payload)
return msg("OK","")
elif type in ('ERROR','FATAL'):
job.status = "dead"
elif type in ('ERROR','FATAL'):
# todo: fail the task
task.job.status = "dead"
done(proc)
puts("{}\n{}".format(type, payload))
return None
elif type == 'TASK':
return msg('TASK',task)
return msg('TASK',task.info())
elif type == "INPUT":
return msg('INPUT', [
u'done', [
[0, u'ok', [[0, input]]]
[0, u'ok', [[0, task.input]]]
]
])
elif type == "OUTPUT":
puts("{} {} {}".format(*packet))
task.add_output(*payload)
return msg("OK", 'ok')
elif type == "DONE":
job.status = "ready"
task.done()
return done(proc)
else:
pass
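Altogether `response` drives leisure's half of the disco worker protocol: the worker emits packets on stderr and leisure replies on stdin. A hedged sketch of a typical single-input exchange (the wire framing lives in `msg`/`readuntil`, and the initial handshake branch is elided from this hunk):

```
worker -> leisure              leisure -> worker
TASK                           TASK  <task.info() dict>
INPUT                          INPUT ['done', [[0, 'ok', [[0, task.input]]]]]
MSG "progress text"            OK ""
OUTPUT [path, type, label]     OK "ok"    (recorded via task.add_output)
DONE                           OK "ok"    (task.done(); reader removed)
ERROR / FATAL                  (no reply; job marked dead, reader removed)
```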

tests/test_shuffle.py Normal file
View file

@ -0,0 +1,232 @@
import os
from unittest import TestCase
from nose.tools import eq_
import tempfile
import shutil
from leisure import shuffle, disco
from leisure.path import makedirs
def cat(fname, content):
open(fname,'w').write(content)
class TestShuffle(TestCase):
def setUp(self):
self.data_root = tempfile.mkdtemp()
self.job_name = "Job@123"
self.host = "localhost"
self.job_home = disco.job_home(self.job_name, os.path.join(self.data_root, self.host))
self.job_url = disco.job_url(self.host, self.job_name)
makedirs(self.job_home)
self.part_info = self.make_part_info(self.job_home)
def tearDown(self):
shutil.rmtree(self.data_root)
def make_part_info(self, job_home):
part_dir = "partitions-{}".format(shuffle.timestamp())
part_path = os.path.join(
job_home,
part_dir
)
makedirs(part_path)
part_url = os.path.join("disco://localhost", self.job_url, part_dir)
return (
part_path,
part_url
)
def mk_output_file(self, name, content, job_home=None):
if job_home is None:
job_home = self.job_home
path = os.path.join(job_home, name)
cat(path, content)
return path
def mk_task_results(self, task_name, mode='map', host="localhost"):
"""
Creates a file suitable for use as task results and returns its url
"""
job_home = disco.job_home(self.job_name, os.path.join(self.data_root, host))
makedirs(job_home)
self.mk_output_file('{}-0'.format(mode),
'line1\n'
'line2\n',
job_home=job_home
)
self.mk_output_file('{}-1'.format(mode),
'line1\n'
'line2\n',
job_home=job_home
)
self.mk_output_file('{}-2'.format(mode),
'line1\n'
'line2\n',
job_home=job_home
)
job_url = disco.job_url(host, self.job_name)
makedirs(job_home)
task_result_path = os.path.join(job_home, task_name)
cat(task_result_path,
(
"0 part://{host}/{job_url}/{mode}-0\n"
"1 part://{host}/{job_url}/{mode}-1\n"
"0 part://{host}/{job_url}/{mode}-2\n"
).format(job_url = job_url, host=host, mode=mode)
)
return os.path.join("disco://", host, job_url, task_name)
def test_write_index(self):
index = [
"line1\n",
"line2\n"
]
filename = os.path.join(self.data_root, "blah")
shuffle.write_index(filename, index)
read_lines = open(filename).readlines()
self.assertSequenceEqual(index, read_lines)
def test_process_url_non_local(self):
eq_(
'0 tag://blah\n',
shuffle.process_url(
("0", "tag://blah"),
self.data_root,
self.part_info
)
)
def test_process_url_local(self):
self.mk_output_file('map-0',
'line1\n'
'line2\n'
)
self.mk_output_file('map-1',
'line3\n'
'line4\n'
)
part_path,part_url = self.part_info
part_dir = os.path.basename(part_path)
eq_(
'0 disco://localhost/{}/{}/part-0\n'.format(self.job_url, part_dir),
shuffle.process_url(
("0", "part://localhost/{}/map-0".format(self.job_url)),
self.data_root,
self.part_info
)
)
eq_(
open(os.path.join(part_path, "part-0")).read(),
'line1\n'
'line2\n'
)
eq_(
'0 disco://localhost/{}/{}/part-0\n'.format(self.job_url, part_dir),
shuffle.process_url(
("0", "part://localhost/{}/map-1".format(self.job_url)),
self.data_root,
self.part_info
)
)
eq_(
open(os.path.join(part_path, "part-0")).read(),
'line1\n'
'line2\n'
'line3\n'
'line4\n'
)
def test_process_task(self):
task_result_url = self.mk_task_results('task-1')
part_files = list(shuffle.process_task(
task_result_url,
self.data_root, self.part_info
))
part_url = self.part_info[1]
expected = [
s.format(part_url=part_url) for s in [
"0 {part_url}/part-0\n",
"1 {part_url}/part-1\n",
"0 {part_url}/part-0\n"
]
]
self.assertSequenceEqual(
expected,
part_files
)
def test_merged_index(self):
dir_urls = [self.mk_task_results('task-1')]
m_index = shuffle.merged_index(dir_urls, self.data_root, self.part_info)
part_url = self.part_info[1]
expected = [
s.format(part_url=part_url) for s in [
"0 {part_url}/part-0\n",
"1 {part_url}/part-1\n",
]
]
self.assertSequenceEqual(
set(expected),
m_index
)
def test_combine_tasks(self):
task_results = [
["node1", self.mk_task_results('task-1', host="node1")],
["node2", self.mk_task_results('task-1', host="node2")],
["node1", self.mk_task_results('task-2', host="node1")]
]
indexes = list(shuffle.combine_tasks(
data_root=self.data_root,
job=self.job_name,
mode="map",
task_results=task_results
))
# combine_tasks should yield one merged index url per host (node1, node2)
eq_(2, len(indexes))