Bug 1400716 - Explicitly drain all queues on exit, r=maja_zf

The fact that the queues were not drained on exist seemed to cause an
intermittent failure where one process tried to write to a queue that
another had closed. Draining the queues explicitly should avoid this
and ensure we surface hidden problems in the log.

MozReview-Commit-ID: 8ulLNpIcj5z

--HG--
extra : rebase_source : 1234d368a4ef4eedd4a40835ff2d284bbe04f3f3
This commit is contained in:
James Graham 2017-11-27 16:16:55 +00:00
Родитель da41393bd7
Коммит 0a23cf28d8
3 изменённых файлов: 35 добавлений и 7 удалений

Просмотреть файл

@ -175,7 +175,11 @@ class MarionetteProtocol(Protocol):
# This can happen if there was a crash
return
if socket_timeout:
self.marionette.timeout.script = socket_timeout / 2
try:
self.marionette.timeout.script = socket_timeout / 2
except (socket.error, IOError):
self.logger.debug("Socket closed")
return
self.marionette.switch_to_window(self.runner_handle)
while True:

Просмотреть файл

@ -120,6 +120,13 @@ def start_runner(runner_command_queue, runner_result_queue,
executor_browser_cls, executor_browser_kwargs,
stop_flag):
"""Launch a TestRunner in a new process"""
def log(level, msg):
runner_result_queue.put(("log", (level, {"message": msg})))
def handle_error(e):
log("critical", traceback.format_exc())
stop_flag.set()
try:
browser = executor_browser_cls(**executor_browser_kwargs)
executor = executor_cls(browser, **executor_kwargs)
@ -128,10 +135,10 @@ def start_runner(runner_command_queue, runner_result_queue,
runner.run()
except KeyboardInterrupt:
stop_flag.set()
except Exception:
runner_result_queue.put(("log", ("critical", {"message": traceback.format_exc()})))
print >> sys.stderr, traceback.format_exc()
stop_flag.set()
except Exception as e:
handle_error(e)
except Exception as e:
handle_error(e)
finally:
runner_command_queue = None
runner_result_queue = None
@ -389,6 +396,7 @@ class TestRunnerManager(threading.Thread):
}
try:
command, data = self.command_queue.get(True, 1)
self.logger.debug("Got command: %r" % command)
except IOError:
self.logger.error("Got IOError from poll")
return RunnerManagerState.restarting(0)
@ -676,7 +684,18 @@ class TestRunnerManager(threading.Thread):
self.browser.cleanup()
while True:
try:
self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait())))
cmd, data = self.command_queue.get_nowait()
except Empty:
break
else:
if cmd == "log":
self.log(*data)
else:
self.logger.warning("%r: %r" % (cmd, data))
while True:
try:
cmd, data = self.remote_queue.get_nowait()
self.logger.warning("%r: %r" % (cmd, data))
except Empty:
break

Просмотреть файл

@ -1,6 +1,7 @@
import logging
import sys
import threading
from Queue import Empty
from StringIO import StringIO
from multiprocessing import Queue
@ -51,7 +52,6 @@ class LogLevelRewriter(object):
return self.inner(data)
class LogThread(threading.Thread):
def __init__(self, queue, logger, level):
self.queue = queue
@ -126,5 +126,10 @@ class CaptureIO(object):
self.logging_queue.put(None)
if self.logging_thread is not None:
self.logging_thread.join(10)
while not self.logging_queue.empty():
try:
self.logger.warning("Dropping log message: %r", self.logging_queue.get())
except Exception:
pass
self.logging_queue.close()
self.logger.info("queue closed")