Bug 1213133 - Remove jit-test's legacy multiprocessing and serial task runners; r=sfink

This commit is contained in:
Terrence Cole 2015-10-09 13:39:17 -07:00
Родитель 1e2222982c
Коммит 9ba17a864f
1 изменённый файл: 0 добавлений и 252 удаления

Просмотреть файл

@ -10,9 +10,6 @@ from __future__ import print_function
import os, posixpath, sys, tempfile, traceback, time
import subprocess
from collections import namedtuple
from subprocess import Popen, PIPE
from threading import Thread
import signal
import StringIO
if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
@ -20,12 +17,6 @@ if sys.platform.startswith('linux') or sys.platform.startswith('darwin'):
else:
from tasks_win import run_all_tests
try:
from multiprocessing import Process, Manager, cpu_count
HAVE_MULTIPROCESSING = True
except ImportError:
HAVE_MULTIPROCESSING = False
from progressbar import ProgressBar, NullProgressBar
from results import TestOutput
@ -296,125 +287,6 @@ def find_tests(substring=None):
ans.append(test)
return ans
def tmppath(token):
    """Create an empty temporary file whose name begins with *token* and
    return its path.  The caller is responsible for deleting the file
    (see read_and_unlink)."""
    handle, pathname = tempfile.mkstemp(prefix=token)
    # We only need the name; close the descriptor immediately so the
    # child process can open the file itself.
    os.close(handle)
    return pathname
def read_and_unlink(path):
    """Return the entire contents of the file at *path* and delete the file.

    Uses a context manager so the handle is closed even if the read
    raises, instead of the original open/read/close sequence that could
    leak the descriptor on error.
    """
    with open(path) as f:
        d = f.read()
    os.unlink(path)
    return d
def th_run_cmd(cmdline, options, l):
    """Thread target: run *cmdline* with Popen keyword *options*.

    Communicates results through the shared list *l*:
    l[0] receives the Popen object (so the parent can kill it on timeout),
    l[1] receives the (stdout, stderr, returncode) triple on completion.
    """
    # Work on a copy so the caller's dict is not mutated as a side effect
    # (the original wrote close_fds back into the caller's options).
    options = dict(options)
    # close_fds is not supported on Windows and will cause a ValueError.
    if sys.platform != 'win32':
        options["close_fds"] = True
    p = Popen(cmdline, stdin=PIPE, stdout=PIPE, stderr=PIPE, **options)

    l[0] = p
    out, err = p.communicate()
    l[1] = (out, err, p.returncode)
def run_timeout_cmd(cmdline, options, timeout=60.0):
    """Run *cmdline* (Popen keyword *options*) enforcing *timeout* seconds.

    The command runs on a helper thread (th_run_cmd) so this thread can
    enforce the timeout; on expiry the child is killed with SIGKILL on
    POSIX or TerminateProcess on Windows.

    Returns (stdout, stderr, returncode, timed_out).
    """
    # l[0] receives the Popen object, l[1] the (out, err, returncode) triple.
    l = [None, None]
    timed_out = False
    th = Thread(target=th_run_cmd, args=(cmdline, options, l))

    # If our SIGINT handler is set to SIG_IGN (ignore)
    # then we are running as a child process for parallel
    # execution and we must ensure to kill our child
    # when we are signaled to exit.
    sigint_handler = signal.getsignal(signal.SIGINT)
    sigterm_handler = signal.getsignal(signal.SIGTERM)
    if sigint_handler == signal.SIG_IGN:
        def handleChildSignal(sig, frame):
            try:
                if sys.platform != 'win32':
                    os.kill(l[0].pid, signal.SIGKILL)
                else:
                    import ctypes
                    ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle),
                                                            -1)
            except OSError:
                pass
            if sig == signal.SIGTERM:
                sys.exit(0)
        signal.signal(signal.SIGINT, handleChildSignal)
        signal.signal(signal.SIGTERM, handleChildSignal)

    th.start()
    th.join(timeout)
    # is_alive() replaces the deprecated isAlive() alias, which was removed
    # in Python 3.9; both spellings behave identically where they coexist.
    while th.is_alive():
        if l[0] is not None:
            try:
                # In Python 3, we could just do l[0].kill().
                if sys.platform != 'win32':
                    os.kill(l[0].pid, signal.SIGKILL)
                else:
                    import ctypes
                    ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle),
                                                            -1)
                time.sleep(.1)
                timed_out = True
            except OSError:
                # Expecting a "No such process" error
                pass
    th.join()

    # Restore old signal handlers
    if sigint_handler == signal.SIG_IGN:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, sigterm_handler)

    (out, err, code) = l[1]
    return (out, err, code, timed_out)
