diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index c23b27273b..782e58d96a 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -362,15 +362,10 @@ def run(args, dag=None): task = dag.get_task(task_id=args.task_id) ti = TaskInstance(task, args.execution_date) ti.refresh_from_db() - - log = logging.getLogger('airflow.task') - if args.raw: - log = logging.getLogger('airflow.task.raw') - - set_context(log, ti) + ti.init_run_context() hostname = socket.getfqdn() - log.info("Running on host %s", hostname) + log.info("Running %s on host %s", ti, hostname) with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN): if args.local: @@ -428,13 +423,7 @@ def run(args, dag=None): if args.raw: return - # Force the log to flush. The flush is important because we - # might subsequently read from the log to insert into S3 or - # Google cloud storage. Explicitly close the handler is - # needed in order to upload to remote storage services. - for handler in log.handlers: - handler.flush() - handler.close() + logging.shutdown() def task_failed_deps(args): diff --git a/airflow/models.py b/airflow/models.py index 22cf9f08fa..5837363bd9 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1855,6 +1855,12 @@ class TaskInstance(Base, LoggingMixin): TI.state == State.RUNNING ).count() + def init_run_context(self): + """ + Sets the log context. + """ + self._set_context(self) + class TaskFail(Base): """ diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index 892fae552d..03437bf740 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -30,8 +30,7 @@ class LoggingMixin(object): Convenience super-class to have a logger configured with the class name """ def __init__(self, context=None): - if context is not None: - set_context(self.log, context) + self._set_context(context) # We want to deprecate the logger property in Airflow 2.0 # The log property is the de facto standard in most programming languages @@ -56,6 +55,10 @@ class LoggingMixin(object): ) return self._log + def _set_context(self, context): + if context is not None: + set_context(self.log, context) + class StreamLogWriter(object): encoding = False