[AIRFLOW-1109] Use kill signal to kill processes and log results

The kill_process_tree function comments state that
it uses SIGKILL when
it uses SIGTERM. We should update this to be
correct as well as log
results.

Closes #2241 from saguziel/aguziel-kill-processes
This commit is contained in:
Alex Guziel 2017-04-13 18:52:43 -07:00
Родитель 8f9f5084bf
Коммит 9bdfb8c700
2 изменённых файлов: 126 добавлений и 98 удалений

Просмотреть файл

@ -35,7 +35,7 @@ from airflow.exceptions import AirflowException
# When killing processes, time to wait after issuing a SIGTERM before issuing a
# SIGKILL.
TIME_TO_WAIT_AFTER_SIGTERM = 5
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = 5
def validate_key(k, max_length=250):
@ -182,20 +182,32 @@ def pprinttable(rows):
return s
def kill_using_shell(pid, signal=signal.SIGTERM):
process = psutil.Process(pid)
# Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
if process.username() != getpass.getuser():
args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
else:
args = ["kill", "-{}".format(int(signal)), str(pid)]
# PID may not exist and return a non-zero error code
subprocess.call(args)
def kill_using_shell(logger, pid, signal=signal.SIGTERM):
try:
process = psutil.Process(pid)
# Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
if process.username() != getpass.getuser():
args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)]
else:
args = ["kill", "-{}".format(int(signal)), str(pid)]
# PID may not exist and return a non-zero error code
logger.error(subprocess.check_output(args))
logger.info("Killed process {} with signal {}".format(pid, signal))
return True
except psutil.NoSuchProcess as e:
logger.warning("Process {} no longer exists".format(pid))
return False
except subprocess.CalledProcessError as e:
logger.warning("Failed to kill process {} with signal {}. Output: {}"
.format(pid, signal, e.output))
return False
def kill_process_tree(logger, pid):
def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
"""
Kills the process and all of the descendants. Kills using the `kill`
TODO(saguziel): also kill the root process after killing descendants
Kills the process's descendants. Kills using the `kill`
shell command so that it can change users. Note: killing via PIDs
has the potential to the wrong process if the process dies and the
PID gets recycled in a narrow time window.
@ -215,23 +227,21 @@ def kill_process_tree(logger, pid):
if x.is_running()]
if len(descendant_processes) != 0:
logger.warning("Terminating descendant processes of {} PID: {}"
.format(root_process.cmdline(),
root_process.pid))
logger.info("Terminating descendant processes of {} PID: {}"
.format(root_process.cmdline(),
root_process.pid))
temp_processes = descendant_processes[:]
for descendant in temp_processes:
logger.warning("Terminating descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
try:
kill_using_shell(descendant.pid, signal.SIGTERM)
except psutil.NoSuchProcess:
logger.info("Terminating descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
if not kill_using_shell(logger, descendant.pid, signal.SIGTERM):
descendant_processes.remove(descendant)
logger.warning("Waiting up to {}s for processes to exit..."
.format(TIME_TO_WAIT_AFTER_SIGTERM))
logger.info("Waiting up to {}s for processes to exit..."
.format(timeout))
try:
psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
logger.warning("Done waiting")
psutil.wait_procs(descendant_processes, timeout)
logger.info("Done waiting")
except psutil.TimeoutExpired:
logger.warning("Ran out of time while waiting for "
"processes to exit")
@ -242,85 +252,19 @@ def kill_process_tree(logger, pid):
if len(descendant_processes) > 0:
temp_processes = descendant_processes[:]
for descendant in temp_processes:
logger.warning("Killing descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
try:
kill_using_shell(descendant.pid, signal.SIGTERM)
descendant.wait()
except psutil.NoSuchProcess:
logger.info("Killing descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
if not kill_using_shell(logger, descendant.pid, signal.SIGKILL):
descendant_processes.remove(descendant)
logger.warning("Killed all descendant processes of {} PID: {}"
.format(root_process.cmdline(),
root_process.pid))
else:
descendant.wait()
logger.info("Killed all descendant processes of {} PID: {}"
.format(root_process.cmdline(),
root_process.pid))
else:
logger.debug("There are no descendant processes to kill")
def kill_descendant_processes(logger, pids_to_kill=None):
"""
Kills all descendant processes of this process.
:param logger: logger
:type logger: logging.Logger
:param pids_to_kill: if specified, kill only these PIDs
:type pids_to_kill: list[int]
"""
# First try SIGTERM
this_process = psutil.Process(os.getpid())
# Only check child processes to ensure that we don't have a case
# where a child process died but the PID got reused.
descendant_processes = [x for x in this_process.children(recursive=True)
if x.is_running()]
if pids_to_kill:
descendant_processes = [x for x in descendant_processes
if x.pid in pids_to_kill]
if len(descendant_processes) == 0:
logger.debug("There are no descendant processes that can be killed")
return
logger.warning("Terminating descendant processes of {} PID: {}"
.format(this_process.cmdline(),
this_process.pid))
temp_processes = descendant_processes[:]
for descendant in temp_processes:
try:
logger.warning("Terminating descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
descendant.terminate()
except psutil.NoSuchProcess:
descendant_processes.remove(descendant)
logger.warning("Waiting up to {}s for processes to exit..."
.format(TIME_TO_WAIT_AFTER_SIGTERM))
try:
psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM)
logger.warning("Done waiting")
except psutil.TimeoutExpired:
logger.warning("Ran out of time while waiting for "
"processes to exit")
# Then SIGKILL
descendant_processes = [x for x in this_process.children(recursive=True)
if x.is_running()]
if pids_to_kill:
descendant_processes = [x for x in descendant_processes
if x.pid in pids_to_kill]
if len(descendant_processes) > 0:
for descendant in descendant_processes:
logger.warning("Killing descendant process {} PID: {}"
.format(descendant.cmdline(), descendant.pid))
try:
descendant.kill()
descendant.wait()
except psutil.NoSuchProcess:
pass
logger.warning("Killed all descendant processes of {} PID: {}"
.format(this_process.cmdline(),
this_process.pid))
class AirflowImporter(object):
"""
Importer that dynamically loads a class and module from its parent. This

