Fix quarantined tests - TestCliWebServer (#9598)

This commit is contained in:
Kamil Breguła 2020-07-01 16:12:47 +02:00 коммит произвёл GitHub
Родитель f3e1f9a313
Коммит 48a8316646
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
10 изменённых файлов: 139 добавлений и 128 удалений

Просмотреть файл

@ -17,6 +17,7 @@
"""Webserver command"""
import hashlib
import logging
import os
import signal
import subprocess
@ -40,6 +41,8 @@ from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.process_utils import check_if_pidfile_process_is_running
from airflow.www.app import cached_app, create_app
log = logging.getLogger(__name__)
class GunicornMonitor(LoggingMixin):
"""
@ -64,8 +67,8 @@ class GunicornMonitor(LoggingMixin):
respectively. Gunicorn guarantees that on TTOU workers are terminated
gracefully and that the oldest worker is terminated.
:param gunicorn_master_proc: handle for the main Gunicorn process
:param num_workers_expected: Number of workers to run the Gunicorn web server
:param gunicorn_master_pid: PID for the main Gunicorn process
:param num_workers_expected: Number of workers to run the Gunicorn web server
:param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
doesn't respond
:param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
@ -77,7 +80,7 @@ class GunicornMonitor(LoggingMixin):
"""
def __init__(
self,
gunicorn_master_proc: psutil.Process,
gunicorn_master_pid: int,
num_workers_expected: int,
master_timeout: int,
worker_refresh_interval: int,
@ -85,7 +88,7 @@ class GunicornMonitor(LoggingMixin):
reload_on_plugin_change: bool
):
super().__init__()
self.gunicorn_master_proc = gunicorn_master_proc
self.gunicorn_master_proc = psutil.Process(gunicorn_master_pid)
self.num_workers_expected = num_workers_expected
self.master_timeout = master_timeout
self.worker_refresh_interval = worker_refresh_interval
@ -208,8 +211,8 @@ class GunicornMonitor(LoggingMixin):
timeout=self.master_timeout
)
while True:
if self.gunicorn_master_proc.poll() is not None:
sys.exit(self.gunicorn_master_proc.returncode)
if not self.gunicorn_master_proc.is_running():
sys.exit(1)
self._check_workers()
# Throttle loop
sleep(1)
@ -386,19 +389,20 @@ def webserver(args):
gunicorn_master_proc = None
def kill_proc(dummy_signum, dummy_frame): # pylint: disable=unused-argument
def kill_proc(signum, _): # pylint: disable=unused-argument
log.info("Received signal: %s. Closing gunicorn.", signum)
gunicorn_master_proc.terminate()
gunicorn_master_proc.wait()
sys.exit(0)
def monitor_gunicorn(gunicorn_master_proc):
def monitor_gunicorn(gunicorn_master_pid: int):
# Register signal handlers
signal.signal(signal.SIGINT, kill_proc)
signal.signal(signal.SIGTERM, kill_proc)
# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
GunicornMonitor(
gunicorn_master_proc=gunicorn_master_proc,
gunicorn_master_pid=gunicorn_master_pid,
num_workers_expected=num_workers,
master_timeout=conf.getint('webserver', 'web_server_master_timeout'),
worker_refresh_interval=conf.getint('webserver', 'worker_refresh_interval', fallback=30),
@ -432,8 +436,8 @@ def webserver(args):
# Run Gunicorn monitor
gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
monitor_gunicorn(gunicorn_master_proc)
monitor_gunicorn(gunicorn_master_proc.pid)
else:
gunicorn_master_proc = subprocess.Popen(run_args, close_fds=True)
monitor_gunicorn(gunicorn_master_proc)
monitor_gunicorn(gunicorn_master_proc.pid)

Просмотреть файл

@ -72,9 +72,9 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
boto3==1.14.12
boto3==1.14.14
boto==2.49.0
botocore==1.17.12
botocore==1.17.14
bowler==0.8.0
cached-property==1.5.1
cachetools==4.1.1
@ -144,7 +144,7 @@ google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-auth==1.18.0
google-cloud-automl==1.0.1
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery-datatransfer==1.1.0
google-cloud-bigquery==1.25.0
google-cloud-bigtable==1.2.1
google-cloud-container==1.0.1
@ -206,7 +206,7 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
jupyter-client==6.1.3
jupyter-client==6.1.5
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
@ -225,7 +225,7 @@ more-itertools==8.4.0
moto==1.3.14
msgpack==1.0.0
msrest==0.6.17
msrestazure==0.6.3
msrestazure==0.6.4
multi-key-dict==2.0.3
multidict==4.7.6
mypy-extensions==0.4.3
@ -247,7 +247,7 @@ oscrypto==1.2.0
packaging==20.4
pandas-gbq==0.13.2
pandas==1.0.5
papermill==2.1.1
papermill==2.1.2
parameterized==0.7.4
paramiko==2.7.1
parso==0.7.0
@ -299,14 +299,14 @@ pytest-rerunfailures==9.0
pytest-timeout==1.4.1
pytest-xdist==1.32.0
pytest==5.4.3
python-daemon==2.1.2
python-daemon==2.2.4
python-dateutil==2.8.1
python-editor==1.0.4
python-http-client==3.2.7
python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
python-slugify==4.0.1
python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3

Просмотреть файл

@ -72,9 +72,9 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
boto3==1.14.12
boto3==1.14.14
boto==2.49.0
botocore==1.17.12
botocore==1.17.14
bowler==0.8.0
cached-property==1.5.1
cachetools==4.1.1
@ -143,7 +143,7 @@ google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-auth==1.18.0
google-cloud-automl==1.0.1
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery-datatransfer==1.1.0
google-cloud-bigquery==1.25.0
google-cloud-bigtable==1.2.1
google-cloud-container==1.0.1
@ -202,7 +202,7 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
jupyter-client==6.1.3
jupyter-client==6.1.5
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
@ -221,7 +221,7 @@ more-itertools==8.4.0
moto==1.3.14
msgpack==1.0.0
msrest==0.6.17
msrestazure==0.6.3
msrestazure==0.6.4
multi-key-dict==2.0.3
multidict==4.7.6
mypy-extensions==0.4.3
@ -243,7 +243,7 @@ oscrypto==1.2.0
packaging==20.4
pandas-gbq==0.13.2
pandas==1.0.5
papermill==2.1.1
papermill==2.1.2
parameterized==0.7.4
paramiko==2.7.1
parso==0.7.0
@ -294,14 +294,14 @@ pytest-rerunfailures==9.0
pytest-timeout==1.4.1
pytest-xdist==1.32.0
pytest==5.4.3
python-daemon==2.1.2
python-daemon==2.2.4
python-dateutil==2.8.1
python-editor==1.0.4
python-http-client==3.2.7
python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
python-slugify==4.0.1
python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3

Просмотреть файл

@ -72,9 +72,9 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
boto3==1.14.12
boto3==1.14.14
boto==2.49.0
botocore==1.17.12
botocore==1.17.14
bowler==0.8.0
cached-property==1.5.1
cachetools==4.1.1
@ -143,7 +143,7 @@ google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-auth==1.18.0
google-cloud-automl==1.0.1
google-cloud-bigquery-datatransfer==1.0.0
google-cloud-bigquery-datatransfer==1.1.0
google-cloud-bigquery==1.25.0
google-cloud-bigtable==1.2.1
google-cloud-container==1.0.1
@ -202,7 +202,7 @@ jsonpickle==1.4.1
jsonpointer==2.0
jsonschema==3.2.0
junit-xml==1.9
jupyter-client==6.1.3
jupyter-client==6.1.5
jupyter-core==4.6.3
kombu==4.6.11
kubernetes==11.0.0
@ -221,7 +221,7 @@ more-itertools==8.4.0
moto==1.3.14
msgpack==1.0.0
msrest==0.6.17
msrestazure==0.6.3
msrestazure==0.6.4
multi-key-dict==2.0.3
multidict==4.7.6
mypy-extensions==0.4.3
@ -243,7 +243,7 @@ oscrypto==1.2.0
packaging==20.4
pandas-gbq==0.13.2
pandas==1.0.5
papermill==2.1.1
papermill==2.1.2
parameterized==0.7.4
paramiko==2.7.1
parso==0.7.0
@ -293,14 +293,14 @@ pytest-rerunfailures==9.0
pytest-timeout==1.4.1
pytest-xdist==1.32.0
pytest==5.4.3
python-daemon==2.1.2
python-daemon==2.2.4
python-dateutil==2.8.1
python-editor==1.0.4
python-http-client==3.2.7
python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
python-slugify==4.0.1
python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3

Просмотреть файл

@ -1 +1 @@
ab047ae7da10b1a5efb746c9c4a403fe /opt/airflow/setup.py
e50c855fa7de97b2eca8152389400a5b /opt/airflow/setup.py

Просмотреть файл

@ -1 +1 @@
ab047ae7da10b1a5efb746c9c4a403fe /opt/airflow/setup.py
e50c855fa7de97b2eca8152389400a5b /opt/airflow/setup.py

Просмотреть файл

@ -1 +1 @@
ab047ae7da10b1a5efb746c9c4a403fe /opt/airflow/setup.py
e50c855fa7de97b2eca8152389400a5b /opt/airflow/setup.py

Просмотреть файл

@ -716,7 +716,7 @@ INSTALL_REQUIREMENTS = [
'pep562~=1.0;python_version<"3.7"',
'psutil>=4.2.0, <6.0.0',
'pygments>=2.0.1, <3.0',
'python-daemon>=2.1.1, <2.2',
'python-daemon>=2.1.1',
'python-dateutil>=2.3, <3',
'python-nvd3~=0.15.0',
'python-slugify>=3.0.0,<5.0',

Просмотреть файл

@ -1,48 +0,0 @@
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import re
import unittest
class TestOnlyUseLongOption(unittest.TestCase):
def test_command_only_long_option(self):
"""
Make sure all cli.commands test use long option for more clearer intent
"""
pattern_1 = re.compile("\"-[a-zA-Z]\"")
pattern_2 = re.compile("'-[a-zA-Z]'")
current_dir = os.path.dirname(os.path.abspath(__file__))
ignore = ["__init__.py", "__pycache__", os.path.basename(__file__)]
for test_file in os.listdir(current_dir):
if test_file in ignore:
continue
match = []
with open(os.path.join(current_dir, test_file), "r") as f:
content = f.read()
match.extend(pattern_1.findall(content))
match.extend(pattern_2.findall(content))
self.assertListEqual(
[],
match,
"Should use long option in test for more clearer intent, "
f"but get {match} in {test_file}"
)

Просмотреть файл

@ -23,7 +23,6 @@ from time import sleep, time
from unittest import mock
import psutil
import pytest
from airflow import settings
from airflow.cli import cli_parser
@ -35,10 +34,9 @@ from tests.test_utils.config import conf_vars
class TestGunicornMonitor(unittest.TestCase):
def setUp(self,) -> None:
self.gunicorn_master_proc = mock.Mock(pid=2137)
def setUp(self) -> None:
self.monitor = GunicornMonitor(
gunicorn_master_proc=self.gunicorn_master_proc,
gunicorn_master_pid=1,
num_workers_expected=4,
master_timeout=60,
worker_refresh_interval=60,
@ -132,7 +130,7 @@ class TestGunicornMonitorGeneratePluginState(unittest.TestCase):
self._prepare_test_file(f"{tempdir}/file3.txt", 300)
monitor = GunicornMonitor(
gunicorn_master_proc=mock.MagicMock(),
gunicorn_master_pid=1,
num_workers_expected=4,
master_timeout=60,
worker_refresh_interval=60,
@ -179,12 +177,11 @@ class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
cls.parser = cli_parser.get_parser()
def setUp(self):
self.gunicorn_master_proc = mock.Mock(pid=2137)
self.children = mock.MagicMock()
self.child = mock.MagicMock()
self.process = mock.MagicMock()
self.monitor = GunicornMonitor(
gunicorn_master_proc=self.gunicorn_master_proc,
gunicorn_master_pid=1,
num_workers_expected=4,
master_timeout=60,
worker_refresh_interval=60,
@ -220,7 +217,6 @@ class TestCLIGetNumReadyWorkersRunning(unittest.TestCase):
self.assertEqual(self.monitor._get_num_ready_workers_running(), 0)
@pytest.mark.quarantined
class TestCliWebServer(unittest.TestCase):
@classmethod
def setUpClass(cls):
@ -231,17 +227,24 @@ class TestCliWebServer(unittest.TestCase):
self._clean_pidfiles()
def _check_processes(self):
try:
# Confirm that webserver hasn't been launched.
# pgrep returns exit status 1 if no process matched.
self.assertEqual(1, subprocess.Popen(["pgrep", "--full", "--count", "airflow webserver"]).wait())
self.assertEqual(1, subprocess.Popen(["pgrep", "--count", "gunicorn"]).wait())
except: # noqa: E722
# Confirm that webserver hasn't been launched.
# pgrep returns exit status 1 if no process matched.
exit_code_pgrep_webserver = subprocess.Popen(["pgrep", "-c", "-f", "airflow webserver"]).wait()
exit_code_pgrep_gunicorn = subprocess.Popen(["pgrep", "-c", "-f", "gunicorn"]).wait()
if exit_code_pgrep_webserver != 1 or exit_code_pgrep_gunicorn != 1:
subprocess.Popen(["ps", "-ax"]).wait()
raise
if exit_code_pgrep_webserver != 1:
subprocess.Popen(["pkill", "-9", "-f", "airflow webserver"]).wait()
if exit_code_pgrep_gunicorn != 1:
subprocess.Popen(["pkill", "-9", "-f", "gunicorn"]).wait()
raise AssertionError(
"Background processes are running that prevent the test from passing successfully."
)
def tearDown(self) -> None:
self._check_processes()
self._clean_pidfiles()
def _clean_pidfiles(self):
pidfile_webserver = setup_locations("webserver")[0]
@ -252,49 +255,101 @@ class TestCliWebServer(unittest.TestCase):
os.remove(pidfile_monitor)
def _wait_pidfile(self, pidfile):
start_time = time()
while True:
try:
with open(pidfile) as file:
return int(file.read())
except Exception: # pylint: disable=broad-except
if start_time - time() > 60:
raise
sleep(1)
def test_cli_webserver_foreground(self):
# Run webserver in foreground and terminate it.
proc = subprocess.Popen(["airflow", "webserver"])
proc.terminate()
proc.wait()
with mock.patch.dict(
"os.environ",
AIRFLOW__CORE__DAGS_FOLDER="/dev/null",
AIRFLOW__CORE__LOAD_EXAMPLES="False",
AIRFLOW__WEBSERVER__WORKERS="1"
):
# Run webserver in foreground and terminate it.
proc = subprocess.Popen(["airflow", "webserver"])
self.assertEqual(None, proc.poll())
def test_cli_webserver_foreground_with_pid(self):
# Run webserver in foreground with --pid option
pidfile = tempfile.mkstemp()[1]
proc = subprocess.Popen(["airflow", "webserver", "--pid", pidfile])
# Check the file specified by --pid option exists
self._wait_pidfile(pidfile)
# Wait for process
sleep(10)
# Terminate webserver
proc.terminate()
proc.wait()
# -15 - the server was stopped before it started
# 0 - the server terminated correctly
self.assertIn(proc.wait(60), (-15, 0))
def test_cli_webserver_foreground_with_pid(self):
with tempfile.TemporaryDirectory(prefix='tmp-pid') as tmpdir:
pidfile = "{}/pidfile".format(tmpdir)
with mock.patch.dict(
"os.environ",
AIRFLOW__CORE__DAGS_FOLDER="/dev/null",
AIRFLOW__CORE__LOAD_EXAMPLES="False",
AIRFLOW__WEBSERVER__WORKERS="1"
):
proc = subprocess.Popen(["airflow", "webserver", "--pid", pidfile])
self.assertEqual(None, proc.poll())
# Check the file specified by --pid option exists
self._wait_pidfile(pidfile)
# Terminate webserver
proc.terminate()
self.assertEqual(0, proc.wait(60))
def test_cli_webserver_background(self):
pidfile_webserver = setup_locations("webserver")[0]
pidfile_monitor = setup_locations("webserver-monitor")[0]
with tempfile.TemporaryDirectory(prefix="gunicorn") as tmpdir, \
mock.patch.dict(
"os.environ",
AIRFLOW__CORE__DAGS_FOLDER="/dev/null",
AIRFLOW__CORE__LOAD_EXAMPLES="False",
AIRFLOW__WEBSERVER__WORKERS="1"):
pidfile_webserver = "{}/pidflow-webserver.pid".format(tmpdir)
pidfile_monitor = "{}/pidflow-webserver-monitor.pid".format(tmpdir)
stdout = "{}/airflow-webserver.out".format(tmpdir)
stderr = "{}/airflow-webserver.err".format(tmpdir)
logfile = "{}/airflow-webserver.log".format(tmpdir)
try:
# Run webserver as daemon in background. Note that the wait method is not called.
proc = subprocess.Popen([
"airflow",
"webserver",
"--daemon",
"--pid", pidfile_webserver,
"--stdout", stdout,
"--stderr", stderr,
"--log-file", logfile,
])
self.assertEqual(None, proc.poll())
# Run webserver as daemon in background. Note that the wait method is not called.
subprocess.Popen(["airflow", "webserver", "--daemon"])
pid_monitor = self._wait_pidfile(pidfile_monitor)
self._wait_pidfile(pidfile_webserver)
pid_monitor = self._wait_pidfile(pidfile_monitor)
self._wait_pidfile(pidfile_webserver)
# Assert that gunicorn and its monitor are launched.
self.assertEqual(
0, subprocess.Popen(["pgrep", "-f", "-c", "airflow webserver --daemon"]).wait()
)
self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "-f", "gunicorn: master"]).wait())
# Assert that gunicorn and its monitor are launched.
self.assertEqual(0, subprocess.Popen(["pgrep", "--full", "--count", "airflow webserver"]).wait())
self.assertEqual(0, subprocess.Popen(["pgrep", "--count", "gunicorn"]).wait())
# Terminate monitor process.
proc = psutil.Process(pid_monitor)
proc.terminate()
self.assertIn(proc.wait(120), (0, None))
# Terminate monitor process.
proc = psutil.Process(pid_monitor)
proc.terminate()
proc.wait()
self._check_processes()
except Exception:
# List all logs
subprocess.Popen(["ls", "-lah", tmpdir]).wait()
# Dump all logs
subprocess.Popen(["bash", "-c", "ls {}/* | xargs -n 1 -t cat".format(tmpdir)]).wait()
raise
# Patch for causing webserver timeout
@mock.patch("airflow.cli.commands.webserver_command.GunicornMonitor._get_num_workers_running",
@ -317,4 +372,4 @@ class TestCliWebServer(unittest.TestCase):
return_code,
"webserver terminated with return code {} in debug mode".format(return_code))
proc.terminate()
proc.wait()
self.assertEqual(-15, proc.wait(60))