Merge pull request #15 from mistercrunch/default_args
Using a decorator to specify operators' default args
Commit 7c95661509
@@ -24,7 +24,8 @@ from airflow.configuration import getconf
 from airflow import settings
 from airflow import utils
 import socket
-from utils import State
+from airflow.utils import State
+from airflow.utils import apply_defaults
 
 Base = declarative_base()
 ID_LEN = getconf().getint('misc', 'ID_LEN')
@@ -702,6 +703,7 @@ class BaseOperator(Base):
         'polymorphic_identity': 'BaseOperator'
     }
 
+    @apply_defaults
     def __init__(
             self,
             task_id,
@@ -714,6 +716,7 @@ class BaseOperator(Base):
             depends_on_past=False,
             dag=None,
             params=None,
+            default_args=None,
             *args,
             **kwargs):
 
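The two hunks above add a default_args keyword to BaseOperator.__init__ and wrap the constructor in the new @apply_defaults decorator, so one shared dict can supply any constructor arguments a task leaves unspecified. A minimal sketch of the calling pattern this enables; SomeOperator is a stand-in for any BaseOperator subclass, and the dict values are illustrative:

    # Shared once per pipeline instead of repeating keyword arguments
    # on every task; keys mirror BaseOperator.__init__ parameter names.
    default_args = {
        'depends_on_past': True,    # parameter visible in the hunk above
        'params': {'env': 'prod'},  # illustrative value
    }

    # Hypothetical subclass; the decorated constructors fill in
    # depends_on_past and params because they are absent from the call.
    task = SomeOperator(task_id='t1', default_args=default_args)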
@@ -1,7 +1,9 @@
 import logging
-from airflow.models import BaseOperator
 from subprocess import Popen, PIPE
+
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
 
 
 class BashOperator(BaseOperator):
 
@@ -12,6 +14,7 @@ class BashOperator(BaseOperator):
         'polymorphic_identity': 'BashOperator'
     }
 
+    @apply_defaults
     def __init__(self, bash_command, *args, **kwargs):
         super(BashOperator, self).__init__(*args, **kwargs)
         self.bash_command = bash_command
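Since BashOperator.__init__ is itself decorated and forwards **kwargs to BaseOperator, explicit arguments and the shared dict can be mixed freely. A hedged usage sketch; the import path and argument values are assumptions, not taken from this diff:

    from airflow.operators import BashOperator  # assumed import path

    shared = {'depends_on_past': False}  # key mirrors BaseOperator.__init__

    # bash_command is passed explicitly; depends_on_past is absent here,
    # so the decorated BaseOperator.__init__ (reached via super()) fills
    # it in from the dict.
    t = BashOperator(
        task_id='print_date',  # illustrative task id
        bash_command='date',
        default_args=shared)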
@@ -1,4 +1,5 @@
 from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
 
 
 class DummyOperator(BaseOperator):
@@ -13,6 +14,7 @@ class DummyOperator(BaseOperator):
         'polymorphic_identity': 'DummyOperator'
     }
 
+    @apply_defaults
     def __init__(self, *args, **kwargs):
         super(DummyOperator, self).__init__(*args, **kwargs)
 
@@ -1,7 +1,9 @@
 import logging
-from airflow.models import BaseOperator
 
 from airflow.configuration import getconf
 from airflow.hooks import HiveHook
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+
 
 class HiveOperator(BaseOperator):
@@ -20,6 +22,7 @@ class HiveOperator(BaseOperator):
     template_fields = ('hql',)
     template_ext = ('.hql', '.sql',)
 
+    @apply_defaults
     def __init__(
             self, hql,
             hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),
@@ -1,6 +1,8 @@
 import logging
-from airflow.models import BaseOperator
 
 from airflow.hooks import MySqlHook
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+
 
 class MySqlOperator(BaseOperator):
@@ -14,6 +16,7 @@ class MySqlOperator(BaseOperator):
     template_fields = ('sql',)
     template_ext = ('.sql',)
 
+    @apply_defaults
     def __init__(self, sql, mysql_dbid, *args, **kwargs):
         """
         Parameters:
@@ -10,10 +10,12 @@ from airflow.models import BaseOperator
 from airflow.models import DatabaseConnection as DB
 from airflow.models import State
 from airflow.models import TaskInstance
+from airflow.utils import apply_defaults
 
 
 class BaseSensorOperator(BaseOperator):
 
+    @apply_defaults
     def __init__(self, poke_interval=5, timeout=60*60*24*7, *args, **kwargs):
         super(BaseSensorOperator, self).__init__(*args, **kwargs)
         self.poke_interval = poke_interval
@@ -45,6 +47,7 @@ class SqlSensor(BaseSensorOperator):
         'polymorphic_identity': 'SqlSensor'
     }
 
+    @apply_defaults
     def __init__(self, db_id, sql, *args, **kwargs):
 
         super(SqlSensor, self).__init__(*args, **kwargs)
@@ -82,6 +85,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         'polymorphic_identity': 'ExternalTaskSensor'
     }
 
+    @apply_defaults
     def __init__(self, external_dag_id, external_task_id, *args, **kwargs):
         super(ExternalTaskSensor, self).__init__(*args, **kwargs)
         self.external_dag_id = external_dag_id
@@ -116,6 +120,7 @@ class HivePartitionSensor(BaseSensorOperator):
         'polymorphic_identity': 'HivePartitionSensor'
     }
 
+    @apply_defaults
     def __init__(
             self,
             table, partition="ds='{{ ds }}'",
@@ -1,5 +1,7 @@
-import re
 from datetime import datetime, timedelta
+from functools import wraps
+import inspect
+import re
 
 
 class State(object):
@@ -80,3 +82,34 @@ def readfile(filepath):
     content = f.read()
     f.close()
     return content
+
+
+def apply_defaults(func):
+    '''
+    Function decorator that looks for an argument named "default_args", and
+    fills the unspecified arguments from it.
+
+    Since Python 2 isn't clear about which arguments are missing when
+    calling a function, and this can be quite confusing with multi-level
+    inheritance and argument defaults, this decorator also alerts with
+    specific information about the missing arguments.
+    '''
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        if 'default_args' in kwargs:
+            default_args = kwargs['default_args']
+            arg_spec = inspect.getargspec(func)
+            num_defaults = len(arg_spec.defaults) if arg_spec.defaults else 0
+            non_optional_args = arg_spec.args[:-num_defaults]
+            if 'self' in non_optional_args:
+                non_optional_args.remove('self')
+            for arg in func.__code__.co_varnames:
+                if arg in default_args and arg not in kwargs:
+                    kwargs[arg] = default_args[arg]
+            missing_args = list(set(non_optional_args) - set(kwargs))
+            if missing_args:
+                msg = "Argument {0} is required".format(missing_args)
+                raise Exception(msg)
+        result = func(*args, **kwargs)
+        return result
+    return wrapper
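The decorator is self-contained enough to demonstrate outside Airflow. A runnable sketch, assuming airflow.utils is importable; the Job class and its arguments are made up for illustration:

    from airflow.utils import apply_defaults


    class Job(object):
        """Toy class, not part of the diff; shows the decorator alone."""

        @apply_defaults
        def __init__(self, name, retries=0, **kwargs):
            self.name = name
            self.retries = retries


    # 'name' is not passed directly, but the wrapper copies it out of
    # default_args and into kwargs before __init__ runs.
    job = Job(default_args={'name': 'backfill', 'retries': 3})
    assert job.name == 'backfill' and job.retries == 3

    # A required argument missing from both the call and default_args
    # raises the "Argument ['name'] is required" exception instead:
    # Job(default_args={'retries': 3})

Note that the wrapper iterates over func.__code__.co_varnames, which holds the decorated function's parameter and local variable names, so only keys matching those names are copied out of default_args; everything else stays untouched inside the dict.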