[AIRFLOW-3414] Fix reload_module in DagFileProcessorAgent (#4253)

Kevin Yang 2018-12-02 09:45:08 -08:00, committed by Kaxil Naik
Parent 1596ecab75
Commit ded25e16c1
4 changed files with 152 additions and 7 deletions
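In outline: configure_logging() used to return the logging config dict itself, so DagFileProcessorAgent could only ever reload the hard-coded default template module, even when [core] logging_config_class pointed at a user-supplied config. After this change configure_logging() returns the dotted path of the active config, settings.py keeps that path as a module-level name, and the agent derives the module to reload from it. A minimal before/after sketch (assuming the 1.10-era module layout; this is not the patch itself):

    from importlib import import_module
    from six.moves import reload_module

    import airflow.config_templates.airflow_local_settings

    # Before: the reload target was hard-coded, so a custom logging module
    # configured via [core] logging_config_class was never re-imported.
    reload_module(airflow.config_templates.airflow_local_settings)

    # After: the target is derived from the configured dotted path.
    logging_class_path = ('airflow.config_templates.'
                          'airflow_local_settings.DEFAULT_LOGGING_CONFIG')
    reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))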

airflow/logging_config.py

@@ -57,9 +57,9 @@ def configure_logging():
                 .format(logging_class_path, err)
             )
     else:
-        from airflow.config_templates.airflow_local_settings import (
-            DEFAULT_LOGGING_CONFIG as logging_config
-        )
+        logging_class_path = 'airflow.config_templates.' \
+                             'airflow_local_settings.DEFAULT_LOGGING_CONFIG'
+        logging_config = import_string(logging_class_path)
         log.debug('Unable to load custom logging, using default config instead')
 
     try:
@@ -73,7 +73,7 @@ def configure_logging():
 
     validate_logging_config(logging_config)
 
-    return logging_config
+    return logging_class_path
 
 
 def validate_logging_config(logging_config):

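The dotted path returned here is resolved back to the config object with import_string. Roughly, that helper splits 'package.module.ATTRIBUTE' into a module part and an attribute name; a rough stand-in (not Airflow's own implementation) for illustration:

    from importlib import import_module

    def import_string_sketch(dotted_path):
        # 'pkg.module.ATTR' -> import pkg.module, then fetch ATTR from it.
        module_path, attr_name = dotted_path.rsplit('.', 1)
        return getattr(import_module(module_path), attr_name)

    logging_config = import_string_sketch(
        'airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG')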
airflow/settings.py

@@ -261,7 +261,7 @@ try:
 except Exception:
     pass
 
-configure_logging()
+logging_class_path = configure_logging()
 configure_vars()
 configure_adapters()
 # The webservers import this file from models.py with the default settings.

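Binding the return value at module level means the active path is importable anywhere after airflow.settings has run, which is exactly what the next file relies on. A small sketch; the printed value depends on your configuration:

    from airflow.settings import logging_class_path

    # With no custom [core] logging_config_class this prints the default path:
    # 'airflow.config_templates.airflow_local_settings.DEFAULT_LOGGING_CONFIG'
    print(logging_class_path)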
airflow/utils/dag_processing.py

@@ -34,6 +34,7 @@ from abc import ABCMeta, abstractmethod
 from collections import defaultdict
 from collections import namedtuple
 from datetime import timedelta
+from importlib import import_module
 
 import psutil
 from six.moves import range, reload_module
@@ -45,6 +46,7 @@ import airflow.models
 from airflow import configuration as conf
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.settings import logging_class_path
 from airflow.utils import timezone
 from airflow.utils.db import provide_session
 from airflow.utils.log.logging_mixin import LoggingMixin
@@ -539,7 +541,9 @@ class DagFileProcessorAgent(LoggingMixin):
             # e.g. RotatingFileHandler. And it can cause connection corruption if we
             # do not recreate the SQLA connection pool.
             os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
-            reload_module(airflow.config_templates.airflow_local_settings)
+            # Replicate how the logging module was loaded in logging_config.py:
+            # reload the module named by the configured dotted path.
+            reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
             reload_module(airflow.settings)
             del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
             processor_manager = DagFileProcessorManager(dag_directory,

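The reload is bracketed by the CONFIG_PROCESSOR_MANAGER_LOGGER environment variable so that the re-executed settings code configures the processor-manager handler, and airflow.settings is reloaded afterwards to rebuild logging and the SQLAlchemy connection pool against the fresh config. A condensed, standalone sketch of the same sequence (mirroring the hunk above, not adding to it):

    import os
    from importlib import import_module
    from six.moves import reload_module

    import airflow.settings
    from airflow.settings import logging_class_path

    os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'

    # rsplit('.', 1)[0] drops the trailing attribute name, leaving the module
    # to reload, e.g. 'my_settings.LOGGING_CONFIG' -> 'my_settings'.
    reload_module(import_module(logging_class_path.rsplit('.', 1)[0]))
    reload_module(airflow.settings)

    del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']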
tests/utils/test_dag_processing.py

@@ -18,12 +18,15 @@
 # under the License.
 
 import os
+import sys
+import tempfile
 import unittest
 from datetime import timedelta
 
 from mock import MagicMock
 
 from airflow import configuration as conf
+from airflow.configuration import mkdir_p
 from airflow.jobs import DagFileProcessor
 from airflow.jobs import LocalTaskJob as LJ
 from airflow.models import DagBag, TaskInstance as TI
@@ -38,6 +41,96 @@ TEST_DAG_FOLDER = os.path.join(
 
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
+SETTINGS_FILE_VALID = """
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow.task': {
+            'format': '[%%(asctime)s] {{%%(filename)s:%%(lineno)d}} %%(levelname)s - %%(message)s'
+        },
+    },
+    'handlers': {
+        'console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+        'task': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow.task',
+            'stream': 'ext://sys.stdout'
+        },
+    },
+    'loggers': {
+        'airflow': {
+            'handlers': ['console'],
+            'level': 'INFO',
+            'propagate': False
+        },
+        'airflow.task': {
+            'handlers': ['task'],
+            'level': 'INFO',
+            'propagate': False,
+        },
+    }
+}
+"""
+
+SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
+
+
+class settings_context(object):
+    """
+    Sets a settings file and puts it on the Python path
+
+    :param content:
+        The content of the settings file
+    """
+
+    def __init__(self, content, dir=None, name='LOGGING_CONFIG'):
+        self.content = content
+        self.settings_root = tempfile.mkdtemp()
+        filename = "{}.py".format(SETTINGS_DEFAULT_NAME)
+
+        if dir:
+            # Replace slashes by dots
+            self.module = dir.replace('/', '.') + '.' + SETTINGS_DEFAULT_NAME + '.' + name
+
+            # Create the directory structure
+            dir_path = os.path.join(self.settings_root, dir)
+            mkdir_p(dir_path)
+
+            # Add an __init__.py to each directory; this is required
+            # on Python 2.7 to make the packages importable
+            basedir = self.settings_root
+            for part in dir.split('/'):
+                open(os.path.join(basedir, '__init__.py'), 'w').close()
+                basedir = os.path.join(basedir, part)
+            open(os.path.join(basedir, '__init__.py'), 'w').close()
+
+            self.settings_file = os.path.join(dir_path, filename)
+        else:
+            self.module = SETTINGS_DEFAULT_NAME + '.' + name
+            self.settings_file = os.path.join(self.settings_root, filename)
+
+    def __enter__(self):
+        with open(self.settings_file, 'w') as handle:
+            handle.writelines(self.content)
+        sys.path.append(self.settings_root)
+        conf.set(
+            'core',
+            'logging_config_class',
+            self.module
+        )
+        return self.settings_file
+
+    def __exit__(self, *exc_info):
+        # Clean up: reset the config and drop the temp dir from sys.path
+        conf.set('core', 'logging_config_class', '')
+        sys.path.remove(self.settings_root)
+
+
 class TestDagFileProcessorManager(unittest.TestCase):
     def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
@@ -123,6 +216,54 @@ class TestDagFileProcessorManager(unittest.TestCase):
 
 
 class TestDagFileProcessorAgent(unittest.TestCase):
+    def test_reload_module(self):
+        """
+        Configure the context so that core.logging_config_class points at a
+        fake logging class path; when the logging module is then reloaded,
+        the airflow.processor_manager logger should not be configured.
+        """
+        with settings_context(SETTINGS_FILE_VALID):
+            # Launch a process through DagFileProcessorAgent, which will try
+            # to reload the logging module.
+            def processor_factory(file_path, zombies):
+                return DagFileProcessor(file_path,
+                                        False,
+                                        [],
+                                        zombies)
+
+            test_dag_path = os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py')
+            async_mode = 'sqlite' not in conf.get('core', 'sql_alchemy_conn')
+
+            log_file_loc = conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+            try:
+                os.remove(log_file_loc)
+            except OSError:
+                pass
+
+            # Starting dag processing with 0 max_runs to avoid redundant operations.
+            processor_agent = DagFileProcessorAgent(test_dag_path,
+                                                    [],
+                                                    0,
+                                                    processor_factory,
+                                                    async_mode)
+            manager_process = \
+                processor_agent._launch_process(processor_agent._dag_directory,
+                                                processor_agent._file_paths,
+                                                processor_agent._max_runs,
+                                                processor_agent._processor_factory,
+                                                processor_agent._child_signal_conn,
+                                                processor_agent._stat_queue,
+                                                processor_agent._result_queue,
+                                                processor_agent._async_mode)
+            if not async_mode:
+                processor_agent.heartbeat()
+
+            manager_process.join()
+
+            # Since we are reloading the logging config rather than creating
+            # this file, we expect it not to exist.
+            self.assertFalse(os.path.isfile(log_file_loc))
+
     def test_parse_once(self):
         def processor_factory(file_path, zombies):
             return DagFileProcessor(file_path,
@@ -164,7 +305,7 @@ class TestDagFileProcessorAgent(unittest.TestCase):
         except OSError:
             pass
 
-        # Starting dag processing with 0 max_runs to avoid redundent operations.
+        # Starting dag processing with 0 max_runs to avoid redundant operations.
         processor_agent = DagFileProcessorAgent(test_dag_path,
                                                 [],
                                                 0,