From 0408408ab56635a664df164d510732afe30e12b3 Mon Sep 17 00:00:00 2001 From: "jbudorick@chromium.org" Date: Thu, 22 May 2014 11:13:40 +0000 Subject: [PATCH] [Android] Support generic parallel execution across devices. BUG=267773 Review URL: https://codereview.chromium.org/290573004 git-svn-id: http://src.chromium.org/svn/trunk/src/build@272159 4ff67af0-8c30-449e-8e8b-ad334ec8d88c --- android/adb_install_apk.py | 17 +- android/provision_devices.py | 10 +- android/pylib/device/device_utils.py | 61 +++---- android/pylib/utils/parallelizer.py | 196 +++++++++++++++++++++++ android/pylib/utils/parallelizer_test.py | 166 +++++++++++++++++++ android/pylib/utils/reraiser_thread.py | 19 ++- 6 files changed, 416 insertions(+), 53 deletions(-) create mode 100644 android/pylib/utils/parallelizer.py create mode 100644 android/pylib/utils/parallelizer_test.py diff --git a/android/adb_install_apk.py b/android/adb_install_apk.py index 5493439fe..a91510137 100755 --- a/android/adb_install_apk.py +++ b/android/adb_install_apk.py @@ -6,7 +6,6 @@ """Utility script to install APKs from the command line quickly.""" -import multiprocessing import optparse import os import sys @@ -50,13 +49,6 @@ def ValidateInstallAPKOption(option_parser, options): options.apk) -def _InstallApk(args): - apk_path, apk_package, keep_data, device = args - device_utils.DeviceUtils(device=device).old_interface.ManagedInstall( - apk_path, keep_data, apk_package) - print '----- Installed on %s -----' % device - - def main(argv): parser = optparse.OptionParser() AddInstallAPKOption(parser) @@ -73,13 +65,10 @@ def main(argv): if not options.apk_package: options.apk_package = apk_helper.GetPackageName(options.apk) - pool = multiprocessing.Pool(len(devices)) - # Send a tuple (apk_path, apk_package, device) per device. - pool.map(_InstallApk, zip([options.apk] * len(devices), - [options.apk_package] * len(devices), - [options.keep_data] * len(devices), - devices)) + device_utils.DeviceUtils.parallel(devices).old_interface.ManagedInstall( + options.apk, options.keep_data, options.apk_package).pFinish(None) if __name__ == '__main__': sys.exit(main(sys.argv)) + diff --git a/android/provision_devices.py b/android/provision_devices.py index 0549e81ad..834c6313e 100755 --- a/android/provision_devices.py +++ b/android/provision_devices.py @@ -24,6 +24,10 @@ from pylib import device_settings from pylib.cmd_helper import GetCmdOutput from pylib.device import device_utils +sys.path.append(os.path.join(constants.DIR_SOURCE_ROOT, + 'third_party', 'android_testrunner')) +import errors + def KillHostHeartbeat(): ps = subprocess.Popen(['ps', 'aux'], stdout = subprocess.PIPE) stdout, _ = ps.communicate() @@ -172,7 +176,11 @@ def main(argv): for device_serial in devices: device = device_utils.DeviceUtils(device_serial) WipeDeviceData(device) - device_utils.RebootDevices() + try: + (device_utils.DeviceUtils.parallel(devices) + .old_interface.Reboot(True).pFinish(None)) + except errors.DeviceUnresponsiveError: + pass else: ProvisionDevices(options) diff --git a/android/pylib/device/device_utils.py b/android/pylib/device/device_utils.py index ea1019d3d..ff99fccdc 100644 --- a/android/pylib/device/device_utils.py +++ b/android/pylib/device/device_utils.py @@ -9,52 +9,16 @@ Eventually, this will be based on adb_wrapper. """ # pylint: disable=W0613 -import multiprocessing -import os -import sys - import pylib.android_commands from pylib.device import adb_wrapper from pylib.device import decorators from pylib.device import device_errors - -CHROME_SRC_DIR = os.path.abspath( - os.path.join(os.path.dirname(__file__), '..', '..', '..', '..')) -sys.path.append(os.path.join( - CHROME_SRC_DIR, 'third_party', 'android_testrunner')) -import errors +from pylib.utils import parallelizer _DEFAULT_TIMEOUT = 30 _DEFAULT_RETRIES = 3 -# multiprocessing map_async requires a top-level function for pickle library. -def RebootDeviceSafe(device): - """Reboot a device, wait for it to start, and squelch timeout exceptions.""" - try: - DeviceUtils(device).old_interface.Reboot(True) - except errors.DeviceUnresponsiveError as e: - return e - - -def RebootDevices(): - """Reboot all attached and online devices.""" - devices = pylib.android_commands.GetAttachedDevices() - print 'Rebooting: %s' % devices - if devices: - pool = multiprocessing.Pool(len(devices)) - results = pool.map_async(RebootDeviceSafe, devices).get(99999) - - for device, result in zip(devices, results): - if result: - print '%s failed to startup.' % device - - if any(results): - print 'RebootDevices() Warning: %s' % results - else: - print 'Reboots complete.' - - @decorators.WithExplicitTimeoutAndRetries( _DEFAULT_TIMEOUT, _DEFAULT_RETRIES) def GetAVDs(): @@ -150,3 +114,26 @@ class DeviceUtils(object): raise device_errors.CommandFailedError( 'adb root', 'Could not enable root.') + def __str__(self): + """Returns the device serial.""" + return self.old_interface.GetDevice() + + @staticmethod + def parallel(devices): + """ Creates a Parallelizer to operate over the provided list of devices. + + If |devices| is either |None| or an empty list, the Parallelizer will + operate over all attached devices. + + Args: + devices: A list of either DeviceUtils instances or objects from + from which DeviceUtils instances can be constructed. + Returns: + A Parallelizer operating over |devices|. + """ + if not devices or len(devices) == 0: + devices = pylib.android_commands.AndroidCommands.GetAttachedDevices() + return parallelizer.Parallelizer([ + d if isinstance(d, DeviceUtils) else DeviceUtils(d) + for d in devices]) + diff --git a/android/pylib/utils/parallelizer.py b/android/pylib/utils/parallelizer.py new file mode 100644 index 000000000..203dc1108 --- /dev/null +++ b/android/pylib/utils/parallelizer.py @@ -0,0 +1,196 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" Wrapper that allows method execution in parallel. + +This class wraps a list of objects of the same type, emulates their +interface, and executes any functions called on the objects in parallel +in ReraiserThreads. + +This means that, given a list of objects: + + class Foo: + def __init__(self): + self.baz = Baz() + + def bar(self, my_param): + // do something + + list_of_foos = [Foo(1), Foo(2), Foo(3)] + +we can take a sequential operation on that list of objects: + + for f in list_of_foos: + f.bar('Hello') + +and run it in parallel across all of the objects: + + Parallelizer(list_of_foos).bar('Hello') + +It can also handle (non-method) attributes of objects, so that this: + + for f in list_of_foos: + f.baz.myBazMethod() + +can be run in parallel with: + + Parallelizer(list_of_foos).baz.myBazMethod() + +Because it emulates the interface of the wrapped objects, a Parallelizer +can be passed to a method or function that takes objects of that type: + + def DoesSomethingWithFoo(the_foo): + the_foo.bar('Hello') + the_foo.bar('world') + the_foo.baz.myBazMethod + + DoesSomethingWithFoo(Parallelizer(list_of_foos)) + +Note that this class spins up a thread for each object. Using this class +to parallelize operations that are already fast will incur a net performance +penalty. + +""" +# pylint: disable=W0613 + +from pylib.utils import reraiser_thread +from pylib.utils import watchdog_timer + +_DEFAULT_TIMEOUT = 30 +_DEFAULT_RETRIES = 3 + + +class Parallelizer(object): + """Allows parallel execution of method calls across a group of objects.""" + + def __init__(self, objs): + assert (objs is not None and len(objs) > 0), ( + "Passed empty list to 'Parallelizer'") + self._orig_objs = objs + self._objs = objs + + def __getattr__(self, name): + """Emulate getting the |name| attribute of |self|. + + Args: + name: The name of the attribute to retrieve. + Returns: + A Parallelizer emulating the |name| attribute of |self|. + """ + self.pGet(None) + + r = Parallelizer(self._orig_objs) + r._objs = [getattr(o, name) for o in self._objs] + return r + + def __getitem__(self, index): + """Emulate getting the value of |self| at |index|. + + Returns: + A Parallelizer emulating the value of |self| at |index|. + """ + self.pGet(None) + + r = Parallelizer(self._orig_objs) + r._objs = [o[index] for o in self._objs] + return r + + def __call__(self, *args, **kwargs): + """Emulate calling |self| with |args| and |kwargs|. + + Note that this call is asynchronous. Call pFinish on the return value to + block until the call finishes. + + Returns: + A Parallelizer wrapping the ReraiserThreadGroup running the call in + parallel. + Raises: + AttributeError if the wrapped objects aren't callable. + """ + self.pGet(None) + + if not self._objs: + raise AttributeError('Nothing to call.') + for o in self._objs: + if not callable(o): + raise AttributeError("'%s' is not callable" % o.__name__) + + r = Parallelizer(self._orig_objs) + r._objs = reraiser_thread.ReraiserThreadGroup( + [reraiser_thread.ReraiserThread( + o, args=args, kwargs=kwargs, + name='%s.%s' % (str(d), o.__name__)) + for d, o in zip(self._orig_objs, self._objs)]) + r._objs.StartAll() # pylint: disable=W0212 + return r + + def pFinish(self, timeout): + """Finish any outstanding asynchronous operations. + + Args: + timeout: The maximum number of seconds to wait for an individual + result to return, or None to wait forever. + Returns: + self, now emulating the return values. + """ + self._assertNoShadow('pFinish') + if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): + self._objs.JoinAll() + self._objs = self._objs.GetAllReturnValues( + watchdog_timer.WatchdogTimer(timeout)) + return self + + def pGet(self, timeout): + """Get the current wrapped objects. + + Args: + timeout: Same as |pFinish|. + Returns: + A list of the results, in order of the provided devices. + Raises: + Any exception raised by any of the called functions. + """ + self._assertNoShadow('pGet') + self.pFinish(timeout) + return self._objs + + def _assertNoShadow(self, attr_name): + """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. + + If the wrapped objects _do_ have an |attr_name| attribute, it will be + inaccessible to clients. + + Args: + attr_name: The attribute to check. + Raises: + AssertionError if the wrapped objects have an attribute named 'attr_name' + or '_assertNoShadow'. + """ + if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): + assert(not hasattr(self._objs, '_assertNoShadow')) + assert(not hasattr(self._objs, 'pGet')) + else: + assert(not any(hasattr(o, '_assertNoShadow') for o in self._objs)) + assert(not any(hasattr(o, 'pGet') for o in self._objs)) + + +class SyncParallelizer(Parallelizer): + """A Parallelizer that blocks on function calls.""" + + #override + def __call__(self, *args, **kwargs): + """Emulate calling |self| with |args| and |kwargs|. + + Note that this call is synchronous. + + Returns: + A Parallelizer emulating the value returned from calling |self| with + |args| and |kwargs|. + Raises: + AttributeError if the wrapped objects aren't callable. + """ + r = super(SyncParallelizer, self).__call__(*args, **kwargs) + r.pFinish(None) + return r + diff --git a/android/pylib/utils/parallelizer_test.py b/android/pylib/utils/parallelizer_test.py new file mode 100644 index 000000000..6e0c7e79a --- /dev/null +++ b/android/pylib/utils/parallelizer_test.py @@ -0,0 +1,166 @@ +# Copyright 2014 The Chromium Authors. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Unit tests for the contents of parallelizer.py.""" + +# pylint: disable=W0212 +# pylint: disable=W0613 + +import os +import tempfile +import time +import unittest + +from pylib.utils import parallelizer + + +class ParallelizerTestObject(object): + """Class used to test parallelizer.Parallelizer.""" + + parallel = parallelizer.Parallelizer + + def __init__(self, thing, completion_file_name=None): + self._thing = thing + self._completion_file_name = completion_file_name + self.helper = ParallelizerTestObjectHelper(thing) + + @staticmethod + def doReturn(what): + return what + + @classmethod + def doRaise(cls, what): + raise what + + def doSetTheThing(self, new_thing): + self._thing = new_thing + + def doReturnTheThing(self): + return self._thing + + def doRaiseTheThing(self): + raise self._thing + + def doRaiseIfExceptionElseSleepFor(self, sleep_duration): + if isinstance(self._thing, Exception): + raise self._thing + time.sleep(sleep_duration) + self._write_completion_file() + return self._thing + + def _write_completion_file(self): + if self._completion_file_name and len(self._completion_file_name): + with open(self._completion_file_name, 'w+b') as completion_file: + completion_file.write('complete') + + def __getitem__(self, index): + return self._thing[index] + + def __str__(self): + return type(self).__name__ + + +class ParallelizerTestObjectHelper(object): + + def __init__(self, thing): + self._thing = thing + + def doReturnStringThing(self): + return str(self._thing) + + +class ParallelizerTest(unittest.TestCase): + + def testInitWithNone(self): + with self.assertRaises(AssertionError): + parallelizer.Parallelizer(None) + + def testInitEmptyList(self): + with self.assertRaises(AssertionError): + parallelizer.Parallelizer([]) + + def testMethodCall(self): + test_data = ['abc_foo', 'def_foo', 'ghi_foo'] + expected = ['abc_bar', 'def_bar', 'ghi_bar'] + r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1) + self.assertEquals(expected, r) + + def testMutate(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + self.assertTrue(all(d.doReturnTheThing() for d in devices)) + ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1) + self.assertTrue(not any(d.doReturnTheThing() for d in devices)) + + def testAllReturn(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + results = ParallelizerTestObject.parallel( + devices).doReturnTheThing().pGet(1) + self.assertTrue(isinstance(results, list)) + self.assertEquals(10, len(results)) + self.assertTrue(all(results)) + + def testAllRaise(self): + devices = [ParallelizerTestObject(Exception('thing %d' % i)) + for i in xrange(0, 10)] + p = ParallelizerTestObject.parallel(devices).doRaiseTheThing() + with self.assertRaises(Exception): + p.pGet(1) + + def testOneFailOthersComplete(self): + parallel_device_count = 10 + exception_index = 7 + exception_msg = 'thing %d' % exception_index + + try: + completion_files = [tempfile.NamedTemporaryFile(delete=False) + for _ in xrange(0, parallel_device_count)] + devices = [ + ParallelizerTestObject( + i if i != exception_index else Exception(exception_msg), + completion_files[i].name) + for i in xrange(0, parallel_device_count)] + for f in completion_files: + f.close() + p = ParallelizerTestObject.parallel(devices) + with self.assertRaises(Exception) as e: + p.doRaiseIfExceptionElseSleepFor(2).pGet(3) + self.assertTrue(exception_msg in str(e.exception)) + for i in xrange(0, parallel_device_count): + with open(completion_files[i].name) as f: + if i == exception_index: + self.assertEquals('', f.read()) + else: + self.assertEquals('complete', f.read()) + finally: + for f in completion_files: + os.remove(f.name) + + def testReusable(self): + devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)] + p = ParallelizerTestObject.parallel(devices) + results = p.doReturn(True).pGet(1) + self.assertTrue(all(results)) + results = p.doReturn(True).pGet(1) + self.assertTrue(all(results)) + with self.assertRaises(Exception): + results = p.doRaise(Exception('reusableTest')).pGet(1) + + def testContained(self): + devices = [ParallelizerTestObject(i) for i in xrange(0, 10)] + results = (ParallelizerTestObject.parallel(devices).helper + .doReturnStringThing().pGet(1)) + self.assertTrue(isinstance(results, list)) + self.assertEquals(10, len(results)) + for i in xrange(0, 10): + self.assertEquals(str(i), results[i]) + + def testGetItem(self): + devices = [ParallelizerTestObject(range(i, i+10)) for i in xrange(0, 10)] + results = ParallelizerTestObject.parallel(devices)[9].pGet(1) + self.assertEquals(range(9, 19), results) + + +if __name__ == '__main__': + unittest.main(verbosity=2) + diff --git a/android/pylib/utils/reraiser_thread.py b/android/pylib/utils/reraiser_thread.py index eed72c173..64196b26e 100644 --- a/android/pylib/utils/reraiser_thread.py +++ b/android/pylib/utils/reraiser_thread.py @@ -56,6 +56,7 @@ class ReraiserThread(threading.Thread): self._func = func self._args = args self._kwargs = kwargs + self._ret = None self._exc_info = None def ReraiseIfException(self): @@ -63,11 +64,16 @@ class ReraiserThread(threading.Thread): if self._exc_info: raise self._exc_info[0], self._exc_info[1], self._exc_info[2] + def GetReturnValue(self): + """Reraise exception if present, otherwise get the return value.""" + self.ReraiseIfException() + return self._ret + #override def run(self): """Overrides Thread.run() to add support for reraising exceptions.""" try: - self._func(*self._args, **self._kwargs) + self._ret = self._func(*self._args, **self._kwargs) except: self._exc_info = sys.exc_info() raise @@ -138,3 +144,14 @@ class ReraiserThreadGroup(object): for thread in (t for t in self._threads if t.isAlive()): LogThreadStack(thread) raise + + def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)): + """Get all return values, joining all threads if necessary. + + Args: + watcher: same as in |JoinAll|. Only used if threads are alive. + """ + if any([t.isAlive() for t in self._threads]): + self.JoinAll(watcher) + return [t.GetReturnValue() for t in self._threads] +