Merge pull request #34 from bsmedberg/scriptedoutput

Allow scripts to customize the output.
tarasglek 2013-04-30 13:10:41 -07:00
Parents 472d0a6c5f 51719bc9f1
Commit a957b625dc
3 changed files with 108 additions and 36 deletions
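
With this change a job script may define its own output(path, results) hook; when it does, both FileDriver.py and HBaseDriver pass it the output path and an iterator of (key, value) pairs instead of writing the results themselves. A minimal sketch of such a script (the map/reduce bodies and the tab-separated format are illustrative, not part of this commit):

    # Hypothetical jydoop job script; only the output() hook relies on
    # behaviour introduced by this commit.
    def map(key, value, context):
        context.write(value, 1)

    def reduce(key, values, context):
        context.write(key, sum(values))

    def output(path, results):
        # results is an iterator of (key, value) pairs supplied by the driver;
        # write them tab-separated instead of the default CSV.
        f = open(path, 'w')
        for k, v in results:
            f.write("%s\t%s\n" % (k, v))
        f.close()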

View file

@@ -29,8 +29,6 @@ class LocalContext:
for v in vlist:
self.result.setdefault(k, []).append(v)
# By default, if the job has a reduce function, we want to print both the key and the value.
# If no reduction is happening, users usually don't care about the key.
def outputwithkey(rlist):
for k, v in rlist:
print "%s\t%s" % (k, v)
@@ -39,7 +37,7 @@ def outputnokey(rlist):
for k, v in rlist:
print v
def map_reduce(module, fd):
def map_reduce(module, fd, outputpath):
setupfunc = getattr(module, 'setupjob', None)
mapfunc = getattr(module, 'map', None)
reducefunc = getattr(module, 'reduce', None)
@@ -72,23 +70,28 @@ def map_reduce(module, fd):
module.reduce(key, values, reduced_context)
context = reduced_context
# By default, if the job has a reduce function, we want to print both
# the key and the value.
# If no reduction is happening, users usually don't care about the key.
if hasattr(module, 'output'):
outputfunc = module.output
elif reducefunc:
outputfunc = outputwithkey
import jydoop
outputfunc = jydoop.outputWithKey
else:
outputfunc = outputnokey
import jydoop
outputfunc = jydoop.outputWithoutKey
outputfunc(iter(context))
outputfunc(outputpath, iter(context))
if __name__ == '__main__':
import imp, sys, os
if len(sys.argv) != 3:
print >>sys.stderr, "Usage: FileDriver.py <jobscript.py> <input.data or ->"
if len(sys.argv) != 4:
print >>sys.stderr, "Usage: FileDriver.py <jobscript.py> <input.data or -> <outputpath>"
sys.exit(1)
modulepath, filepath = sys.argv[1:]
modulepath, filepath, outputpath = sys.argv[1:]
if filepath == "-":
fd = sys.stdin
@@ -101,4 +104,4 @@ if __name__ == '__main__':
sys.path.insert(0, os.path.dirname(modulepath))
module = imp.load_module('pydoop_main', modulefd, modulepath, ('.py', 'U', 1))
map_reduce(module, fd)
map_reduce(module, fd, outputpath)
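
For clarity, the selection logic in map_reduce() boils down to: a script-provided output() wins, otherwise jydoop.outputWithKey is used when a reduce step ran and jydoop.outputWithoutKey when it did not. A standalone sketch of that decision (the helper name and arguments are illustrative):

    import jydoop

    def pick_output(module, has_reduce):
        # A script-provided output() takes precedence; otherwise keys are
        # only worth printing when a reduce step produced them.
        if hasattr(module, 'output'):
            return module.output
        return jydoop.outputWithKey if has_reduce else jydoop.outputWithoutKey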

View file

@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configured;
import org.python.core.Py;
import org.python.core.PyObject;
import org.python.core.PyIterator;
import org.python.core.PyTuple;
import org.python.core.util.StringUtil;
import org.apache.commons.lang.StringUtils;
@@ -212,10 +213,10 @@ public class HBaseDriver extends Configured implements Tool {
String outPath = args[1];
Path outdir = new Path(outPath);
Configuration conf = getConf();
final Configuration conf = getConf();
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
FileSystem fs = FileSystem.get(conf);
final FileSystem fs = FileSystem.get(conf);
String jobname = "HBaseDriver: " + StringUtils.join(args, " ");
@@ -223,7 +224,6 @@ public class HBaseDriver extends Configured implements Tool {
job.setJarByClass(HBaseDriver.class); // class that contains mapper
try {
fs.delete(outdir, true);
System.err.println("Deleted old " + outPath);
} catch(Exception e) {
}
FileOutputFormat.setOutputPath(job, outdir); // adjust directories as required
@@ -258,32 +258,56 @@ public class HBaseDriver extends Configured implements Tool {
return 1;
}
// Now read the hadoop files and output a single local file from the results
FileOutputStream outfs = new FileOutputStream(outPath);
PrintStream outps = new PrintStream(outfs);
// Now read the hadoop files and call the output function
FileStatus[] files = fs.listStatus(outdir);
for (FileStatus file : files) {
if (file.getLen() == 0) {
continue;
final FileStatus[] files = fs.listStatus(outdir);
class KeyValueIterator extends PyIterator
{
int index;
SequenceFile.Reader r;
public KeyValueIterator() {
index = 0;
}
SequenceFile.Reader r = new SequenceFile.Reader(fs, file.getPath(), conf);
PythonKey k = new PythonKey();
PythonValue v = new PythonValue();
while (r.next(k, v)) {
// If this is a map-only job, the keys are usually not valuable so we default
// to printing only the value.
// TODO: tab-delimit tuples instead of just .toString on them
if (!maponly) {
outps.print(k.toString());
outps.print('\t');
public PyObject __iternext__()
{
PythonKey k = new PythonKey();
PythonValue v = new PythonValue();
try {
for ( ; index < files.length; r = null, ++index) {
if (r == null) {
if (files[index].getLen() == 0) {
continue;
}
r = new SequenceFile.Reader(fs, files[index].getPath(), conf);
}
if (r.next(k, v)) {
return new PyTuple(k.value, v.value);
}
}
}
outps.print(v.toString());
outps.println();
catch (IOException e) {
throw Py.IOError(e);
}
return null;
}
}
outps.close();
PyObject outputfunc = module.getFunction("output");
if (outputfunc == null) {
if (maponly) {
outputfunc = org.python.core.imp.load("jydoop").__getattr__("outputWithoutKey");
} else {
outputfunc = org.python.core.imp.load("jydoop").__getattr__("outputWithKey");
}
}
outputfunc.__call__(Py.newString(outPath), new KeyValueIterator());
// If we got here, the temporary files are irrelevant. Delete them.
fs.delete(outdir, true);
return 0;
}
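
On the Hadoop side the driver no longer concatenates the part files into a text file itself; KeyValueIterator walks the SequenceFiles lazily and the whole stream is handed to the same Python output hook. A custom output() should therefore consume its iterator incrementally rather than materializing it. A sketch, assuming the keys and values are JSON-serializable (the JSON-lines format is illustrative):

    import json

    def output(path, results):
        # results may lazily span many SequenceFiles, so write as we iterate
        # instead of collecting everything in memory first.
        f = open(path, 'w')
        for k, v in results:
            f.write(json.dumps({'key': k, 'value': v}) + '\n')
        f.close()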

View file

@@ -2,6 +2,8 @@
Standard library of useful things for jydoop scripts.
"""
import csv
def isJython():
import platform
return platform.system() == 'Java'
@@ -13,10 +15,10 @@ def sumreducer(k, vlist, cx):
"""
cx.write(k, sum(vlist))
"""
Read something out of driver.jar
"""
def getResource(path):
"""
Read something out of driver.jar
"""
if isJython():
import org.mozilla.jydoop.PythonValue as PythonValue
import org.python.core.util.FileUtil as FileUtil
@@ -25,3 +27,46 @@ def getResource(path):
# Python case
f = open(path, 'r')
return f.read()
def unwrap(l, v):
"""
Unwrap a value into a list. Dicts are added in their repr form.
"""
if isinstance(v, (tuple, list)):
for e in v:
unwrap(l, e)
elif isinstance(v, dict):
l.append(repr(v))
else:
l.append(v)
def outputWithKey(path, results):
"""
Output key/values into a reasonable CSV.
All lists/tuples are unwrapped.
"""
f = open(path, 'w')
w = csv.writer(f)
for k, v in results:
l = []
unwrap(l, k)
unwrap(l, v)
w.writerow(l)
def outputWithoutKey(path, results):
"""
Output values into a reasonable text file. If the values are simple,
they are printed directly. If they are complex tuples/lists, they are
printed as csv.
"""
f = open(path, 'w')
w = csv.writer(f)
for k, v in results:
l = []
unwrap(l, v)
if len(l) == 1:
print >>f, v
else:
w.writerow(l)
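
To illustrate the helpers above: unwrap() flattens nested tuples/lists into a single row and stores dicts as their repr, so outputWithKey() turns a result like (('2013-04-30', 'firefox'), (10, 20)) into one CSV row. A small local usage sketch (the data and file name are illustrative):

    import jydoop

    results = [
        (('2013-04-30', 'firefox'), (10, 20)),  # nested key and value tuples
        ('simple-key', 3),
    ]
    # Produces rows like: 2013-04-30,firefox,10,20
    jydoop.outputWithKey('/tmp/example.csv', results)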