[AIRFLOW-1879] Handle ti log entirely within ti
Previously logging was setup outside a TaskInstance, this puts everything inside. Also propery closes the logging. Closes #2837 from bolkedebruin/AIRFLOW-1879
This commit is contained in:
Родитель
06b41fbe1b
Коммит
301ce6b4f0
|
@ -362,15 +362,10 @@ def run(args, dag=None):
|
|||
task = dag.get_task(task_id=args.task_id)
|
||||
ti = TaskInstance(task, args.execution_date)
|
||||
ti.refresh_from_db()
|
||||
|
||||
log = logging.getLogger('airflow.task')
|
||||
if args.raw:
|
||||
log = logging.getLogger('airflow.task.raw')
|
||||
|
||||
set_context(log, ti)
|
||||
ti.init_run_context()
|
||||
|
||||
hostname = socket.getfqdn()
|
||||
log.info("Running on host %s", hostname)
|
||||
log.info("Running %s on host %s", ti, hostname)
|
||||
|
||||
with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
|
||||
if args.local:
|
||||
|
@ -428,13 +423,7 @@ def run(args, dag=None):
|
|||
if args.raw:
|
||||
return
|
||||
|
||||
# Force the log to flush. The flush is important because we
|
||||
# might subsequently read from the log to insert into S3 or
|
||||
# Google cloud storage. Explicitly close the handler is
|
||||
# needed in order to upload to remote storage services.
|
||||
for handler in log.handlers:
|
||||
handler.flush()
|
||||
handler.close()
|
||||
logging.shutdown()
|
||||
|
||||
|
||||
def task_failed_deps(args):
|
||||
|
|
|
@ -1855,6 +1855,12 @@ class TaskInstance(Base, LoggingMixin):
|
|||
TI.state == State.RUNNING
|
||||
).count()
|
||||
|
||||
def init_run_context(self):
|
||||
"""
|
||||
Sets the log context.
|
||||
"""
|
||||
self._set_context(self)
|
||||
|
||||
|
||||
class TaskFail(Base):
|
||||
"""
|
||||
|
|
|
@ -30,8 +30,7 @@ class LoggingMixin(object):
|
|||
Convenience super-class to have a logger configured with the class name
|
||||
"""
|
||||
def __init__(self, context=None):
|
||||
if context is not None:
|
||||
set_context(self.log, context)
|
||||
self._set_context(context)
|
||||
|
||||
# We want to deprecate the logger property in Airflow 2.0
|
||||
# The log property is the de facto standard in most programming languages
|
||||
|
@ -56,6 +55,10 @@ class LoggingMixin(object):
|
|||
)
|
||||
return self._log
|
||||
|
||||
def _set_context(self, context):
|
||||
if context is not None:
|
||||
set_context(self.log, context)
|
||||
|
||||
|
||||
class StreamLogWriter(object):
|
||||
encoding = False
|
||||
|
|
Загрузка…
Ссылка в новой задаче