adapts name and behaviour of WithLogger

- logger is now accessed as a property
- logger is now memoized
- WithLogger is renamed to LoggingMixin + moved to utils.py
- mentioned the --logging-level=DEBUG option in CONTRIB.md
Svend Vanderveken 2015-12-08 12:05:49 +01:00
Parent 8c13c044d5
Commit 95eacc45ba
11 changed files with 115 additions and 127 deletions
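In short, call sites move from the old log_* helper methods to a memoized logger property. A minimal sketch of the new pattern (SomeExecutor is illustrative, not a class touched by this commit):
```
import logging

class LoggingMixin(object):
    """Convenience super-class to have a logger configured with the class name"""
    @property
    def logger(self):
        # memoized: created on first access, then reused for the lifetime of the object
        if not hasattr(self, "_logger"):
            self._logger = logging.getLogger(self.__class__.__name__)
        return self._logger

class SomeExecutor(LoggingMixin):
    def queue_command(self, command):
        # before this commit: self.log_info("Adding to queue: {}".format(command))
        self.logger.info("Adding to queue: {}".format(command))
```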

View file

@@ -116,13 +116,13 @@ Once your unit test environment is setup, you should be able to simply run
For example, in order to just execute the "core" unit tests, run the following:
```
./run_unit_tests.sh tests.core:CoreTest -s
./run_unit_tests.sh tests.core:CoreTest -s --logging-level=DEBUG
```
or a single test method:
```
./run_unit_tests.sh tests.core:CoreTest.test_check_operators -s
./run_unit_tests.sh tests.core:CoreTest.test_check_operators -s --logging-level=DEBUG
```
For more information on how to run a subset of the tests, take a look at the nosetests docs.
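Note that --logging-level=DEBUG works together with the relocated utils.log_to_stdout(): the function only falls back to the configured LOGGING_LEVEL when the root logger level is still unset, so a level requested on the command line is preserved. A minimal sketch of that check, using plain logging and a stand-in for settings.LOGGING_LEVEL:
```
import logging

LOGGING_LEVEL = logging.INFO  # stand-in for airflow.settings.LOGGING_LEVEL

root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)  # roughly what --logging-level=DEBUG does before the tests run

# the fallback in log_to_stdout() applies only when no level was set externally
if root_logger.level == logging.NOTSET:
    root_logger.setLevel(LOGGING_LEVEL)

assert root_logger.level == logging.DEBUG  # the externally requested level wins
```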

View file

@@ -80,7 +80,7 @@ def backfill(args):
def trigger_dag(args):
settings.log_to_stdout()
utils.log_to_stdout()
session = settings.Session()
# TODO: verify dag_id
execution_date = datetime.now()
@@ -261,7 +261,7 @@ def list_tasks(args):
def test(args):
settings.log_to_stdout()
utils.log_to_stdout()
args.execution_date = dateutil.parser.parse(args.execution_date)
dagbag = DagBag(process_subdir(args.subdir))
if args.dag_id not in dagbag.dags:
@@ -307,7 +307,7 @@ def clear(args):
def webserver(args):
print(settings.HEADER)
settings.log_to_stdout()
utils.log_to_stdout()
from airflow.www.app import cached_app
app = cached_app(configuration)
workers = args.workers or configuration.get('webserver', 'workers')
@@ -330,7 +330,7 @@ def webserver(args):
def scheduler(args):
print(settings.HEADER)
settings.log_to_stdout()
utils.log_to_stdout()
job = jobs.SchedulerJob(
dag_id=args.dag_id,
subdir=process_subdir(args.subdir),
@@ -419,7 +419,7 @@ def flower(args):
def kerberos(args):
print(settings.HEADER)
settings.log_to_stdout()
utils.log_to_stdout()
import airflow.security.kerberos
airflow.security.kerberos.run()

View file

@@ -1,13 +1,12 @@
from builtins import range
from airflow import configuration
from airflow.settings import WithLogger
from airflow.utils import State
from airflow.utils import State, LoggingMixin
PARALLELISM = configuration.getint('core', 'PARALLELISM')
class BaseExecutor(WithLogger):
class BaseExecutor(LoggingMixin):
def __init__(self, parallelism=PARALLELISM):
"""
@@ -32,7 +31,7 @@ class BaseExecutor(WithLogger):
def queue_command(self, key, command, priority=1, queue=None):
if key not in self.queued_tasks and key not in self.running:
self.log_info("Adding to queue: {}".format(command))
self.logger.info("Adding to queue: {}".format(command))
self.queued_tasks[key] = (command, priority, queue)
def queue_task_instance(
@@ -63,7 +62,7 @@ class BaseExecutor(WithLogger):
def heartbeat(self):
# Calling child class sync method
self.log_debug("Calling the {} sync method".format(self.__class__))
self.logger.debug("Calling the {} sync method".format(self.__class__))
self.sync()
# Triggering new jobs
@@ -72,9 +71,9 @@ class BaseExecutor(WithLogger):
else:
open_slots = self.parallelism - len(self.running)
self.log_debug("{} running task instances".format(len(self.running)))
self.log_debug("{} in queue".format(len(self.queued_tasks)))
self.log_debug("{} open slots".format(open_slots))
self.logger.debug("{} running task instances".format(len(self.running)))
self.logger.debug("{} in queue".format(len(self.queued_tasks)))
self.logger.debug("{} open slots".format(open_slots))
sorted_queue = sorted(
[(k, v) for k, v in self.queued_tasks.items()],

View file

@@ -58,14 +58,14 @@ class CeleryExecutor(BaseExecutor):
self.last_state = {}
def execute_async(self, key, command, queue=DEFAULT_QUEUE):
self.log_info( "[celery] queuing {key} through celery, "
self.logger.info( "[celery] queuing {key} through celery, "
"queue={queue}".format(**locals()))
self.tasks[key] = execute_command.apply_async(
args=[command], queue=queue)
self.last_state[key] = celery_states.PENDING
def sync(self):
self.log_debug(
self.logger.debug(
"Inquiring about {} celery task(s)".format(len(self.tasks)))
for key, async in list(self.tasks.items()):
state = async.state
@@ -83,7 +83,7 @@ class CeleryExecutor(BaseExecutor):
del self.tasks[key]
del self.last_state[key]
else:
self.log_info("Unexpected state: " + async.state)
self.logger.info("Unexpected state: " + async.state)
self.last_state[key] = async.state
def end(self, synchronous=False):

View file

@@ -6,13 +6,12 @@ from builtins import range
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.utils import State
from airflow.settings import WithLogger
from airflow.utils import State, LoggingMixin
PARALLELISM = configuration.get('core', 'PARALLELISM')
class LocalWorker(multiprocessing.Process, WithLogger):
class LocalWorker(multiprocessing.Process, LoggingMixin):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
@@ -26,7 +25,7 @@ class LocalWorker(multiprocessing.Process, WithLogger):
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
self.log_info("{} running {}".format(
self.logger.info("{} running {}".format(
self.__class__.__name__, command))
command = "exec bash -c '{0}'".format(command)
try:
@@ -34,7 +33,7 @@ class LocalWorker(multiprocessing.Process, WithLogger):
state = State.SUCCESS
except Exception as e:
state = State.FAILED
self.log_error("failed to execute task {}:".format(str(e)))
self.logger.error("failed to execute task {}:".format(str(e)))
# raise e
self.result_queue.put((key, state))
self.task_queue.task_done()

View file

@@ -24,7 +24,7 @@ class SequentialExecutor(BaseExecutor):
def sync(self):
for key, command in self.commands_to_run:
self.log_info("Executing command: {}".format(command))
self.logger.info("Executing command: {}".format(command))
try:
sp = subprocess.Popen(command, shell=True)
sp.wait()

View file

@@ -21,8 +21,7 @@ from sqlalchemy.orm.session import make_transient
from airflow import executors, models, settings, utils
from airflow import configuration
from airflow.settings import WithLogger
from airflow.utils import AirflowException, State
from airflow.utils import AirflowException, State, LoggingMixin
Base = models.Base
@@ -38,7 +37,7 @@ if configuration.getboolean('scheduler', 'statsd_on'):
prefix=configuration.get('scheduler', 'statsd_prefix'))
class BaseJob(Base, WithLogger):
class BaseJob(Base, LoggingMixin):
"""
Abstract class to be derived for jobs. Jobs are processing items with state
and duration that aren't task instances. For instance a BackfillJob is
@@ -95,7 +94,7 @@ class BaseJob(Base, WithLogger):
try:
self.on_kill()
except:
self.log_error('on_kill() method failed')
self.logger.error('on_kill() method failed')
session.merge(job)
session.commit()
session.close()
@@ -148,7 +147,7 @@ class BaseJob(Base, WithLogger):
session.close()
self.heartbeat_callback()
self.log_debug('[heart] Boom.')
self.logger.debug('[heart] Boom.')
def run(self):
if statsd:
@@ -445,7 +444,7 @@ class SchedulerJob(BaseJob):
active_runs = dag.get_active_runs()
self.log_info('Getting list of tasks to skip for active runs.')
self.logger.info('Getting list of tasks to skip for active runs.')
skip_tis = set()
if active_runs:
qry = (
@@ -459,7 +458,7 @@ class SchedulerJob(BaseJob):
skip_tis = {(ti[0], ti[1]) for ti in qry.all()}
descartes = [obj for obj in product(dag.tasks, active_runs)]
self.log_info('Checking dependencies on {} tasks instances, minus {} '
self.logger.info('Checking dependencies on {} tasks instances, minus {} '
'skippable ones'.format(len(descartes), len(skip_tis)))
for task, dttm in descartes:
if task.adhoc or (task.task_id, dttm) in skip_tis:
@@ -470,11 +469,11 @@ class SchedulerJob(BaseJob):
State.RUNNING, State.QUEUED, State.SUCCESS, State.FAILED):
continue
elif ti.is_runnable(flag_upstream_failed=True):
self.log_debug('Firing task: {}'.format(ti))
self.logger.debug('Firing task: {}'.format(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
# Releasing the lock
self.log_debug("Unlocking DAG (scheduler_lock)")
self.logger.debug("Unlocking DAG (scheduler_lock)")
db_dag = (
session.query(DagModel)
.filter(DagModel.dag_id == dag.dag_id)
@@ -497,17 +496,17 @@ class SchedulerJob(BaseJob):
.filter(TI.state == State.QUEUED)
.all()
)
self.log_info("Prioritizing {} queued jobs".format(len(queued_tis)))
self.logger.info("Prioritizing {} queued jobs".format(len(queued_tis)))
session.expunge_all()
d = defaultdict(list)
for ti in queued_tis:
if ti.dag_id not in dagbag.dags:
self.log_info("DAG not longer in dagbag, "
self.logger.info("DAG not longer in dagbag, "
"deleting {}".format(ti))
session.delete(ti)
session.commit()
elif not dagbag.dags[ti.dag_id].has_task(ti.task_id):
self.log_info("Task not longer exists, deleting {}".format(ti))
self.logger.info("Task not longer exists, deleting {}".format(ti))
session.delete(ti)
session.commit()
else:
@@ -523,7 +522,7 @@ class SchedulerJob(BaseJob):
open_slots = pools[pool].open_slots(session=session)
queue_size = len(tis)
self.log_info("Pool {pool} has {open_slots} slots, {queue_size} "
self.logger.info("Pool {pool} has {open_slots} slots, {queue_size} "
"task instances in queue".format(**locals()))
if not open_slots:
continue
@@ -536,7 +535,7 @@ class SchedulerJob(BaseJob):
try:
task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
except:
self.log_error("Queued task {} seems gone".format(ti))
self.logger.error("Queued task {} seems gone".format(ti))
session.delete(ti)
session.commit()
continue
@@ -552,7 +551,7 @@ class SchedulerJob(BaseJob):
if self.do_pickle and self.executor.__class__ not in (
executors.LocalExecutor,
executors.SequentialExecutor):
self.log_info("Pickling DAG {}".format(dag))
self.logger.info("Pickling DAG {}".format(dag))
pickle_id = dag.pickle(session).id
if dag.dag_id in overloaded_dags or dag.concurrency_reached:
@@ -573,14 +572,14 @@ class SchedulerJob(BaseJob):
dag_id = self.dag_id
def signal_handler(signum, frame):
self.log_error("SIGINT (ctrl-c) received")
self.logger.error("SIGINT (ctrl-c) received")
sys.exit(1)
signal.signal(signal.SIGINT, signal_handler)
utils.pessimistic_connection_handling()
logging.basicConfig(level=logging.DEBUG)
self.log_info("Starting the scheduler")
self.logger.info("Starting the scheduler")
dagbag = models.DagBag(self.subdir, sync_to_db=True)
executor = dagbag.executor
@@ -592,7 +591,7 @@ class SchedulerJob(BaseJob):
try:
self.prioritize_queued(executor=executor, dagbag=dagbag)
except Exception as e:
self.log_exception(e)
self.logger.exception(e)
i += 1
try:
@@ -601,7 +600,7 @@ class SchedulerJob(BaseJob):
else:
dagbag.collect_dags(only_if_updated=True)
except:
self.log_error("Failed at reloading the dagbag")
self.logger.error("Failed at reloading the dagbag")
if statsd:
statsd.incr('dag_refresh_error', 1, 1)
sleep(5)
@@ -613,7 +612,7 @@ class SchedulerJob(BaseJob):
dag for dag in dagbag.dags.values() if not dag.parent_dag]
paused_dag_ids = dagbag.paused_dags()
for dag in dags:
self.log_debug("Scheduling {}".format(dag.dag_id))
self.logger.debug("Scheduling {}".format(dag.dag_id))
dag = dagbag.get_dag(dag.dag_id)
if not dag or (dag.dag_id in paused_dag_ids):
continue
@@ -622,28 +621,28 @@ class SchedulerJob(BaseJob):
self.process_dag(dag, executor)
self.manage_slas(dag)
except Exception as e:
self.log_exception(e)
self.log_info("Done queuing tasks, calling the executor's "
self.logger.exception(e)
self.logger.info("Done queuing tasks, calling the executor's "
"heartbeat")
duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
self.log_info("Loop took: {} seconds".format(duration_sec))
self.logger.info("Loop took: {} seconds".format(duration_sec))
try:
self.import_errors(dagbag)
except Exception as e:
self.log_exception(e)
self.logger.exception(e)
try:
dagbag.kill_zombies()
except Exception as e:
self.log_exception(e)
self.logger.exception(e)
try:
# We really just want the scheduler to never ever stop.
executor.heartbeat()
self.heartbeat()
except Exception as e:
self.log_exception(e)
self.log_error("Tachycardia!")
self.logger.exception(e)
self.logger.error("Tachycardia!")
except Exception as deep_e:
self.log_exception(deep_e)
self.logger.exception(deep_e)
executor.end()
def heartbeat_callback(self):
@@ -755,10 +754,10 @@ class BackfillJob(BaseJob):
state == State.FAILED):
if ti.state == State.FAILED or state == State.FAILED:
failed.append(key)
self.log_error("Task instance " + str(key) + " failed")
self.logger.error("Task instance " + str(key) + " failed")
elif ti.state == State.SKIPPED:
wont_run.append(key)
self.log_error("Skipping " + str(key) + " failed")
self.logger.error("Skipping " + str(key) + " failed")
tasks_to_run.pop(key)
# Removing downstream tasks that also shouldn't run
for t in self.dag.get_task(task_id).get_flat_relatives(
@@ -773,7 +772,7 @@ class BackfillJob(BaseJob):
elif (
ti.state not in (State.SUCCESS, State.QUEUED) and
state == State.SUCCESS):
self.log_error(
self.logger.error(
"The airflow run command failed "
"at reporting an error. This should not occur "
"in normal circustances. State is {}".format(ti.state))
@@ -790,7 +789,7 @@ class BackfillJob(BaseJob):
len(started),
len(failed),
len(wont_run))
self.log_info(msg)
self.logger.info(msg)
executor.end()
session.close()
@@ -800,7 +799,7 @@ class BackfillJob(BaseJob):
"Some tasks instances failed, "
"here's the list:\n{}".format(failed))
raise AirflowException(msg)
self.log_info("All done. Exiting.")
self.logger.info("All done. Exiting.")
class LocalTaskJob(BaseJob):
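Because each logger is now named after its class and log_to_stdout() installs a handler with a name-bearing format, scheduler output identifies the emitting class. A rough illustration (format string taken from the diff, timestamp invented):
```
import logging
import sys

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

logging.getLogger("SchedulerJob").info("Starting the scheduler")
# e.g. 2015-12-08 12:05:49,000 - SchedulerJob - INFO - Starting the scheduler
```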

View file

@@ -5,8 +5,6 @@ from __future__ import unicode_literals
from future.standard_library import install_aliases
from airflow.settings import WithLogger
install_aliases()
from builtins import str
from builtins import object, bytes
@@ -42,7 +40,7 @@ from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
from airflow import configuration
from airflow.utils import (
AirflowException, State, apply_defaults, provide_session,
is_container, as_tuple, TriggerRule)
is_container, as_tuple, TriggerRule, LoggingMixin)
Base = declarative_base()
ID_LEN = 250
@@ -92,7 +90,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
dr.state = State.RUNNING
class DagBag(WithLogger):
class DagBag(LoggingMixin):
"""
A dagbag is a collection of dags, parsed out of a folder tree and has high
level configuration settings, like what database to use as a backend and
@@ -122,7 +120,7 @@ class DagBag(WithLogger):
sync_to_db=False):
dag_folder = dag_folder or DAGS_FOLDER
self.log_info("Filling up the DagBag from {}".format(dag_folder))
self.logger.info("Filling up the DagBag from {}".format(dag_folder))
self.dag_folder = dag_folder
self.dags = {}
self.sync_to_db = sync_to_db
@@ -195,13 +193,13 @@ class DagBag(WithLogger):
filepath not in self.file_last_changed or
dttm != self.file_last_changed[filepath]):
try:
self.log_info("Importing " + filepath)
self.logger.info("Importing " + filepath)
if mod_name in sys.modules:
del sys.modules[mod_name]
with utils.timeout(30):
m = imp.load_source(mod_name, filepath)
except Exception as e:
self.log_exception("Failed to import: " + filepath)
self.logger.exception("Failed to import: " + filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = dttm
return
@@ -220,11 +218,11 @@ class DagBag(WithLogger):
Fails tasks that haven't had a heartbeat in too long
"""
from airflow.jobs import LocalTaskJob as LJ
self.log_info("Finding 'running' jobs without a recent heartbeat")
self.logger.info("Finding 'running' jobs without a recent heartbeat")
secs = (
configuration.getint('scheduler', 'job_heartbeat_sec') * 3) + 120
limit_dttm = datetime.now() - timedelta(seconds=secs)
self.log_info(
self.logger.info(
"Failing jobs without heartbeat after {}".format(limit_dttm))
jobs = (
session
@@ -237,14 +235,14 @@ class DagBag(WithLogger):
for job in jobs:
ti = session.query(TaskInstance).filter_by(
job_id=job.id, state=State.RUNNING).first()
self.log_info("Failing job_id '{}'".format(job.id))
self.logger.info("Failing job_id '{}'".format(job.id))
if ti and ti.dag_id in self.dags:
dag = self.dags[ti.dag_id]
if ti.task_id in dag.task_ids:
task = dag.get_task(ti.task_id)
ti.task = task
ti.handle_failure("{} killed as zombie".format(ti))
self.log_info('Marked zombie job {} as failed'.format(ti))
self.logger.info('Marked zombie job {} as failed'.format(ti))
else:
job.state = State.FAILED
session.commit()
@@ -280,7 +278,7 @@ class DagBag(WithLogger):
subdag.fileloc = root_dag.full_filepath
subdag.is_subdag = True
self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
self.log_info('Loaded DAG {dag}'.format(**locals()))
self.logger.info('Loaded DAG {dag}'.format(**locals()))
def collect_dags(
self,
@@ -1902,7 +1900,7 @@ class DagModel(Base):
@functools.total_ordering
class DAG(WithLogger):
class DAG(LoggingMixin):
"""
A dag (directed acyclic graph) is a collection of tasks with directional
dependencies. A dag also has a schedule, a start end an end date
@@ -2138,7 +2136,7 @@ class DAG(WithLogger):
.all()
)
for run in active_runs:
self.log_info("Checking state for {}".format(run))
self.logger.info("Checking state for {}".format(run))
task_instances = session.query(TI).filter(
TI.dag_id == run.dag_id,
TI.task_id.in_(self.task_ids),
@@ -2147,13 +2145,13 @@ class DAG(WithLogger):
if len(task_instances) == len(self.tasks):
task_states = [ti.state for ti in task_instances]
if State.FAILED in task_states:
self.log_info('Marking run {} failed'.format(run))
self.logger.info('Marking run {} failed'.format(run))
run.state = State.FAILED
elif len(
set(task_states) |
set([State.SUCCESS, State.SKIPPED])
) == 2:
self.log_info('Marking run {} successful'.format(run))
self.logger.info('Marking run {} successful'.format(run))
run.state = State.SUCCESS
else:
active_dates.append(run.execution_date)

View file

@@ -5,7 +5,6 @@ from __future__ import unicode_literals
import logging
import os
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
@@ -42,48 +41,6 @@ LOG_FORMAT = (
SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
def log_to_stdout():
root_logger = logging.getLogger()
# default log level if not set externally (e.g. with --logging-level=DEBUG)
if root_logger.level == logging.NOTSET:
root_logger.setLevel(LOGGING_LEVEL)
for handler in root_logger.handlers:
if isinstance(handler, logging.StreamHandler):
root_logger.warn("not adding a stream handler: already present")
return
logformat = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logformat)
root_logger.addHandler(ch)
class WithLogger(object):
"""
Convenience super-class to have a logger configured with the class name
"""
def logger(self):
return logging.getLogger(self.__class__.__name__)
def log_debug(self, *arg, **kwargs):
self.logger().debug(*arg, **kwargs)
def log_info(self, *arg, **kwargs):
self.logger().info(*arg, **kwargs)
def log_error(self, *arg, **kwargs):
self.logger().error(*arg, **kwargs)
def log_exception(self, *arg, **kwargs):
self.logger().exception(*arg, **kwargs)
def policy(task_instance):
"""
This policy setting allows altering task instances right before they

View file

@@ -3,6 +3,7 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import sys
from builtins import str, input, object
from past.builtins import basestring
from copy import copy
@@ -39,6 +40,7 @@ from croniter import croniter
from airflow import settings
from airflow import configuration
from airflow.settings import LOGGING_LEVEL
class AirflowException(Exception):
@@ -747,3 +749,36 @@ class AirflowJsonEncoder(json.JSONEncoder):
# Let the base class default method raise the TypeError
return json.JSONEncoder.default(self, obj)
def log_to_stdout():
root_logger = logging.getLogger()
# default log level if not set externally (e.g. with --logging-level=DEBUG)
if root_logger.level == logging.NOTSET:
root_logger.setLevel(LOGGING_LEVEL)
for handler in root_logger.handlers:
if isinstance(handler, logging.StreamHandler):
root_logger.warn("not adding a stream handler: already present")
return
logformat = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logformat)
root_logger.addHandler(ch)
class LoggingMixin(object):
"""
Convenience super-class to have a logger configured with the class name
"""
@property
def logger(self):
if not hasattr(self, "_logger"):
self._logger = logging.getLogger(self.__class__.__name__)
return self._logger
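A short usage sketch of the property above; Blah is purely illustrative and mirrors the assertions updated in tests/core.py below:
```
from airflow.utils import LoggingMixin

class Blah(LoggingMixin):
    pass

b = Blah()
assert b.logger.name == "Blah"  # logger named after the concrete class
assert b.logger is b.logger     # memoized: the same Logger instance is returned each time
```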

View file

@@ -1,13 +1,13 @@
from __future__ import print_function
from datetime import datetime, time, timedelta
import doctest
import json
import logging
import os
import re
from time import sleep
import unittest
import logging
from datetime import datetime, time, timedelta
from time import sleep
from airflow import configuration
from airflow.executors import SequentialExecutor, LocalExecutor
@@ -18,7 +18,8 @@ from airflow import jobs, models, DAG, utils, operators, hooks, macros, settings
from airflow.hooks import BaseHook
from airflow.bin import cli
from airflow.www import app as application
from airflow.settings import Session, WithLogger
from airflow.settings import Session
from airflow.utils import LoggingMixin
from lxml import html
NUM_EXAMPLE_DAGS = 7
@@ -397,8 +398,8 @@ class CoreTest(unittest.TestCase):
def test_calling_log_to_stdout_2X_should_add_only_one_stream_handler(self):
settings.log_to_stdout()
settings.log_to_stdout()
utils.log_to_stdout()
utils.log_to_stdout()
root_logger = logging.getLogger()
stream_handlers = [h for h in root_logger.handlers
@@ -414,7 +415,7 @@ class CoreTest(unittest.TestCase):
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
settings.log_to_stdout()
utils.log_to_stdout()
assert root_logger.level == logging.DEBUG
@@ -422,12 +423,12 @@ class CoreTest(unittest.TestCase):
# each class should automatically receive a logger with a correct name
class Blah(WithLogger):
class Blah(LoggingMixin):
pass
assert Blah().logger().name == "Blah"
assert SequentialExecutor().logger().name == "SequentialExecutor"
assert LocalExecutor().logger().name == "LocalExecutor"
assert Blah().logger.name == "Blah"
assert SequentialExecutor().logger.name == "SequentialExecutor"
assert LocalExecutor().logger.name == "LocalExecutor"
class CliTests(unittest.TestCase):