def run_cmd(cmdline, env, timeout):
    """Run *cmdline* under environment *env* with a *timeout* in seconds.

    Returns (stdout, stderr, returncode, timed_out) from run_timeout_cmd."""
    popen_options = {'env': env}
    return run_timeout_cmd(cmdline, popen_options, timeout)
def run_cmd_avoid_stdio(cmdline, env, timeout):
    """Run *cmdline* without relying on inherited stdio.

    The shell is told (via JS_STDOUT/JS_STDERR env vars) to write its output
    to temp files, which are read back and deleted afterwards.

    Returns (stdout, stderr, returncode, timed_out) — the same 4-tuple shape
    as run_cmd(), so run_test() can use either function interchangeably.
    The original unpacked only 3 names from run_timeout_cmd's 4-tuple
    (raising ValueError) and returned a 3-tuple that broke run_test's
    4-way unpack.
    """
    stdoutPath, stderrPath = tmppath('jsstdout'), tmppath('jsstderr')
    env['JS_STDOUT'] = stdoutPath
    env['JS_STDERR'] = stderrPath
    _, __, code, timed_out = run_timeout_cmd(cmdline, {'env': env}, timeout)
    return (read_and_unlink(stdoutPath), read_and_unlink(stderrPath),
            code, timed_out)
def run_test(test, prefix, options):
    """Execute one test locally and wrap the outcome in a TestOutput."""
    cmd = test.command(prefix, LIB_DIR)
    if options.show_cmd:
        print(subprocess.list2cmdline(cmd))

    run = run_cmd_avoid_stdio if options.avoid_stdio else run_cmd

    env = os.environ.copy()
    if test.tz_pacific:
        env['TZ'] = 'PST8PDT'

    # Ensure interpreter directory is in shared library path.
    if sys.platform.startswith('linux'):
        pathvar = 'LD_LIBRARY_PATH'
    elif sys.platform.startswith('darwin'):
        pathvar = 'DYLD_LIBRARY_PATH'
    elif sys.platform.startswith('win'):
        pathvar = 'PATH'
    else:
        pathvar = ''
    if pathvar:
        bin_dir = os.path.dirname(cmd[0])
        if pathvar in env:
            env[pathvar] = '{}{}{}'.format(bin_dir, os.pathsep, env[pathvar])
        else:
            env[pathvar] = bin_dir

    out, err, code, timed_out = run(cmd, env, options.timeout)
    return TestOutput(test, cmd, out, err, code, None, timed_out)
