Mirror of https://github.com/mozilla/gecko-dev.git
Bug 827960 - Allow jit_test.py to run tests in parallel. r=terrence
Parent: e5fca8ca12
Commit: c8b531511d
@@ -10,6 +10,14 @@ import datetime, os, re, sys, tempfile, traceback, time, shlex
 import subprocess
 from subprocess import *
 from threading import Thread
+import signal
+
+try:
+    from multiprocessing import Process, Queue, Manager, cpu_count
+    HAVE_MULTIPROCESSING = True
+except ImportError:
+    HAVE_MULTIPROCESSING = False
+

 def add_libdir_to_path():
     from os.path import dirname, exists, join, realpath
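Aside, not part of the patch: the import block above is the usual optional-dependency probe, recording whether multiprocessing is usable so the harness can fall back to serial execution. A minimal self-contained sketch of the same idea (function name is illustrative):

# Illustrative sketch only; mirrors the fallback logic of the patch.
try:
    from multiprocessing import cpu_count
    HAVE_MULTIPROCESSING = True
except ImportError:
    HAVE_MULTIPROCESSING = False

def default_job_count():
    # One worker per CPU when multiprocessing works, otherwise run serially.
    if HAVE_MULTIPROCESSING:
        try:
            return cpu_count()
        except NotImplementedError:
            pass
    return 1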
@@ -178,6 +186,29 @@ def run_timeout_cmd(cmdline, options, timeout=60.0):
     l = [ None, None ]
     timed_out = False
     th = Thread(target=th_run_cmd, args=(cmdline, options, l))
+
+    # If our SIGINT handler is set to SIG_IGN (ignore)
+    # then we are running as a child process for parallel
+    # execution and we must ensure to kill our child
+    # when we are signaled to exit.
+    import signal
+    sigint_handler = signal.getsignal(signal.SIGINT)
+    sigterm_handler = signal.getsignal(signal.SIGTERM)
+    if (sigint_handler == signal.SIG_IGN):
+        def handleChildSignal(sig, frame):
+            try:
+                if sys.platform != 'win32':
+                    os.kill(l[0].pid, signal.SIGKILL)
+                else:
+                    import ctypes
+                    ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle), -1)
+            except OSError:
+                pass
+            if (sig == signal.SIGTERM):
+                sys.exit(0)
+        signal.signal(signal.SIGINT, handleChildSignal)
+        signal.signal(signal.SIGTERM, handleChildSignal)
+
     th.start()
     th.join(timeout)
     while th.isAlive():
@@ -187,13 +218,23 @@ def run_timeout_cmd(cmdline, options, timeout=60.0):
                 import signal
                 if sys.platform != 'win32':
                     os.kill(l[0].pid, signal.SIGKILL)
                 else:
                     import ctypes
                     ctypes.windll.kernel32.TerminateProcess(int(l[0]._handle), -1)
                 time.sleep(.1)
                 timed_out = True
             except OSError:
                 # Expecting a "No such process" error
                 pass
     th.join()
+
+    # Restore old signal handlers
+    if (sigint_handler == signal.SIG_IGN):
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, sigterm_handler)
+
     (out, err, code) = l[1]

     return (out, err, code, timed_out)

 def run_cmd(cmdline, env, timeout):
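Aside, not part of the patch: the two hunks above make run_timeout_cmd cooperate with parallel execution. A worker whose parent has set SIGINT to SIG_IGN installs its own handler, so interrupting or terminating the worker also kills the JS shell it spawned, and the handlers are restored once the command finishes. A reduced self-contained sketch of that pattern (names are illustrative; it uses Popen.kill() instead of the per-platform os.kill/TerminateProcess calls of the patch):

# Illustrative sketch only, not the patch's code.
import signal, subprocess, sys

def run_child(cmd):
    proc = subprocess.Popen(cmd)
    if signal.getsignal(signal.SIGINT) == signal.SIG_IGN:
        # We are a parallel worker: forward termination to our subprocess.
        def cleanup(sig, frame):
            try:
                proc.kill()
            except OSError:
                pass
            if sig == signal.SIGTERM:
                sys.exit(0)
        signal.signal(signal.SIGINT, cleanup)
        signal.signal(signal.SIGTERM, cleanup)
    return proc.wait()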
@@ -266,31 +307,191 @@ def check_output(out, err, rc, test):
     return True

 def print_tinderbox(label, test, message=None):
+    if (test != None):
+        jitflags = " ".join(test.jitflags)
+        result = "%s | jit_test.py %-15s| %s" % (label, jitflags, test.path)
+    else:
+        result = "%s | jit_test.py " % label

     if message:
         result += ": " + message
     print result

-def run_tests(tests, test_dir, lib_dir, shell_args):
+def wrap_parallel_run_test(test, lib_dir, shell_args, resultQueue, options, js):
+    # This is necessary because on Windows global variables are not automatically
+    # available in the children, while on Linux and OSX this is the case (because of fork).
+    global OPTIONS
+    global JS
+    OPTIONS = options
+    JS = js
+
+    # Ignore SIGINT in the child
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    result = run_test(test, lib_dir, shell_args) + (test,)
+    resultQueue.put(result)
+    return result
+
+def run_tests_parallel(tests, test_dir, lib_dir, shell_args):
+    # This queue will contain the results of the various tests run.
+    # We could make this queue a global variable instead of using
+    # a manager to share, but this will not work on Windows.
+    queue_manager = Manager()
+    async_test_result_queue = queue_manager.Queue()
+
+    # This queue will be used by the result process to indicate
+    # that it has received a result and we can start a new process
+    # on our end. The advantage is that we don't have to sleep and
+    # check for worker completion ourselves regularly.
+    notify_queue = queue_manager.Queue()
+
+    # This queue will contain the return value of the function
+    # processing the test results.
+    result_process_return_queue = queue_manager.Queue()
+    result_process = Process(target=process_test_results_parallel, args=(async_test_result_queue, result_process_return_queue, notify_queue, len(tests), OPTIONS, JS))
+    result_process.start()
+
+    # Ensure that a SIGTERM is handled the same way as SIGINT
+    # to terminate all child processes.
+    sigint_handler = signal.getsignal(signal.SIGINT)
+    signal.signal(signal.SIGTERM, sigint_handler)
+
+    worker_processes = []
+
+    def remove_completed_workers(workers):
+        new_workers = []
+        for worker in workers:
+            if worker.is_alive():
+                new_workers.append(worker)
+            else:
+                worker.join()
+        return new_workers
+
+    try:
+        testcnt = 0
+        # Initially start as many jobs as allowed to run parallel
+        for i in range(min(OPTIONS.max_jobs,len(tests))):
+            notify_queue.put(True)
+
+        # For every item in the notify queue, start one new worker.
+        # Every completed worker adds a new item to this queue.
+        while notify_queue.get():
+            if (testcnt < len(tests)):
+                # Start one new worker
+                worker_process = Process(target=wrap_parallel_run_test, args=(tests[testcnt], lib_dir, shell_args, async_test_result_queue, OPTIONS, JS))
+                worker_processes.append(worker_process)
+                worker_process.start()
+                testcnt += 1
+
+                # Collect completed workers
+                worker_processes = remove_completed_workers(worker_processes)
+            else:
+                break
+
+        # Wait for all processes to terminate
+        while len(worker_processes) > 0:
+            worker_processes = remove_completed_workers(worker_processes)
+
+        # Signal completion to result processor, then wait for it to complete on its own
+        async_test_result_queue.put(None)
+        result_process.join()
+
+        # Return what the result process has returned to us
+        return result_process_return_queue.get()
+    except (Exception, KeyboardInterrupt) as e:
+        # Print the exception if it's not an interrupt,
+        # might point to a bug or other faulty condition
+        if not isinstance(e,KeyboardInterrupt):
+            traceback.print_exc()
+
+        for worker in worker_processes:
+            try:
+                worker.terminate()
+            except:
+                pass
+
+        result_process.terminate()
+
+        return False
+
+def get_parallel_results(async_test_result_queue, notify_queue):
+    while True:
+        async_test_result = async_test_result_queue.get()
+
+        # Check if we are supposed to terminate
+        if (async_test_result == None):
+            return
+
+        # Notify parent that we got a result
+        notify_queue.put(True)
+
+        yield async_test_result
+
+def process_test_results_parallel(async_test_result_queue, return_queue, notify_queue, num_tests, options, js):
+    gen = get_parallel_results(async_test_result_queue, notify_queue)
+    ok = process_test_results(gen, num_tests, options, js)
+    return_queue.put(ok)
+
+def print_test_summary(failures, complete, doing, options):
+    if failures:
+        if options.write_failures:
+            try:
+                out = open(options.write_failures, 'w')
+                # Don't write duplicate entries when we are doing multiple failures per job.
+                written = set()
+                for test, fout, ferr, fcode, _ in failures:
+                    if test.path not in written:
+                        out.write(os.path.relpath(test.path, test_dir) + '\n')
+                        if options.write_failure_output:
+                            out.write(fout)
+                            out.write(ferr)
+                            out.write('Exit code: ' + str(fcode) + "\n")
+                        written.add(test.path)
+                out.close()
+            except IOError:
+                sys.stderr.write("Exception thrown trying to write failure file '%s'\n"%
+                                 options.write_failures)
+                traceback.print_exc()
+                sys.stderr.write('---\n')
+
+        def show_test(test):
+            if options.show_failed:
+                print(' ' + subprocess.list2cmdline(get_test_cmd(test.path, test.jitflags, lib_dir, shell_args)))
+            else:
+                print(' ' + ' '.join(test.jitflags + [ test.path ]))
+
+        print('FAILURES:')
+        for test, _, __, ___, timed_out in failures:
+            if not timed_out:
+                show_test(test)
+
+        print('TIMEOUTS:')
+        for test, _, __, ___, timed_out in failures:
+            if timed_out:
+                show_test(test)
+
+        return False
+    else:
+        print('PASSED ALL' + ('' if complete else ' (partial run -- interrupted by user %s)'%doing))
+        return True
+
+def process_test_results(results, num_tests, options, js):
     pb = NullProgressBar()
-    if not OPTIONS.hide_progress and not OPTIONS.show_cmd and ProgressBar.conservative_isatty():
+    if not options.hide_progress and not options.show_cmd and ProgressBar.conservative_isatty():
         fmt = [
             {'value': 'PASS', 'color': 'green'},
             {'value': 'FAIL', 'color': 'red'},
             {'value': 'TIMEOUT', 'color': 'blue'},
             {'value': 'SKIP', 'color': 'brightgray'},
         ]
-        pb = ProgressBar(len(tests), fmt)
+        pb = ProgressBar(num_tests, fmt)

     failures = []
     timeouts = 0
     complete = False
     doing = 'before starting'
     try:
-        for i, test in enumerate(tests):
-            doing = 'on %s'%test.path
-            ok, out, err, code, timed_out = run_test(test, lib_dir, shell_args)
+        for i, (ok, out, err, code, timed_out, test) in enumerate(results):
             doing = 'after %s'%test.path

             if not ok:
@@ -299,7 +500,7 @@ def run_tests(tests, test_dir, lib_dir, shell_args):
             if timed_out:
                 timeouts += 1

-            if OPTIONS.tinderbox:
+            if options.tinderbox:
                 if ok:
                     print_tinderbox("TEST-PASS", test);
                 else:
@@ -320,51 +521,21 @@ def run_tests(tests, test_dir, lib_dir, shell_args):
             )
         complete = True
     except KeyboardInterrupt:
-        print_tinderbox("TEST-UNEXPECTED-FAIL", test);
+        print_tinderbox("TEST-UNEXPECTED-FAIL", None, "Test execution interrupted by user");

     pb.finish(True)
+    return print_test_summary(failures, complete, doing, options)

-    if failures:
-        if OPTIONS.write_failures:
-            try:
-                out = open(OPTIONS.write_failures, 'w')
-                # Don't write duplicate entries when we are doing multiple failures per job.
-                written = set()
-                for test, fout, ferr, fcode, _ in failures:
-                    if test.path not in written:
-                        out.write(os.path.relpath(test.path, test_dir) + '\n')
-                        if OPTIONS.write_failure_output:
-                            out.write(fout)
-                            out.write(ferr)
-                            out.write('Exit code: ' + str(fcode) + "\n")
-                        written.add(test.path)
-                out.close()
-            except IOError:
-                sys.stderr.write("Exception thrown trying to write failure file '%s'\n"%
-                                 OPTIONS.write_failures)
-                traceback.print_exc()
-                sys.stderr.write('---\n')
-
-        def show_test(test):
-            if OPTIONS.show_failed:
-                print(' ' + subprocess.list2cmdline(get_test_cmd(test.path, test.jitflags, lib_dir, shell_args)))
-            else:
-                print(' ' + ' '.join(test.jitflags + [ test.path ]))
+def get_serial_results(tests, lib_dir, shell_args):
+    for test in tests:
+        result = run_test(test, lib_dir, shell_args)
+        yield result + (test,)

-        print('FAILURES:')
-        for test, _, __, ___, timed_out in failures:
-            if not timed_out:
-                show_test(test)
-
-        print('TIMEOUTS:')
-        for test, _, __, ___, timed_out in failures:
-            if timed_out:
-                show_test(test)
-
-        return False
-    else:
-        print('PASSED ALL' + ('' if complete else ' (partial run -- interrupted by user %s)'%doing))
-        return True
+def run_tests(tests, test_dir, lib_dir, shell_args):
+    gen = get_serial_results(tests, lib_dir, shell_args)
+    ok = process_test_results(gen, len(tests), OPTIONS, JS)
+    return ok

 def parse_jitflags():
     jitflags = [ [ '-' + flag for flag in flags ]
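Aside, not part of the patch: run_tests_parallel keeps at most max_jobs workers alive by pairing every result the result process acknowledges on notify_queue with permission to start one more worker. A reduced, runnable sketch of that scheduling idea; it folds the separate result-processing process and the Manager queues (which the patch needs for Windows) into the parent for brevity, and all names are illustrative:

# Illustrative sketch only, not the patch's code; every collected result
# frees one slot, so at most max_jobs workers run at any time.
from multiprocessing import Process, Queue

def work(item, results):
    results.put(item * item)          # stand-in for a single test run

def run_parallel(items, max_jobs):
    results = Queue()
    collected = []
    next_item = 0
    # Start the initial batch of workers.
    while next_item < min(max_jobs, len(items)):
        Process(target=work, args=(items[next_item], results)).start()
        next_item += 1
    # Each collected result lets us start one more worker.
    for _ in range(len(items)):
        collected.append(results.get())
        if next_item < len(items):
            Process(target=work, args=(items[next_item], results)).start()
            next_item += 1
    return collected

if __name__ == '__main__':
    print(run_parallel(list(range(10)), 4))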
@@ -400,6 +571,15 @@ def main(argv):
     test_dir = os.path.join(script_dir, 'tests')
     lib_dir = os.path.join(script_dir, 'lib')

+    # If no multiprocessing is available, fallback to serial test execution
+    max_jobs_default = 1
+    if HAVE_MULTIPROCESSING:
+        try:
+            max_jobs_default = cpu_count()
+            print "Defaulting to %d jobs in parallel" % max_jobs_default
+        except NotImplementedError:
+            pass
+
     # The [TESTS] optional arguments are paths of test files relative
     # to the jit-test/tests directory.

@@ -446,6 +626,9 @@
                   help='Run tests once with --ion-eager and once with --no-jm (ignores --jitflags)')
     op.add_option('--tbpl', dest='tbpl', action='store_true',
                   help='Run tests with all IonMonkey option combinations (ignores --jitflags)')
+    op.add_option('-j', '--worker-count', dest='max_jobs', type=int, default=max_jobs_default,
+                  help='Number of tests to run in parallel (default %default)')
+
     (OPTIONS, args) = op.parse_args(argv)
     if len(args) < 1:
         op.error('missing JS_SHELL argument')
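Aside, not part of the patch: with the new -j/--worker-count option, a typical invocation (the shell path here is only an example) would be:

    python jit_test.py -j 8 /path/to/objdir/js

Omitting -j uses max_jobs_default, i.e. one worker per CPU when multiprocessing is available and a single serial worker otherwise.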
@@ -560,6 +743,10 @@
         sys.exit()

    try:
-        ok = run_tests(job_list, test_dir, lib_dir, shell_args)
+        ok = None
+        if OPTIONS.max_jobs > 1 and HAVE_MULTIPROCESSING:
+            ok = run_tests_parallel(job_list, test_dir, lib_dir, shell_args)
+        else:
+            ok = run_tests(job_list, test_dir, lib_dir, shell_args)
         if not ok:
             sys.exit(2)
@@ -1,3 +1,5 @@
+// |jit-test| slow;
+
 eval(" function x() {}" + Array(241).join(" "));
 for (var i = 0; i < 100; i++) {
     gczeal(4, 2);