[android] Add timeout_retry module to pylib/utils/.

BUG=318387
TEST=None
NOTRY=True

Review URL: https://codereview.chromium.org/60043003

git-svn-id: http://src.chromium.org/svn/trunk/src/build@235776 4ff67af0-8c30-449e-8e8b-ad334ec8d88c
This commit is contained in:
craigdh@chromium.org 2013-11-18 18:52:06 +00:00
Родитель 433f48e336
Коммит 794996b688
3 изменённых файлов: 114 добавлений и 3 удалений

Просмотреть файл

@ -4,14 +4,13 @@
"""A wrapper for subprocess to make calling shell commands easier.""" """A wrapper for subprocess to make calling shell commands easier."""
import os
import logging import logging
import pipes import pipes
import signal import signal
import subprocess import subprocess
import tempfile import tempfile
import constants from utils import timeout_retry
def Popen(args, stdout=None, stderr=None, shell=None, cwd=None, env=None): def Popen(args, stdout=None, stderr=None, shell=None, cwd=None, env=None):
@ -73,7 +72,7 @@ def GetCmdStatusAndOutput(args, cwd=None, shell=False):
shell: Whether to execute args as a shell command. shell: Whether to execute args as a shell command.
Returns: Returns:
The tuple (exit code, output). The 2-tuple (exit code, output).
""" """
if isinstance(args, basestring): if isinstance(args, basestring):
args_repr = args args_repr = args
@ -104,3 +103,18 @@ def GetCmdStatusAndOutput(args, cwd=None, shell=False):
logging.debug('Truncated output:') logging.debug('Truncated output:')
logging.debug(stdout[:4096]) logging.debug(stdout[:4096])
return (exit_code, stdout) return (exit_code, stdout)
def GetCmdStatusAndOutputWithTimeoutAndRetries(args, timeout, retries):
"""Executes a subprocess with a timeout and retries.
Args:
args: List of arguments to the program, the program to execute is the first
element.
timeout: the timeout in seconds.
retries: the number of retries.
Returns:
The 2-tuple (exit code, output).
"""
return timeout_retry.Run(GetCmdStatusAndOutput, timeout, retries, [args])

Просмотреть файл

@ -0,0 +1,45 @@
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A utility to run functions with timeouts and retries."""
import functools
import threading
import reraiser_thread
import watchdog_timer
def Run(func, timeout, retries, args=[], kwargs={}):
"""Runs the passed function in a separate thread with timeouts and retries.
Args:
func: the function to be wrapped.
timeout: the timeout in seconds for each try.
retries: the number of retries.
args: list of positional args to pass to |func|.
kwargs: dictionary of keyword args to pass to |func|.
Returns:
The return value of func(*args, **kwargs).
"""
# The return value uses a list because Python variables are references, not
# values. Closures make a copy of the reference, so updating the closure's
# reference wouldn't update where the original reference pointed.
ret = [None]
def RunOnTimeoutThread():
ret[0] = func(*args, **kwargs)
while True:
try:
name = 'TimeoutThread-for-%s' % threading.current_thread().name
thread_group = reraiser_thread.ReraiserThreadGroup(
[reraiser_thread.ReraiserThread(RunOnTimeoutThread, name=name)])
thread_group.StartAll()
thread_group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
return ret[0]
except:
if retries <= 0:
raise
retries -= 1

Просмотреть файл

@ -0,0 +1,52 @@
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unittests for timeout_and_retry.py."""
import unittest
import reraiser_thread
import timeout_retry
class TestException(Exception):
pass
def _NeverEnding(tries=[0]):
tries[0] += 1
while True:
pass
def _CountTries(tries):
tries[0] += 1
raise TestException
class TestRun(unittest.TestCase):
"""Tests for timeout_retry.Run."""
def testRun(self):
self.assertTrue(timeout_retry.Run(
lambda x: x, 30, 3, [True], {}))
def testTimeout(self):
tries = [0]
self.assertRaises(reraiser_thread.TimeoutError,
timeout_retry.Run, lambda: _NeverEnding(tries), 0, 3)
self.assertEqual(tries[0], 4)
def testRetries(self):
tries = [0]
self.assertRaises(TestException,
timeout_retry.Run, lambda: _CountTries(tries), 30, 3)
self.assertEqual(tries[0], 4)
def testReturnValue(self):
self.assertTrue(timeout_retry.Run(lambda: True, 30, 3))
if __name__ == '__main__':
unittest.main()