Using a decorator to specify operators default args

This commit is contained in:
Maxime Beauchemin 2014-11-19 22:14:33 +00:00
Родитель 8e4c3762c3
Коммит a29f316bd0
7 изменённых файлов: 57 добавлений и 5 удалений

Просмотреть файл

@ -22,7 +22,8 @@ from airflow.configuration import getconf
from airflow import settings
from airflow import utils
import socket
from utils import State
from airflow.utils import State
from airflow.utils import apply_defaults
Base = declarative_base()
ID_LEN = getconf().getint('misc', 'ID_LEN')
@ -700,6 +701,7 @@ class BaseOperator(Base):
'polymorphic_identity': 'BaseOperator'
}
@apply_defaults
def __init__(
self,
task_id,
@ -712,6 +714,7 @@ class BaseOperator(Base):
depends_on_past=False,
dag=None,
params=None,
default_args=None,
*args,
**kwargs):

Просмотреть файл

@ -1,7 +1,9 @@
import logging
from airflow.models import BaseOperator
from subprocess import Popen, PIPE
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class BashOperator(BaseOperator):
@ -12,6 +14,7 @@ class BashOperator(BaseOperator):
'polymorphic_identity': 'BashOperator'
}
@apply_defaults
def __init__(self, bash_command, *args, **kwargs):
super(BashOperator, self).__init__(*args, **kwargs)
self.bash_command = bash_command

Просмотреть файл

@ -1,4 +1,5 @@
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class DummyOperator(BaseOperator):
@ -13,6 +14,7 @@ class DummyOperator(BaseOperator):
'polymorphic_identity': 'DummyOperator'
}
@apply_defaults
def __init__(self, *args, **kwargs):
super(DummyOperator, self).__init__(*args, **kwargs)

Просмотреть файл

@ -1,7 +1,9 @@
import logging
from airflow.models import BaseOperator
from airflow.configuration import getconf
from airflow.hooks import HiveHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class HiveOperator(BaseOperator):
@ -20,6 +22,7 @@ class HiveOperator(BaseOperator):
template_fields = ('hql',)
template_ext = ('.hql', '.sql',)
@apply_defaults
def __init__(
self, hql,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),

Просмотреть файл

@ -1,6 +1,8 @@
import logging
from airflow.models import BaseOperator
from airflow.hooks import MySqlHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class MySqlOperator(BaseOperator):
@ -14,6 +16,7 @@ class MySqlOperator(BaseOperator):
template_fields = ('sql',)
template_ext = ('.sql',)
@apply_defaults
def __init__(self, sql, mysql_dbid, *args, **kwargs):
"""
Parameters:

Просмотреть файл

@ -10,10 +10,12 @@ from airflow.models import BaseOperator
from airflow.models import DatabaseConnection as DB
from airflow.models import State
from airflow.models import TaskInstance
from airflow.utils import apply_defaults
class BaseSensorOperator(BaseOperator):
@apply_defaults
def __init__(self, poke_interval=5, timeout=60*60*24*7, *args, **kwargs):
super(BaseSensorOperator, self).__init__(*args, **kwargs)
self.poke_interval = poke_interval
@ -45,6 +47,7 @@ class SqlSensor(BaseSensorOperator):
'polymorphic_identity': 'SqlSensor'
}
@apply_defaults
def __init__(self, db_id, sql, *args, **kwargs):
super(SqlSensor, self).__init__(*args, **kwargs)
@ -82,6 +85,7 @@ class ExternalTaskSensor(BaseSensorOperator):
'polymorphic_identity': 'ExternalTaskSensor'
}
@apply_defaults
def __init__(self, external_dag_id, external_task_id, *args, **kwargs):
super(ExternalTaskSensor, self).__init__(*args, **kwargs)
self.external_dag_id = external_dag_id
@ -116,6 +120,7 @@ class HivePartitionSensor(BaseSensorOperator):
'polymorphic_identity': 'HivePartitionSensor'
}
@apply_defaults
def __init__(
self,
table, partition="ds='{{ ds }}'",

Просмотреть файл

@ -1,5 +1,7 @@
import re
from datetime import datetime, timedelta
from functools import wraps
import inspect
import re
class State(object):
@ -80,3 +82,34 @@ def readfile(filepath):
content = f.read()
f.close()
return content
def apply_defaults(func):
'''
Function decorator that Looks for an argument named "default_args", and
fills the unspecified arguments from it.
Since python2.* isn't clear about which arguments are missing when
calling a function, and that this can be quite confusing with multi-level
inheritance and argument defaults, this decorator also alerts with
specific information about the missing arguments.
'''
@wraps(func)
def wrapper(*args, **kwargs):
if 'default_args' in kwargs:
default_args = kwargs['default_args']
arg_spec = inspect.getargspec(func)
num_defaults = len(arg_spec.defaults) if arg_spec.defaults else 0
non_optional_args = arg_spec.args[:-num_defaults]
if 'self' in non_optional_args:
non_optional_args.remove('self')
for arg in func.__code__.co_varnames:
if arg in default_args and arg not in kwargs:
kwargs[arg] = default_args[arg]
missing_args = list(set(non_optional_args) - set(kwargs))
if missing_args:
msg = "Argument {0} is required".format(missing_args)
raise Exception(msg)
result = func(*args, **kwargs)
return result
return wrapper