[AIRFLOW-1776] Capture stdout and stderr for logging

The new logging framework was not properly
capturing stdout/stderr
output. Redirection to the correct logging
facility is required.

Closes #2745 from bolkedebruin/redirect_std
Bolke de Bruin 2017-11-01 23:39:52 +01:00
Parent 0e27e1b209
Commit 5b06b66662
6 changed files with 196 additions and 58 deletions
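At its core, the change routes anything written to sys.stdout/sys.stderr through a file-like object that forwards writes to a standard logging.Logger, and wraps execution in context managers that install and then remove that object. A minimal, self-contained sketch of the idea (class and function names here are illustrative, not the exact Airflow API):

import logging
import sys
from contextlib import contextmanager


class _LogWriter(object):
    """File-like object that forwards writes to a logger (illustrative name)."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level

    def write(self, message):
        # forward non-empty writes as log records, dropping the trailing newline
        if message.strip():
            self.logger.log(self.level, message.rstrip("\n"))

    def flush(self):
        # nothing is buffered in this simplified sketch
        pass


@contextmanager
def _redirect_stdout(logger, level):
    # install the writer, and restore the original stream no matter what
    original, sys.stdout = sys.stdout, _LogWriter(logger, level)
    try:
        yield
    finally:
        sys.stdout = original


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with _redirect_stdout(logging.getLogger("demo"), logging.INFO):
        print("this line ends up in the logging framework, not on the console")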

View file

@@ -54,7 +54,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
from airflow.utils import db as db_utils
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.logging_mixin import LoggingMixin, redirect_stderr, redirect_stdout
from airflow.www.app import cached_app
from sqlalchemy import func
@@ -327,6 +327,7 @@ def run(args, dag=None):
# Disable connection pooling to reduce the # of connections on the DB
# while it's waiting for the task to finish.
settings.configure_orm(disable_connection_pool=True)
db_utils.pessimistic_connection_handling()
if dag:
args.dag_id = dag.dag_id
@@ -377,56 +378,57 @@ def run(args, dag=None):
hostname = socket.getfqdn()
log.info("Running on host %s", hostname)
if args.local:
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
pickle_id=args.pickle,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
run_job.run()
elif args.raw:
ti._run_raw_task(
mark_success=args.mark_success,
job_id=args.job_id,
pool=args.pool,
)
else:
pickle_id = None
if args.ship_dag:
try:
# Running remotely, so pickling the DAG
session = settings.Session()
pickle = DagPickle(dag)
session.add(pickle)
session.commit()
pickle_id = pickle.id
# TODO: This should be written to a log
print((
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals()))
except Exception as e:
print('Could not pickle the DAG')
print(e)
raise e
with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
if args.local:
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
pickle_id=args.pickle,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
run_job.run()
elif args.raw:
ti._run_raw_task(
mark_success=args.mark_success,
job_id=args.job_id,
pool=args.pool,
)
else:
pickle_id = None
if args.ship_dag:
try:
# Running remotely, so pickling the DAG
session = settings.Session()
pickle = DagPickle(dag)
session.add(pickle)
session.commit()
pickle_id = pickle.id
# TODO: This should be written to a log
print((
'Pickled dag {dag} '
'as pickle_id:{pickle_id}').format(**locals()))
except Exception as e:
print('Could not pickle the DAG')
print(e)
raise e
executor = GetDefaultExecutor()
executor.start()
print("Sending to executor.")
executor.queue_task_instance(
ti,
mark_success=args.mark_success,
pickle_id=pickle_id,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
executor.heartbeat()
executor.end()
executor = GetDefaultExecutor()
executor.start()
print("Sending to executor.")
executor.queue_task_instance(
ti,
mark_success=args.mark_success,
pickle_id=pickle_id,
ignore_all_deps=args.ignore_all_dependencies,
ignore_depends_on_past=args.ignore_depends_on_past,
ignore_task_deps=args.ignore_dependencies,
ignore_ti_state=args.force,
pool=args.pool)
executor.heartbeat()
executor.end()
# Child processes should not flush or upload to remote
if args.raw:
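The practical effect of the new "with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):" block above is that anything the task, or a library it calls, prints is recorded by the logger at the chosen levels instead of leaking to the console. A hedged usage sketch (the logger name below is an illustrative choice, not what cli.py itself uses):

import logging
import sys

from airflow.utils.log.logging_mixin import redirect_stderr, redirect_stdout

log = logging.getLogger("airflow.task")  # illustrative choice of logger

with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
    print("captured at INFO")                   # sys.stdout -> INFO records
    print("captured at WARN", file=sys.stderr)  # sys.stderr -> WARN records
# on exit the original sys.__stdout__ / sys.__stderr__ are restored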

View file

@@ -78,11 +78,20 @@ DEFAULT_LOGGING_CONFIG = {
# },
},
'loggers': {
'airflow.processor' : {
'handlers': ['file.processor'],
'': {
'handlers': ['console'],
'level': LOG_LEVEL
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
'airflow.processor': {
'handlers': ['file.processor'],
'level': LOG_LEVEL,
'propagate': True,
},
'airflow.task': {
'handlers': ['file.task'],
'level': LOG_LEVEL,
@@ -93,10 +102,5 @@ DEFAULT_LOGGING_CONFIG = {
'level': LOG_LEVEL,
'propagate': True,
},
'airflow': {
'handlers': ['console'],
'level': LOG_LEVEL,
'propagate': False,
},
}
}
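The reordered loggers section gives the config an explicit root logger plus a non-propagating airflow logger on the console, while airflow.processor and airflow.task keep their file handlers and propagate upward. A minimal dictConfig sketch of that hierarchy, with plain stdlib handlers standing in for Airflow's own file handlers:

import logging
import logging.config

# simplified stand-ins for Airflow's console and file.task handlers
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
        "file.task": {"class": "logging.FileHandler", "filename": "task.log"},
    },
    "loggers": {
        "": {"handlers": ["console"], "level": "INFO"},
        "airflow": {"handlers": ["console"], "level": "INFO", "propagate": False},
        "airflow.task": {"handlers": ["file.task"], "level": "INFO", "propagate": True},
    },
}
logging.config.dictConfig(LOGGING)

# an "airflow.task" record goes to task.log and, because propagate=True,
# also bubbles up to the "airflow" console handler; "airflow" itself does
# not propagate, so the record never reaches the root handler a second time
logging.getLogger("airflow.task").info("hello from a task")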

View file

@@ -54,7 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
list_py_file_paths)
from airflow.utils.db import provide_session, pessimistic_connection_handling
from airflow.utils.email import send_email
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
from airflow.utils.state import State
Base = models.Base
@@ -344,6 +344,10 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
def helper():
# This helper runs in the newly created process
log = logging.getLogger("airflow.processor")
stdout = StreamLogWriter(log, logging.INFO)
stderr = StreamLogWriter(log, logging.WARN)
for handler in log.handlers:
try:
handler.set_context(file_path)
@@ -353,6 +357,10 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
pass
try:
# redirect stdout/stderr to log
sys.stdout = stdout
sys.stderr = stderr
# Re-configure the ORM engine as there are issues with multiple processes
settings.configure_orm()
@@ -376,6 +384,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
# Log exceptions through the logging framework.
log.exception("Got an exception! Propagating...")
raise
finally:
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
p = multiprocessing.Process(target=helper,
args=(),
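Inside the DAG file processor, the child process swaps sys.stdout/sys.stderr for StreamLogWriter instances directly and restores the real streams in a finally block, so a crash while parsing cannot leave the process with hijacked streams. A condensed sketch of the same pattern (the body of the worker is hypothetical):

import logging
import sys

from airflow.utils.log.logging_mixin import StreamLogWriter


def helper():
    log = logging.getLogger("airflow.processor")
    stdout = StreamLogWriter(log, logging.INFO)
    stderr = StreamLogWriter(log, logging.WARN)
    try:
        # swap the real streams for the log writers
        sys.stdout = stdout
        sys.stderr = stderr
        print("parsing DAG files...")  # hypothetical work, captured at INFO
    finally:
        # always restore the real streams, even if the work above raised
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__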

View file

@@ -34,6 +34,7 @@ import itertools
import zipfile
import jinja2
import json
import logging
import os
import pickle
import re
@@ -798,6 +799,7 @@ class TaskInstance(Base, LoggingMixin):
self.state = state
self.hostname = ''
self.init_on_load()
self._log = logging.getLogger("airflow.task")
@reconstructor
def init_on_load(self):

View file

@@ -18,8 +18,11 @@ from __future__ import print_function
from __future__ import unicode_literals
import logging
import sys
import warnings
from builtins import object
from contextlib import contextmanager
class LoggingMixin(object):
@@ -59,3 +62,68 @@ class LoggingMixin(object):
handler.set_context(task_instance)
except AttributeError:
pass
class StreamLogWriter(object):
"""
Allows redirection of stdout and stderr to a logger.
"""
# file-like attribute; callers that inspect stream.encoding get False
# instead of an AttributeError
encoding = False
def __init__(self, logger, level):
"""
:param logger: the logger to write messages to
:param level: the log level to use when writing, e.g. logging.INFO
"""
self.logger = logger
self.level = level
self._buffer = str()
def write(self, message):
"""
Buffer the message; once a terminating newline is seen, emit the
buffered line as a single log record.
:param message: message to log
"""
if not message.endswith("\n"):
self._buffer += message
else:
self._buffer += message
self.logger.log(self.level, self._buffer)
self._buffer = str()
def flush(self):
"""
Ensure all logging output has been flushed
"""
if len(self._buffer) > 0:
self.logger.log(self.level, self._buffer)
self._buffer = str()
def isatty(self):
"""
Return False to indicate the stream is not connected to a tty(-like) device.
Present for file-object compatibility.
"""
return False
@contextmanager
def redirect_stdout(logger, level):
writer = StreamLogWriter(logger, level)
try:
sys.stdout = writer
yield
finally:
sys.stdout = sys.__stdout__
@contextmanager
def redirect_stderr(logger, level):
writer = StreamLogWriter(logger, level)
try:
sys.stderr = writer
yield
finally:
sys.stderr = sys.__stderr__
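For reference, a short usage sketch of the new helpers: StreamLogWriter buffers partial writes until it sees a newline, then emits the whole line as one log record, and redirect_stdout/redirect_stderr install such a writer for the duration of a with block (the logger name and messages below are illustrative):

import logging

from airflow.utils.log.logging_mixin import StreamLogWriter, redirect_stdout

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")

# direct use: partial writes are buffered until a newline arrives
writer = StreamLogWriter(logger, logging.INFO)
writer.write("first part, ")
writer.write("second part\n")   # one INFO record covering the whole line
writer.flush()                  # would emit any partial line still buffered

# context-manager use: print() inside the block goes to the logger
with redirect_stdout(logger, logging.INFO):
    print("captured by the logging framework")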

View file

@@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
import warnings
from airflow.operators.bash_operator import BashOperator
from airflow.utils.log.logging_mixin import StreamLogWriter
from tests.test_utils.reset_warning_registry import reset_warning_registry
@@ -48,3 +50,52 @@ class TestLoggingMixin(unittest.TestCase):
def tearDown(self):
warnings.resetwarnings()
class TestStreamLogWriter(unittest.TestCase):
def test_write(self):
logger = mock.MagicMock()
logger.log = mock.MagicMock()
log = StreamLogWriter(logger, 1)
msg = "test_message"
log.write(msg)
self.assertEqual(log._buffer, msg)
log.write("\n")
logger.log.assert_called_once_with(1, msg + "\n")
self.assertEqual(log._buffer, "")
def test_flush(self):
logger = mock.MagicMock()
logger.log = mock.MagicMock()
log = StreamLogWriter(logger, 1)
msg = "test_message"
log.write(msg)
self.assertEqual(log._buffer, msg)
log.flush()
logger.log.assert_called_once_with(1, msg)
self.assertEqual(log._buffer, "")
def test_isatty(self):
logger = mock.MagicMock()
logger.log = mock.MagicMock()
log = StreamLogWriter(logger, 1)
self.assertFalse(log.isatty())
def test_encoding(self):
logger = mock.MagicMock()
logger.log = mock.MagicMock()
log = StreamLogWriter(logger, 1)
self.assertFalse(log.encoding)