[Android] Support generic parallel execution across devices.

BUG=267773

Review URL: https://codereview.chromium.org/290573004

git-svn-id: http://src.chromium.org/svn/trunk/src/build@272159 4ff67af0-8c30-449e-8e8b-ad334ec8d88c
This commit is contained in:
jbudorick@chromium.org 2014-05-22 11:13:40 +00:00
Родитель b9287efd01
Коммит 0408408ab5
6 изменённых файлов: 416 добавлений и 53 удалений

Просмотреть файл

@ -6,7 +6,6 @@
"""Utility script to install APKs from the command line quickly.""" """Utility script to install APKs from the command line quickly."""
import multiprocessing
import optparse import optparse
import os import os
import sys import sys
@ -50,13 +49,6 @@ def ValidateInstallAPKOption(option_parser, options):
options.apk) options.apk)
def _InstallApk(args):
apk_path, apk_package, keep_data, device = args
device_utils.DeviceUtils(device=device).old_interface.ManagedInstall(
apk_path, keep_data, apk_package)
print '----- Installed on %s -----' % device
def main(argv): def main(argv):
parser = optparse.OptionParser() parser = optparse.OptionParser()
AddInstallAPKOption(parser) AddInstallAPKOption(parser)
@ -73,13 +65,10 @@ def main(argv):
if not options.apk_package: if not options.apk_package:
options.apk_package = apk_helper.GetPackageName(options.apk) options.apk_package = apk_helper.GetPackageName(options.apk)
pool = multiprocessing.Pool(len(devices)) device_utils.DeviceUtils.parallel(devices).old_interface.ManagedInstall(
# Send a tuple (apk_path, apk_package, device) per device. options.apk, options.keep_data, options.apk_package).pFinish(None)
pool.map(_InstallApk, zip([options.apk] * len(devices),
[options.apk_package] * len(devices),
[options.keep_data] * len(devices),
devices))
if __name__ == '__main__': if __name__ == '__main__':
sys.exit(main(sys.argv)) sys.exit(main(sys.argv))

Просмотреть файл

@ -24,6 +24,10 @@ from pylib import device_settings
from pylib.cmd_helper import GetCmdOutput from pylib.cmd_helper import GetCmdOutput
from pylib.device import device_utils from pylib.device import device_utils
sys.path.append(os.path.join(constants.DIR_SOURCE_ROOT,
'third_party', 'android_testrunner'))
import errors
def KillHostHeartbeat(): def KillHostHeartbeat():
ps = subprocess.Popen(['ps', 'aux'], stdout = subprocess.PIPE) ps = subprocess.Popen(['ps', 'aux'], stdout = subprocess.PIPE)
stdout, _ = ps.communicate() stdout, _ = ps.communicate()
@ -172,7 +176,11 @@ def main(argv):
for device_serial in devices: for device_serial in devices:
device = device_utils.DeviceUtils(device_serial) device = device_utils.DeviceUtils(device_serial)
WipeDeviceData(device) WipeDeviceData(device)
device_utils.RebootDevices() try:
(device_utils.DeviceUtils.parallel(devices)
.old_interface.Reboot(True).pFinish(None))
except errors.DeviceUnresponsiveError:
pass
else: else:
ProvisionDevices(options) ProvisionDevices(options)

Просмотреть файл

@ -9,52 +9,16 @@ Eventually, this will be based on adb_wrapper.
""" """
# pylint: disable=W0613 # pylint: disable=W0613
import multiprocessing
import os
import sys
import pylib.android_commands import pylib.android_commands
from pylib.device import adb_wrapper from pylib.device import adb_wrapper
from pylib.device import decorators from pylib.device import decorators
from pylib.device import device_errors from pylib.device import device_errors
from pylib.utils import parallelizer
CHROME_SRC_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', '..', '..', '..'))
sys.path.append(os.path.join(
CHROME_SRC_DIR, 'third_party', 'android_testrunner'))
import errors
_DEFAULT_TIMEOUT = 30 _DEFAULT_TIMEOUT = 30
_DEFAULT_RETRIES = 3 _DEFAULT_RETRIES = 3
# multiprocessing map_async requires a top-level function for pickle library.
def RebootDeviceSafe(device):
"""Reboot a device, wait for it to start, and squelch timeout exceptions."""
try:
DeviceUtils(device).old_interface.Reboot(True)
except errors.DeviceUnresponsiveError as e:
return e
def RebootDevices():
"""Reboot all attached and online devices."""
devices = pylib.android_commands.GetAttachedDevices()
print 'Rebooting: %s' % devices
if devices:
pool = multiprocessing.Pool(len(devices))
results = pool.map_async(RebootDeviceSafe, devices).get(99999)
for device, result in zip(devices, results):
if result:
print '%s failed to startup.' % device
if any(results):
print 'RebootDevices() Warning: %s' % results
else:
print 'Reboots complete.'
@decorators.WithExplicitTimeoutAndRetries( @decorators.WithExplicitTimeoutAndRetries(
_DEFAULT_TIMEOUT, _DEFAULT_RETRIES) _DEFAULT_TIMEOUT, _DEFAULT_RETRIES)
def GetAVDs(): def GetAVDs():
@ -150,3 +114,26 @@ class DeviceUtils(object):
raise device_errors.CommandFailedError( raise device_errors.CommandFailedError(
'adb root', 'Could not enable root.') 'adb root', 'Could not enable root.')
def __str__(self):
"""Returns the device serial."""
return self.old_interface.GetDevice()
@staticmethod
def parallel(devices):
""" Creates a Parallelizer to operate over the provided list of devices.
If |devices| is either |None| or an empty list, the Parallelizer will
operate over all attached devices.
Args:
devices: A list of either DeviceUtils instances or objects from
from which DeviceUtils instances can be constructed.
Returns:
A Parallelizer operating over |devices|.
"""
if not devices or len(devices) == 0:
devices = pylib.android_commands.AndroidCommands.GetAttachedDevices()
return parallelizer.Parallelizer([
d if isinstance(d, DeviceUtils) else DeviceUtils(d)
for d in devices])

Просмотреть файл

@ -0,0 +1,196 @@
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
""" Wrapper that allows method execution in parallel.
This class wraps a list of objects of the same type, emulates their
interface, and executes any functions called on the objects in parallel
in ReraiserThreads.
This means that, given a list of objects:
class Foo:
def __init__(self):
self.baz = Baz()
def bar(self, my_param):
// do something
list_of_foos = [Foo(1), Foo(2), Foo(3)]
we can take a sequential operation on that list of objects:
for f in list_of_foos:
f.bar('Hello')
and run it in parallel across all of the objects:
Parallelizer(list_of_foos).bar('Hello')
It can also handle (non-method) attributes of objects, so that this:
for f in list_of_foos:
f.baz.myBazMethod()
can be run in parallel with:
Parallelizer(list_of_foos).baz.myBazMethod()
Because it emulates the interface of the wrapped objects, a Parallelizer
can be passed to a method or function that takes objects of that type:
def DoesSomethingWithFoo(the_foo):
the_foo.bar('Hello')
the_foo.bar('world')
the_foo.baz.myBazMethod
DoesSomethingWithFoo(Parallelizer(list_of_foos))
Note that this class spins up a thread for each object. Using this class
to parallelize operations that are already fast will incur a net performance
penalty.
"""
# pylint: disable=W0613
from pylib.utils import reraiser_thread
from pylib.utils import watchdog_timer
_DEFAULT_TIMEOUT = 30
_DEFAULT_RETRIES = 3
class Parallelizer(object):
"""Allows parallel execution of method calls across a group of objects."""
def __init__(self, objs):
assert (objs is not None and len(objs) > 0), (
"Passed empty list to 'Parallelizer'")
self._orig_objs = objs
self._objs = objs
def __getattr__(self, name):
"""Emulate getting the |name| attribute of |self|.
Args:
name: The name of the attribute to retrieve.
Returns:
A Parallelizer emulating the |name| attribute of |self|.
"""
self.pGet(None)
r = Parallelizer(self._orig_objs)
r._objs = [getattr(o, name) for o in self._objs]
return r
def __getitem__(self, index):
"""Emulate getting the value of |self| at |index|.
Returns:
A Parallelizer emulating the value of |self| at |index|.
"""
self.pGet(None)
r = Parallelizer(self._orig_objs)
r._objs = [o[index] for o in self._objs]
return r
def __call__(self, *args, **kwargs):
"""Emulate calling |self| with |args| and |kwargs|.
Note that this call is asynchronous. Call pFinish on the return value to
block until the call finishes.
Returns:
A Parallelizer wrapping the ReraiserThreadGroup running the call in
parallel.
Raises:
AttributeError if the wrapped objects aren't callable.
"""
self.pGet(None)
if not self._objs:
raise AttributeError('Nothing to call.')
for o in self._objs:
if not callable(o):
raise AttributeError("'%s' is not callable" % o.__name__)
r = Parallelizer(self._orig_objs)
r._objs = reraiser_thread.ReraiserThreadGroup(
[reraiser_thread.ReraiserThread(
o, args=args, kwargs=kwargs,
name='%s.%s' % (str(d), o.__name__))
for d, o in zip(self._orig_objs, self._objs)])
r._objs.StartAll() # pylint: disable=W0212
return r
def pFinish(self, timeout):
"""Finish any outstanding asynchronous operations.
Args:
timeout: The maximum number of seconds to wait for an individual
result to return, or None to wait forever.
Returns:
self, now emulating the return values.
"""
self._assertNoShadow('pFinish')
if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
self._objs.JoinAll()
self._objs = self._objs.GetAllReturnValues(
watchdog_timer.WatchdogTimer(timeout))
return self
def pGet(self, timeout):
"""Get the current wrapped objects.
Args:
timeout: Same as |pFinish|.
Returns:
A list of the results, in order of the provided devices.
Raises:
Any exception raised by any of the called functions.
"""
self._assertNoShadow('pGet')
self.pFinish(timeout)
return self._objs
def _assertNoShadow(self, attr_name):
"""Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
If the wrapped objects _do_ have an |attr_name| attribute, it will be
inaccessible to clients.
Args:
attr_name: The attribute to check.
Raises:
AssertionError if the wrapped objects have an attribute named 'attr_name'
or '_assertNoShadow'.
"""
if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup):
assert(not hasattr(self._objs, '_assertNoShadow'))
assert(not hasattr(self._objs, 'pGet'))
else:
assert(not any(hasattr(o, '_assertNoShadow') for o in self._objs))
assert(not any(hasattr(o, 'pGet') for o in self._objs))
class SyncParallelizer(Parallelizer):
"""A Parallelizer that blocks on function calls."""
#override
def __call__(self, *args, **kwargs):
"""Emulate calling |self| with |args| and |kwargs|.
Note that this call is synchronous.
Returns:
A Parallelizer emulating the value returned from calling |self| with
|args| and |kwargs|.
Raises:
AttributeError if the wrapped objects aren't callable.
"""
r = super(SyncParallelizer, self).__call__(*args, **kwargs)
r.pFinish(None)
return r

