Bug 795360 - Make dump_syms from symbolstore.py run in parallel on multi-core machines. r=ted

This commit is contained in:
Benedict Singer 2012-12-11 09:52:53 -05:00
Родитель ed8d21619c
Коммит 79c5d7e390
2 изменённых файлов: 202 добавлений и 49 удалений

Просмотреть файл

@ -29,6 +29,8 @@ import textwrap
import fnmatch
import subprocess
import urlparse
import multiprocessing
import collections
from optparse import OptionParser
from xml.dom.minidom import parse
@ -303,6 +305,11 @@ def SourceIndex(fileStream, outputPath, vcs_root):
pdbStreamFile.close()
return result
def StartProcessFilesWork(dumper, files, arch_num, arch, vcs_root, after, after_arg):
"""multiprocessing can't handle methods as Process targets, so we define
a simple wrapper function around the work method."""
return dumper.ProcessFilesWork(files, arch_num, arch, vcs_root, after, after_arg)
class Dumper:
"""This class can dump symbols from a file with debug info, and
store the output in a directory structure that is valid for use as
@ -317,7 +324,13 @@ class Dumper:
You don't want to use this directly if you intend to call
ProcessDir. Instead, call GetPlatformSpecificDumper to
get an instance of a subclass."""
get an instance of a subclass.
Processing is performed asynchronously via worker processes; in
order to wait for processing to finish and cleanup correctly, you
must call Finish after all Process/ProcessDir calls have been made.
You must also call Dumper.GlobalInit before creating or using any
instances."""
def __init__(self, dump_syms, symbol_path,
archs=None,
srcdirs=[],
@ -342,6 +355,59 @@ class Dumper:
if repo_manifest:
self.parse_repo_manifest(repo_manifest)
# book-keeping to keep track of our jobs and the cleanup work per file tuple
self.files_record = {}
self.jobs_record = collections.defaultdict(int)
@classmethod
def GlobalInit(cls, module=multiprocessing):
"""Initialize the class globals for the multiprocessing setup; must
be called before any Dumper instances are created and used. Test cases
may pass in a different module to supply Manager and Pool objects,
usually multiprocessing.dummy."""
num_cpus = module.cpu_count()
if num_cpus is None:
# assume a dual core machine if we can't find out for some reason
# probably better on single core anyway due to I/O constraints
num_cpus = 2
# have to create any locks etc before the pool
cls.manager = module.Manager()
cls.jobs_condition = Dumper.manager.Condition()
cls.lock = Dumper.manager.RLock()
cls.pool = module.Pool(num_cpus)
def JobStarted(self, file_key):
"""Increments the number of submitted jobs for the specified key file,
defined as the original file we processed; note that a single key file
can generate up to 1 + len(self.archs) jobs in the Mac case."""
with Dumper.jobs_condition:
self.jobs_record[file_key] += 1
Dumper.jobs_condition.notify_all()
def JobFinished(self, file_key):
"""Decrements the number of submitted jobs for the specified key file,
defined as the original file we processed; once the count is back to 0,
remove the entry from our record."""
with Dumper.jobs_condition:
self.jobs_record[file_key] -= 1
if self.jobs_record[file_key] == 0:
del self.jobs_record[file_key]
Dumper.jobs_condition.notify_all()
def output(self, dest, output_str):
"""Writes |output_str| to |dest|, holding |lock|;
terminates with a newline."""
with Dumper.lock:
dest.write(output_str + "\n")
dest.flush()
def output_pid(self, dest, output_str):
"""Debugging output; prepends the pid to the string."""
self.output(dest, "%d: %s" % (os.getpid(), output_str))
def parse_repo_manifest(self, repo_manifest):
"""
Parse an XML manifest of repository info as produced
@ -417,19 +483,30 @@ class Dumper:
def CopyDebug(self, file, debug_file, guid):
pass
def Finish(self, stop_pool=True):
"""Wait for the expected number of jobs to be submitted, and then
wait for the pool to finish processing them. By default, will close
and clear the pool, but for testcases that need multiple runs, pass
stop_pool = False."""
with Dumper.jobs_condition:
while len(self.jobs_record) != 0:
Dumper.jobs_condition.wait()
if stop_pool:
Dumper.pool.close()
Dumper.pool.join()
def Process(self, file_or_dir):
"Process a file or all the (valid) files in a directory."
"""Process a file or all the (valid) files in a directory; processing is performed
asynchronously, and Finish must be called to wait for it complete and cleanup."""
if os.path.isdir(file_or_dir) and not self.ShouldSkipDir(file_or_dir):
return self.ProcessDir(file_or_dir)
self.ProcessDir(file_or_dir)
elif os.path.isfile(file_or_dir):
return self.ProcessFile(file_or_dir)
# maybe it doesn't exist?
return False
self.ProcessFiles((file_or_dir,))
def ProcessDir(self, dir):
"""Process all the valid files in this directory. Valid files
are determined by calling ShouldProcess."""
result = True
are determined by calling ShouldProcess; processing is performed
asynchronously, and Finish must be called to wait for it complete and cleanup."""
for root, dirs, files in os.walk(dir):
for d in dirs[:]:
if self.ShouldSkipDir(d):
@ -437,21 +514,48 @@ class Dumper:
for f in files:
fullpath = os.path.join(root, f)
if self.ShouldProcess(fullpath):
if not self.ProcessFile(fullpath):
result = False
return result
self.ProcessFiles((fullpath,))
def SubmitJob(self, file_key, func, args, callback):
"""Submits a job to the pool of workers; increments the number of submitted jobs."""
self.JobStarted(file_key)
Dumper.pool.apply_async(func, args=args, callback=callback)
def ProcessFilesFinished(self, res):
"""Callback from multiprocesing when ProcessFilesWork finishes;
run the cleanup work, if any"""
self.JobFinished(res['files'][-1])
# only run the cleanup function once per tuple of files
self.files_record[res['files']] += 1
if self.files_record[res['files']] == len(self.archs):
del self.files_record[res['files']]
if res['after']:
res['after'](res['status'], res['after_arg'])
def ProcessFiles(self, files, after=None, after_arg=None):
"""Dump symbols from these files into a symbol file, stored
in the proper directory structure in |symbol_path|; processing is performed
asynchronously, and Finish must be called to wait for it complete and cleanup.
All files after the first are fallbacks in case the first file does not process
successfully; if it does, no other files will be touched."""
self.output_pid(sys.stderr, "Submitting jobs for files: %s" % str(files))
def ProcessFile(self, file):
"""Dump symbols from this file into a symbol file, stored
in the proper directory structure in |symbol_path|."""
print >> sys.stderr, "Processing file: %s" % file
sys.stderr.flush()
result = False
sourceFileStream = ''
# tries to get the vcs root from the .mozconfig first - if it's not set
# the tinderbox vcs path will be assigned further down
vcs_root = os.environ.get("SRCSRV_ROOT")
for arch_num, arch in enumerate(self.archs):
self.files_record[files] = 0 # record that we submitted jobs for this tuple of files
self.SubmitJob(files[-1], StartProcessFilesWork, args=(self, files, arch_num, arch, vcs_root, after, after_arg), callback=self.ProcessFilesFinished)
def ProcessFilesWork(self, files, arch_num, arch, vcs_root, after, after_arg):
self.output_pid(sys.stderr, "Worker processing files: %s" % (files,))
# our result is a status, a cleanup function, an argument to that function, and the tuple of files we were called on
result = { 'status' : False, 'after' : after, 'after_arg' : after_arg, 'files' : files }
sourceFileStream = ''
for file in files:
# files is a tuple of files, containing fallbacks in case the first file doesn't process successfully
try:
proc = subprocess.Popen([self.dump_syms] + arch.split() + [file],
stdout=subprocess.PIPE)
@ -501,12 +605,12 @@ class Dumper:
# pass through all other lines unchanged
f.write(line)
# we want to return true only if at least one line is not a MODULE or FILE line
result = True
result['status'] = True
f.close()
proc.wait()
# we output relative paths so callers can get a list of what
# was generated
print rel_path
self.output(sys.stdout, rel_path)
if self.srcsrv and vcs_root:
# add source server indexing to the pdb file
self.SourceServerIndexing(file, guid, sourceFileStream, vcs_root)
@ -515,9 +619,12 @@ class Dumper:
self.CopyDebug(file, debug_file, guid)
except StopIteration:
pass
except:
print >> sys.stderr, "Unexpected error: ", sys.exc_info()[0]
except e:
self.output(sys.stderr, "Unexpected error: %s" % (str(e),))
raise
if result['status']:
# we only need 1 file to work
break
return result
# Platform-specific subclasses. For the most part, these just have
@ -576,9 +683,9 @@ class Dumper_Win32(Dumper):
stdout=open("NUL:","w"), stderr=subprocess.STDOUT)
if success == 0 and os.path.exists(compressed_file):
os.unlink(full_path)
print os.path.splitext(rel_path)[0] + ".pd_"
self.output(sys.stdout, os.path.splitext(rel_path)[0] + ".pd_")
else:
print rel_path
self.output(sys.stdout, rel_path)
def SourceServerIndexing(self, debug_file, guid, sourceFileStream, vcs_root):
# Creates a .pdb.stream file in the mozilla\objdir to be used for source indexing
@ -625,7 +732,7 @@ class Dumper_Linux(Dumper):
shutil.move(file_dbg, full_path)
# gzip the shipped debug files
os.system("gzip %s" % full_path)
print rel_path + ".gz"
self.output(sys.stdout, rel_path + ".gz")
else:
if os.path.isfile(file_dbg):
os.unlink(file_dbg)
@ -650,6 +757,16 @@ class Dumper_Solaris(Dumper):
return self.RunFileCommand(file).startswith("ELF")
return False
def StartProcessFilesWorkMac(dumper, file):
"""multiprocessing can't handle methods as Process targets, so we define
a simple wrapper function around the work method."""
return dumper.ProcessFilesWorkMac(file)
def AfterMac(status, dsymbundle):
"""Cleanup function to run on Macs after we process the file(s)."""
# CopyDebug will already have been run from Dumper.ProcessFiles
shutil.rmtree(dsymbundle)
class Dumper_Mac(Dumper):
def ShouldProcess(self, file):
"""This function will allow processing of files that are
@ -671,10 +788,28 @@ class Dumper_Mac(Dumper):
return True
return False
def ProcessFile(self, file):
def ProcessFiles(self, files, after=None, after_arg=None):
# also note, files must be len 1 here, since we're the only ones
# that ever add more than one file to the list
self.output_pid(sys.stderr, "Submitting job for Mac pre-processing on file: %s" % (files[0]))
self.SubmitJob(files[0], StartProcessFilesWorkMac, args=(self, files[0]), callback=self.ProcessFilesMacFinished)
def ProcessFilesMacFinished(self, result):
if result['status']:
# kick off new jobs per-arch with our new list of files
Dumper.ProcessFiles(self, result['files'], after=AfterMac, after_arg=result['files'][0])
# only decrement jobs *after* that, since otherwise we'll remove the record for this file
self.JobFinished(result['files'][-1])
def ProcessFilesWorkMac(self, file):
"""dump_syms on Mac needs to be run on a dSYM bundle produced
by dsymutil(1), so run dsymutil here and pass the bundle name
down to the superclass method instead."""
self.output_pid(sys.stderr, "Worker running Mac pre-processing on file: %s" % (file,))
# our return is a status and a tuple of files to dump symbols for
# the extra files are fallbacks; as soon as one is dumped successfully, we stop
result = { 'status' : False, 'files' : None, 'file_key' : file }
dsymbundle = file + ".dSYM"
if os.path.exists(dsymbundle):
shutil.rmtree(dsymbundle)
@ -684,20 +819,15 @@ class Dumper_Mac(Dumper):
stdout=open("/dev/null","w"))
if not os.path.exists(dsymbundle):
# dsymutil won't produce a .dSYM for files without symbols
return False
res = Dumper.ProcessFile(self, dsymbundle)
# CopyDebug will already have been run from Dumper.ProcessFile
shutil.rmtree(dsymbundle)
result['status'] = False
return result
# fallback for DWARF-less binaries
if not res:
print >> sys.stderr, "Couldn't read DWARF symbols in: %s" % dsymbundle
res = Dumper.ProcessFile(self, file)
return res
result['status'] = True
result['files'] = (dsymbundle, file)
return result
def CopyDebug(self, file, debug_file, guid):
"""ProcessFile has already produced a dSYM bundle, so we should just
"""ProcessFiles has already produced a dSYM bundle, so we should just
copy that to the destination directory. However, we'll package it
into a .tar.bz2 because the debug symbols are pretty huge, and
also because it's a bundle, so it's a directory. |file| here is the
@ -711,7 +841,7 @@ class Dumper_Mac(Dumper):
cwd=os.path.dirname(file),
stdout=open("/dev/null","w"), stderr=subprocess.STDOUT)
if success == 0 and os.path.exists(full_path):
print rel_path
self.output(sys.stdout, rel_path)
# Entry point if called as a standalone program
def main():
@ -763,7 +893,12 @@ produced by the `repo manifest -r` command.
repo_manifest=options.repo_manifest)
for arg in args[2:]:
dumper.Process(arg)
dumper.Finish()
# run main if run directly
if __name__ == "__main__":
# set up the multiprocessing infrastructure before we start;
# note that this needs to be in the __main__ guard, or else Windows will choke
Dumper.GlobalInit()
main()