84
tests/utils/helpers.py Normal file
Просмотреть файл

@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
import multiprocessing
import unittest
import psutil
import signal
import time
from airflow.utils import helpers
class TestHelpers(unittest.TestCase):
@staticmethod
def _ignores_sigterm(child_pid, setup_done):
def signal_handler(signum, frame):
pass
signal.signal(signal.SIGTERM, signal_handler)
child_pid.value = os.getpid()
setup_done.release()
while True:
time.sleep(1)
@staticmethod
def _parent_of_ignores_sigterm(child_process_killed, child_pid,
process_done, setup_done):
child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
args=[child_pid, setup_done])
child.start()
if setup_done.acquire(timeout=1.0):
helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
# Process.is_alive doesnt work with SIGKILL
if not psutil.pid_exists(child_pid.value):
child_process_killed.value = 1
process_done.release()
def test_kill_process_tree(self):
""" Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """
child_process_killed = multiprocessing.Value('i', 0)
process_done = multiprocessing.Semaphore(0)
child_pid = multiprocessing.Value('i', 0)
setup_done = multiprocessing.Semaphore(0)
args = [child_process_killed, child_pid, process_done, setup_done]
child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
try:
child.start()
self.assertTrue(process_done.acquire(timeout=5.0))
self.assertEqual(1, child_process_killed.value)
finally:
try:
os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
except OSError:
pass
child.terminate()
def test_kill_using_shell(self):
""" Test when no process exists. """
child_pid = multiprocessing.Value('i', 0)
setup_done = multiprocessing.Semaphore(0)
args = [child_pid, setup_done]
child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args)
child.start()
self.assertTrue(setup_done.acquire(timeout=1.0))
pid_to_kill = child_pid.value
self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
signal=signal.SIGKILL))
child.join() # remove orphan process
self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
signal=signal.SIGKILL))