Bug 1906191 - Change how ProcessHandler handles output_timeout. r=releng-reviewers,gbrown

First and foremost, this undoes the patch for bug 1845125, which causes
other problems (e.g. output being dropped past a certain point).

The rest of the patch addresses bug 1845125 at the root: Ultimately, this
is a similar problem to bug 1863675.

wait is blocked on joining the stream reader thread, but the stream
reader thread is blocked on readline because the process is finished but
hasn't been waited on yet.

So joining the stream reader thread after the process is known to have
been finished is expected to work more reliably... except when a child
has been spun up that keeps it alive. In this case, we just keep the
reader thread working in the background, which is a similar strategy to
what was done in bug 1863675.

Differential Revision: https://phabricator.services.mozilla.com/D218389
This commit is contained in:
Mike Hommey 2024-08-26 23:49:06 +00:00
Родитель b381b7581d
Коммит 4c28032ab8
2 изменённых файлов: 52 добавлений и 69 удалений

Просмотреть файл

@ -170,13 +170,10 @@ class ProcessExecutionMixin(LoggingMixin):
p.processOutput()
status = None
sig = None
# XXX: p.wait() sometimes fails to detect the process exit and never returns a status code.
# Time out and check if the pid still exists.
# See bug 1845125 for example.
while status is None and p.pid_exists(p.pid):
while status is None:
try:
if sig is None:
status = p.wait(5)
status = p.wait()
else:
status = p.kill(sig=sig)
except KeyboardInterrupt:

Просмотреть файл

@ -12,6 +12,7 @@
import codecs
import errno
import io
import os
import signal
import subprocess
@ -897,7 +898,7 @@ falling back to not using job objects for managing child processes""",
# Ensure that we first check for the reader status. Otherwise
# we might mark the process as finished while output is still getting
# processed.
elif self.reader.is_alive():
elif not self._ignore_children and self.reader.is_alive():
return None
elif hasattr(self, "returncode"):
return self.returncode
@ -940,18 +941,17 @@ falling back to not using job objects for managing child processes""",
- '0' if the process ended without failures
"""
# Thread.join() blocks the main thread until the reader thread is finished
# wake up once a second in case a keyboard interrupt is sent
if self.reader.thread and self.reader.thread is not threading.current_thread():
count = 0
while self.reader.is_alive():
if timeout is not None and count > timeout:
self.debug("wait timeout for reader thread")
return None
self.reader.join(timeout=1)
count += 1
self.returncode = self.proc.wait(timeout)
if (
self.returncode is not None
and self.reader.thread
and self.reader.thread is not threading.current_thread()
# If children are ignored and a child is still running because it's
# been daemonized or something, the reader might still be attached
# to that child'd output... and joining will deadlock.
and not self._ignore_children
):
self.reader.join()
return self.returncode
@property
@ -1072,87 +1072,73 @@ class ProcessReader(object):
return thread
def _read_stream(self, stream, queue, callback):
while True:
line = stream.readline()
if not line:
break
sentinel = "" if isinstance(stream, io.TextIOBase) else b""
for line in iter(stream.readline, sentinel):
queue.put((line, callback))
# Give a chance to the reading loop to exit without a timeout.
queue.put((b"", None))
stream.close()
def start(self, proc):
queue = Queue()
stdout_reader = None
readers = 0
if proc.stdout:
stdout_reader = self._create_stream_reader(
self._create_stream_reader(
"ProcessReaderStdout", proc.stdout, queue, self.stdout_callback
)
stderr_reader = None
readers += 1
if proc.stderr and proc.stderr != proc.stdout:
stderr_reader = self._create_stream_reader(
self._create_stream_reader(
"ProcessReaderStderr", proc.stderr, queue, self.stderr_callback
)
readers += 1
self.thread = threading.Thread(
name="ProcessReader",
target=self._read,
args=(stdout_reader, stderr_reader, queue),
args=(queue, readers),
)
self.thread.daemon = True
self.thread.start()
self.debug("ProcessReader started")
def _read(self, stdout_reader, stderr_reader, queue):
def _read(self, queue, readers):
start_time = time.time()
timed_out = False
timeout = self.timeout
if timeout is not None:
timeout += start_time
output_timeout = self.output_timeout
if output_timeout is not None:
output_timeout += start_time
while (stdout_reader and stdout_reader.is_alive()) or (
stderr_reader and stderr_reader.is_alive()
):
has_line = True
try:
line, callback = queue.get(True, INTERVAL_PROCESS_ALIVE_CHECK)
except Empty:
has_line = False
now = time.time()
if not has_line:
if output_timeout is not None and now > output_timeout:
timed_out = True
self.didOutputTimeout = True
break
else:
if output_timeout is not None:
output_timeout = now + self.output_timeout
callback(line.rstrip())
if timeout is not None and now > timeout:
timed_out = True
break
self.debug("_read loop exited")
# process remaining lines to read
while not queue.empty():
line, callback = queue.get(False)
try:
callback(line.rstrip())
except Exception:
traceback.print_exc()
if timed_out:
try:
self.timeout_callback()
except Exception:
traceback.print_exc()
if stdout_reader:
stdout_reader.join()
if stderr_reader:
stderr_reader.join()
if not timed_out:
def get_line():
queue_timeout = None
if timeout:
queue_timeout = timeout - time.time()
if output_timeout:
if queue_timeout:
queue_timeout = min(queue_timeout, output_timeout)
else:
queue_timeout = output_timeout
return queue.get(timeout=queue_timeout)
try:
# We need to wait for as many `(b"", None)` sentinels as there are
# reader threads setup in start.
for n in range(readers):
for line, callback in iter(get_line, (b"", None)):
try:
callback(line.rstrip())
except Exception:
traceback.print_exc()
try:
self.finished_callback()
except Exception:
traceback.print_exc()
except Empty:
if timeout and time.time() < timeout:
self.didOutputTimeout = True
try:
self.timeout_callback()
except Exception:
traceback.print_exc()
self.debug("_read exited")
def is_alive(self):