def run_test_remote(test, device, prefix, options):
cmd = test.command(prefix,
posixpath.join(options.remote_test_root, 'lib/'),
@ -532,125 +404,6 @@ def print_automation_format(ok, res):
for line in res.err.splitlines():
print("INFO stderr 2> " + line.strip())
def wrap_parallel_run_test(test, prefix, resultQueue, options):
    """Worker-process entry point: run one test and post its result.

    The TestOutput is delivered through *resultQueue* to the result
    process and also returned for convenience."""
    # Ignore SIGINT in the child; the parent decides when workers die.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    outcome = run_test(test, prefix, options)
    resultQueue.put(outcome)
    return outcome
def run_tests_parallel(tests, prefix, options):
    """Run *tests* in parallel worker processes.

    Spawns up to options.max_jobs workers at a time, plus one dedicated
    result process that consumes TestOutputs from a shared queue.  Returns
    the result process's return value (overall ok flag), or False on
    interrupt or internal error.
    """
    # This queue will contain the results of the various tests run.
    # We could make this queue a global variable instead of using
    # a manager to share, but this will not work on Windows.
    queue_manager = Manager()
    async_test_result_queue = queue_manager.Queue()

    # This queue will be used by the result process to indicate
    # that it has received a result and we can start a new process
    # on our end. The advantage is that we don't have to sleep and
    # check for worker completion ourselves regularly.
    notify_queue = queue_manager.Queue()

    # This queue will contain the return value of the function
    # processing the test results.
    total_tests = len(tests) * options.repeat
    result_process_return_queue = queue_manager.Queue()
    result_process = Process(target=process_test_results_parallel,
                             args=(async_test_result_queue,
                                   result_process_return_queue,
                                   notify_queue, total_tests, options))
    result_process.start()

    # Ensure that a SIGTERM is handled the same way as SIGINT
    # to terminate all child processes.
    sigint_handler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGTERM, sigint_handler)

    worker_processes = []

    def remove_completed_workers(workers):
        # Join and drop workers that have finished; keep live ones.
        new_workers = []
        for worker in workers:
            if worker.is_alive():
                new_workers.append(worker)
            else:
                worker.join()
        return new_workers

    try:
        testcnt = 0
        # Initially start as many jobs as allowed to run parallel
        # Always enqueue at least one to avoid a curious deadlock
        for i in range(max(1, min(options.max_jobs, total_tests))):
            notify_queue.put(True)

        # For every item in the notify queue, start one new worker.
        # Every completed worker adds a new item to this queue.
        while notify_queue.get():
            if testcnt < total_tests:
                # Start one new worker
                test = tests[testcnt % len(tests)]
                worker_process = Process(target=wrap_parallel_run_test,
                                         args=(test, prefix,
                                               async_test_result_queue,
                                               options))
                worker_processes.append(worker_process)
                worker_process.start()
                testcnt += 1

                # Collect completed workers
                worker_processes = remove_completed_workers(worker_processes)
            else:
                break

        # Wait for all processes to terminate
        while len(worker_processes) > 0:
            worker_processes = remove_completed_workers(worker_processes)

        # Signal completion to result processor, then wait for it to complete
        # on its own
        async_test_result_queue.put(None)
        result_process.join()

        # Return what the result process has returned to us
        return result_process_return_queue.get()
    except (Exception, KeyboardInterrupt) as e:
        # Print the exception if it's not an interrupt,
        # might point to a bug or other faulty condition
        if not isinstance(e, KeyboardInterrupt):
            traceback.print_exc()

        for worker in worker_processes:
            try:
                worker.terminate()
            except Exception:
                # Best-effort cleanup: a worker may already be gone.  The
                # original bare `except:` also swallowed SystemExit and
                # KeyboardInterrupt, which could hang shutdown.
                pass

        result_process.terminate()
        return False
def get_parallel_results(async_test_result_queue, notify_queue):
    """Generator draining test results from *async_test_result_queue*.

    Yields each result as it arrives; for every result received, puts True
    on *notify_queue* so the parent may start another worker.  A None
    sentinel on the result queue terminates the generator.
    """
    while True:
        async_test_result = async_test_result_queue.get()

        # Check if we are supposed to terminate.  Identity check: the
        # sentinel is exactly None; `== None` would invoke the result
        # type's __eq__, which is the wrong comparison for a sentinel.
        if async_test_result is None:
            return

        # Notify parent that we got a result
        notify_queue.put(True)

        yield async_test_result
def process_test_results_parallel(async_test_result_queue, return_queue,
                                  notify_queue, num_tests, options):
    """Result-process entry point.

    Consumes test results from *async_test_result_queue* (via the
    get_parallel_results generator), renders progress, and posts the
    overall ok flag on *return_queue* for the parent to collect."""
    progress = create_progressbar(num_tests, options)
    result_stream = get_parallel_results(async_test_result_queue,
                                         notify_queue)
    return_queue.put(process_test_results(result_stream, num_tests,
                                          progress, options))
def print_test_summary(num_tests, failures, complete, doing, options):
if failures:
if options.write_failures:
@ -773,11 +526,6 @@ def process_test_results(results, num_tests, pb, options):
pb.finish(True)
return print_test_summary(num_tests, failures, complete, doing, options)
def get_serial_results(tests, prefix, options):
    """Generator yielding a TestOutput for each test, run one at a time,
    repeating the whole list options.repeat times."""
    # range instead of the Python-2-only xrange keeps this working under
    # both interpreters; repeat counts are small, so the Python 2 list
    # materialization is harmless and iteration order is identical.
    for i in range(0, options.repeat):
        for test in tests:
            yield run_test(test, prefix, options)
def run_tests(tests, prefix, options):
# The jstests tasks runner requires the following options. The names are
# taken from the jstests options processing code, which are frequently