Просмотреть файл

@ -3,7 +3,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os, tempfile, unittest, shutil, struct, platform, subprocess
import os, tempfile, unittest, shutil, struct, platform, subprocess, multiprocessing.dummy
import mock
from mock import patch
import symbolstore
@ -66,15 +66,17 @@ class TestExclude(HelperMixin, unittest.TestCase):
Test that using an exclude list with a wildcard pattern works.
"""
processed = []
def mock_process_file(filename):
processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
def mock_process_file(filenames):
for filename in filenames:
processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
return True
self.add_test_files(add_extension(["foo", "bar", "abc/xyz", "abc/fooxyz", "def/asdf", "def/xyzfoo"]))
d = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
symbol_path="symbol_path",
exclude=["*foo*"])
d.ProcessFile = mock_process_file
self.assertTrue(d.Process(self.test_dir))
d.ProcessFiles = mock_process_file
d.Process(self.test_dir)
d.Finish(stop_pool=False)
processed.sort()
expected = add_extension(["bar", "abc/xyz", "def/asdf"])
expected.sort()
@ -85,15 +87,17 @@ class TestExclude(HelperMixin, unittest.TestCase):
Test that excluding a filename without a wildcard works.
"""
processed = []
def mock_process_file(filename):
processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
def mock_process_file(filenames):
for filename in filenames:
processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
return True
self.add_test_files(add_extension(["foo", "bar", "abc/foo", "abc/bar", "def/foo", "def/bar"]))
d = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
symbol_path="symbol_path",
exclude=add_extension(["foo"]))
d.ProcessFile = mock_process_file
self.assertTrue(d.Process(self.test_dir))
d.ProcessFiles = mock_process_file
d.Process(self.test_dir)
d.Finish(stop_pool=False)
processed.sort()
expected = add_extension(["bar", "abc/bar", "def/bar"])
expected.sort()
@ -129,13 +133,19 @@ class TestCopyDebugUniversal(HelperMixin, unittest.TestCase):
self._subprocess_popen = subprocess.Popen
subprocess.Popen = popen_factory(self.next_mock_stdout())
self.stdouts = []
self._shutil_rmtree = shutil.rmtree
shutil.rmtree = self.mock_rmtree
def tearDown(self):
HelperMixin.tearDown(self)
shutil.rmtree = self._shutil_rmtree
shutil.rmtree(self.symbol_dir)
subprocess.call = self._subprocess_call
subprocess.Popen = self._subprocess_popen
def mock_rmtree(self, path):
pass
def mock_call(self, args, **kwargs):
if args[0].endswith("dsymutil"):
filename = args[-1]
@ -164,7 +174,8 @@ class TestCopyDebugUniversal(HelperMixin, unittest.TestCase):
copy_debug=True,
archs="abc xyz")
d.CopyDebug = mock_copy_debug
self.assertTrue(d.Process(self.test_dir))
d.Process(self.test_dir)
d.Finish(stop_pool=False)
self.assertEqual(1, len(copied))
class TestGetVCSFilename(HelperMixin, unittest.TestCase):
@ -231,4 +242,11 @@ class TestRepoManifest(HelperMixin, unittest.TestCase):
symbolstore.GetVCSFilename(file3, d.srcdirs)[0])
if __name__ == '__main__':
unittest.main()
# use the multiprocessing.dummy module to use threading wrappers so
# that our mocking/module-patching works
symbolstore.Dumper.GlobalInit(module=multiprocessing.dummy)
unittest.main()
symbolstore.Dumper.pool.close()
symbolstore.Dumper.pool.join()