Просмотреть файл

@ -0,0 +1,166 @@
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for the contents of parallelizer.py."""
# pylint: disable=W0212
# pylint: disable=W0613
import os
import tempfile
import time
import unittest
from pylib.utils import parallelizer
class ParallelizerTestObject(object):
"""Class used to test parallelizer.Parallelizer."""
parallel = parallelizer.Parallelizer
def __init__(self, thing, completion_file_name=None):
self._thing = thing
self._completion_file_name = completion_file_name
self.helper = ParallelizerTestObjectHelper(thing)
@staticmethod
def doReturn(what):
return what
@classmethod
def doRaise(cls, what):
raise what
def doSetTheThing(self, new_thing):
self._thing = new_thing
def doReturnTheThing(self):
return self._thing
def doRaiseTheThing(self):
raise self._thing
def doRaiseIfExceptionElseSleepFor(self, sleep_duration):
if isinstance(self._thing, Exception):
raise self._thing
time.sleep(sleep_duration)
self._write_completion_file()
return self._thing
def _write_completion_file(self):
if self._completion_file_name and len(self._completion_file_name):
with open(self._completion_file_name, 'w+b') as completion_file:
completion_file.write('complete')
def __getitem__(self, index):
return self._thing[index]
def __str__(self):
return type(self).__name__
class ParallelizerTestObjectHelper(object):
def __init__(self, thing):
self._thing = thing
def doReturnStringThing(self):
return str(self._thing)
class ParallelizerTest(unittest.TestCase):
def testInitWithNone(self):
with self.assertRaises(AssertionError):
parallelizer.Parallelizer(None)
def testInitEmptyList(self):
with self.assertRaises(AssertionError):
parallelizer.Parallelizer([])
def testMethodCall(self):
test_data = ['abc_foo', 'def_foo', 'ghi_foo']
expected = ['abc_bar', 'def_bar', 'ghi_bar']
r = parallelizer.Parallelizer(test_data).replace('_foo', '_bar').pGet(0.1)
self.assertEquals(expected, r)
def testMutate(self):
devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
self.assertTrue(all(d.doReturnTheThing() for d in devices))
ParallelizerTestObject.parallel(devices).doSetTheThing(False).pFinish(1)
self.assertTrue(not any(d.doReturnTheThing() for d in devices))
def testAllReturn(self):
devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
results = ParallelizerTestObject.parallel(
devices).doReturnTheThing().pGet(1)
self.assertTrue(isinstance(results, list))
self.assertEquals(10, len(results))
self.assertTrue(all(results))
def testAllRaise(self):
devices = [ParallelizerTestObject(Exception('thing %d' % i))
for i in xrange(0, 10)]
p = ParallelizerTestObject.parallel(devices).doRaiseTheThing()
with self.assertRaises(Exception):
p.pGet(1)
def testOneFailOthersComplete(self):
parallel_device_count = 10
exception_index = 7
exception_msg = 'thing %d' % exception_index
try:
completion_files = [tempfile.NamedTemporaryFile(delete=False)
for _ in xrange(0, parallel_device_count)]
devices = [
ParallelizerTestObject(
i if i != exception_index else Exception(exception_msg),
completion_files[i].name)
for i in xrange(0, parallel_device_count)]
for f in completion_files:
f.close()
p = ParallelizerTestObject.parallel(devices)
with self.assertRaises(Exception) as e:
p.doRaiseIfExceptionElseSleepFor(2).pGet(3)
self.assertTrue(exception_msg in str(e.exception))
for i in xrange(0, parallel_device_count):
with open(completion_files[i].name) as f:
if i == exception_index:
self.assertEquals('', f.read())
else:
self.assertEquals('complete', f.read())
finally:
for f in completion_files:
os.remove(f.name)
def testReusable(self):
devices = [ParallelizerTestObject(True) for _ in xrange(0, 10)]
p = ParallelizerTestObject.parallel(devices)
results = p.doReturn(True).pGet(1)
self.assertTrue(all(results))
results = p.doReturn(True).pGet(1)
self.assertTrue(all(results))
with self.assertRaises(Exception):
results = p.doRaise(Exception('reusableTest')).pGet(1)
def testContained(self):
devices = [ParallelizerTestObject(i) for i in xrange(0, 10)]
results = (ParallelizerTestObject.parallel(devices).helper
.doReturnStringThing().pGet(1))
self.assertTrue(isinstance(results, list))
self.assertEquals(10, len(results))
for i in xrange(0, 10):
self.assertEquals(str(i), results[i])
def testGetItem(self):
devices = [ParallelizerTestObject(range(i, i+10)) for i in xrange(0, 10)]
results = ParallelizerTestObject.parallel(devices)[9].pGet(1)
self.assertEquals(range(9, 19), results)
if __name__ == '__main__':
unittest.main(verbosity=2)

Просмотреть файл

@ -56,6 +56,7 @@ class ReraiserThread(threading.Thread):
self._func = func self._func = func
self._args = args self._args = args
self._kwargs = kwargs self._kwargs = kwargs
self._ret = None
self._exc_info = None self._exc_info = None
def ReraiseIfException(self): def ReraiseIfException(self):
@ -63,11 +64,16 @@ class ReraiserThread(threading.Thread):
if self._exc_info: if self._exc_info:
raise self._exc_info[0], self._exc_info[1], self._exc_info[2] raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
def GetReturnValue(self):
"""Reraise exception if present, otherwise get the return value."""
self.ReraiseIfException()
return self._ret
#override #override
def run(self): def run(self):
"""Overrides Thread.run() to add support for reraising exceptions.""" """Overrides Thread.run() to add support for reraising exceptions."""
try: try:
self._func(*self._args, **self._kwargs) self._ret = self._func(*self._args, **self._kwargs)
except: except:
self._exc_info = sys.exc_info() self._exc_info = sys.exc_info()
raise raise
@ -138,3 +144,14 @@ class ReraiserThreadGroup(object):
for thread in (t for t in self._threads if t.isAlive()): for thread in (t for t in self._threads if t.isAlive()):
LogThreadStack(thread) LogThreadStack(thread)
raise raise
def GetAllReturnValues(self, watcher=watchdog_timer.WatchdogTimer(None)):
"""Get all return values, joining all threads if necessary.
Args:
watcher: same as in |JoinAll|. Only used if threads are alive.
"""
if any([t.isAlive() for t in self._threads]):
self.JoinAll(watcher)
return [t.GetReturnValue() for t in self._threads]