BaseOperator to not be a sqlalchemy model anymore

This commit is contained in:
Maxime Beauchemin 2015-06-02 21:11:39 -04:00 коммит произвёл Maxime
Родитель 9655c12506
Коммит 0779f56507
19 изменённых файлов: 18 добавлений и 102 удалений

Просмотреть файл

@ -894,7 +894,7 @@ class Log(Base):
self.owner = task_instance.task.owner self.owner = task_instance.task.owner
class BaseOperator(Base): class BaseOperator(object):
""" """
Abstract base class for all operators. Since operators create objects that Abstract base class for all operators. Since operators create objects that
become node in the dag, BaseOperator contains many recursive methods for become node in the dag, BaseOperator contains many recursive methods for
@ -969,21 +969,6 @@ class BaseOperator(Base):
ui_color = '#fff' ui_color = '#fff'
ui_fgcolor = '#000' ui_fgcolor = '#000'
__tablename__ = "task"
dag_id = Column(String(ID_LEN), primary_key=True)
task_id = Column(String(ID_LEN), primary_key=True)
owner = Column(String(500))
task_type = Column(String(20))
start_date = Column(DateTime())
end_date = Column(DateTime())
depends_on_past = Column(Integer())
__mapper_args__ = {
'polymorphic_on': task_type,
'polymorphic_identity': 'BaseOperator'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
self, self,
@ -1241,7 +1226,11 @@ class BaseOperator(Base):
return self.downstream_list return self.downstream_list
def __repr__(self): def __repr__(self):
return "<Task({self.task_type}): {self.task_id}>".format(self=self) return "<Task({self.__class__.__name__}): {self.task_id}>".format(self=self)
@property
def task_type(self):
return self.__class__.__name__
def append_only_new(self, l, item): def append_only_new(self, l, item):
if any([item is t for t in l]): if any([item is t for t in l]):

Просмотреть файл

@ -18,10 +18,6 @@ class BashOperator(BaseOperator):
template_ext = ('.sh', '.bash',) template_ext = ('.sh', '.bash',)
ui_color = '#f0ede4' ui_color = '#f0ede4'
__mapper_args__ = {
'polymorphic_identity': 'BashOperator'
}
@apply_defaults @apply_defaults
def __init__(self, bash_command, *args, **kwargs): def __init__(self, bash_command, *args, **kwargs):
super(BashOperator, self).__init__(*args, **kwargs) super(BashOperator, self).__init__(*args, **kwargs)

Просмотреть файл

@ -11,10 +11,6 @@ class DummyOperator(BaseOperator):
template_fields = tuple() template_fields = tuple()
ui_color = '#e8f7e4' ui_color = '#e8f7e4'
__mapper_args__ = {
'polymorphic_identity': 'DummyOperator'
}
@apply_defaults @apply_defaults
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(DummyOperator, self).__init__(*args, **kwargs) super(DummyOperator, self).__init__(*args, **kwargs)

Просмотреть файл

@ -20,10 +20,6 @@ class EmailOperator(BaseOperator):
template_ext = ('.html',) template_ext = ('.html',)
ui_color = '#e6faf9' ui_color = '#e6faf9'
__mapper_args__ = {
'polymorphic_identity': 'EmailOperator'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
self, self,

Просмотреть файл

@ -22,9 +22,6 @@ class HiveOperator(BaseOperator):
:type script_begin_tag: str :type script_begin_tag: str
""" """
__mapper_args__ = {
'polymorphic_identity': 'HiveOperator'
}
template_fields = ('hql', 'schema') template_fields = ('hql', 'schema')
template_ext = ('.hql', '.sql',) template_ext = ('.hql', '.sql',)
ui_color = '#f0e4ec' ui_color = '#f0e4ec'

Просмотреть файл

@ -40,9 +40,6 @@ class HiveStatsCollectionOperator(BaseOperator):
:type assignment_func: function :type assignment_func: function
""" """
__mapper_args__ = {
'polymorphic_identity': 'HiveStatsCollectionOperator'
}
template_fields = ('table', 'partition', 'ds', 'dttm') template_fields = ('table', 'partition', 'ds', 'dttm')
ui_color = '#aff7a6' ui_color = '#aff7a6'

Просмотреть файл

@ -27,9 +27,6 @@ class HiveToMySqlTransfer(BaseOperator):
:type mysql_preoperator: str :type mysql_preoperator: str
""" """
__mapper_args__ = {
'polymorphic_identity': 'HiveToMySqlTransfer'
}
template_fields = ('sql', 'mysql_table') template_fields = ('sql', 'mysql_table')
template_ext = ('.sql',) template_ext = ('.sql',)
ui_color = '#a0e08c' ui_color = '#a0e08c'

Просмотреть файл

@ -19,9 +19,6 @@ class Hive2SambaOperator(BaseOperator):
:type samba_conn_id: string :type samba_conn_id: string
""" """
__mapper_args__ = {
'polymorphic_identity': 'Hive2SambaOperator'
}
template_fields = ('hql', 'destination_filepath') template_fields = ('hql', 'destination_filepath')
template_ext = ('.hql', '.sql',) template_ext = ('.hql', '.sql',)

Просмотреть файл

@ -16,9 +16,6 @@ class MySqlOperator(BaseOperator):
a '.sql' extensions. a '.sql' extensions.
""" """
__mapper_args__ = {
'polymorphic_identity': 'MySqlOperator'
}
template_fields = ('sql',) template_fields = ('sql',)
template_ext = ('.sql',) template_ext = ('.sql',)
ui_color = '#ededed' ui_color = '#ededed'

Просмотреть файл

@ -45,9 +45,6 @@ class MySqlToHiveTransfer(BaseOperator):
:type hive_conn_id: str :type hive_conn_id: str
""" """
__mapper_args__ = {
'polymorphic_identity': 'MySqlToHiveOperator'
}
template_fields = ('sql', 'partition', 'hive_table') template_fields = ('sql', 'partition', 'hive_table')
template_ext = ('.sql',) template_ext = ('.sql',)
ui_color = '#a0e08c' ui_color = '#a0e08c'

Просмотреть файл

@ -16,9 +16,6 @@ class PostgresOperator(BaseOperator):
a '.sql' extensions. a '.sql' extensions.
""" """
__mapper_args__ = {
'polymorphic_identity': 'PostgresOperator'
}
template_fields = ('sql',) template_fields = ('sql',)
template_ext = ('.sql',) template_ext = ('.sql',)
ui_color = '#ededed' ui_color = '#ededed'

Просмотреть файл

@ -15,9 +15,6 @@ class PrestoCheckOperator(BaseOperator):
:type presto_conn_id: string :type presto_conn_id: string
""" """
__mapper_args__ = {
'polymorphic_identity': 'PrestoCheckOperator'
}
template_fields = ('sql',) template_fields = ('sql',)
template_ext = ('.hql', '.sql',) template_ext = ('.hql', '.sql',)
ui_color = '#fff7e6' ui_color = '#fff7e6'

Просмотреть файл

@ -26,10 +26,6 @@ class PythonOperator(BaseOperator):
template_fields = tuple() template_fields = tuple()
ui_color = '#ffefeb' ui_color = '#ffefeb'
__mapper_args__ = {
'polymorphic_identity': 'PythonOperator'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
self, self,

Просмотреть файл

@ -1,7 +1,6 @@
import logging import logging
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
import subprocess import subprocess
import os
from airflow.hooks import S3Hook from airflow.hooks import S3Hook
from airflow.models import BaseOperator from airflow.models import BaseOperator
@ -9,14 +8,14 @@ from airflow.utils import apply_defaults
class S3FileTransformOperator(BaseOperator): class S3FileTransformOperator(BaseOperator):
""" """
Copies data from a source S3 location to a temporary location on the Copies data from a source S3 location to a temporary location on the
local filesystem. Runs a transformation on this file as specified by local filesystem. Runs a transformation on this file as specified by
the transformation script and uploads the output to a destination S3 location. the transformation script and uploads the output to a destination S3 location.
The locations of the source and the destination files in the local filesystem The locations of the source and the destination files in the local filesystem
is provided as an first and second arguments to the transformation script. is provided as an first and second arguments to the transformation script.
The transformation script is expected to read the data from source , transform it The transformation script is expected to read the data from source , transform it
and write the output to the local destination file. The operator then takes over and write the output to the local destination file. The operator then takes over
control and uploads the local destination file to S3. control and uploads the local destination file to S3.
:param source_s3_key: The key to be retrieved from S3 :param source_s3_key: The key to be retrieved from S3
@ -33,12 +32,9 @@ class S3FileTransformOperator(BaseOperator):
:type transform_script: str :type transform_script: str
""" """
__mapper_args__ = {
'polymorphic_identity': 'S3FileTransformOperator'
}
template_fields = ('source_s3_key', 'dest_s3_key') template_fields = ('source_s3_key', 'dest_s3_key')
template_ext = () template_ext = ()
ui_color = '#f9c915' ui_color = '#f9c915'
@apply_defaults @apply_defaults
def __init__( def __init__(
@ -81,8 +77,8 @@ class S3FileTransformOperator(BaseOperator):
logging.info("Uploading transformed file to S3") logging.info("Uploading transformed file to S3")
f_dest.flush() f_dest.flush()
self.dest_s3.load_file( self.dest_s3.load_file(
filename=f_dest.name, filename=f_dest.name,
key=self.dest_s3_key, key=self.dest_s3_key,
replace=self.replace replace=self.replace
) )
logging.info("Upload successful") logging.info("Upload successful")

Просмотреть файл

@ -54,9 +54,6 @@ class S3ToHiveTransfer(BaseOperator):
:type hive_conn_id: str :type hive_conn_id: str
""" """
__mapper_args__ = {
'polymorphic_identity': 'S3ToHiveOperator'
}
template_fields = ('s3_key', 'partition', 'hive_table') template_fields = ('s3_key', 'partition', 'hive_table')
template_ext = () template_ext = ()
ui_color = '#a0e08c' ui_color = '#a0e08c'

Просмотреть файл

@ -66,10 +66,6 @@ class SqlSensor(BaseSensorOperator):
template_fields = ('sql',) template_fields = ('sql',)
template_ext = ('.hql', '.sql',) template_ext = ('.hql', '.sql',)
__mapper_args__ = {
'polymorphic_identity': 'SqlSensor'
}
@apply_defaults @apply_defaults
def __init__(self, conn_id, sql, *args, **kwargs): def __init__(self, conn_id, sql, *args, **kwargs):
@ -111,9 +107,6 @@ class ExternalTaskSensor(BaseSensorOperator):
:type external_task_id: string :type external_task_id: string
""" """
template_fields = ('execution_date',) template_fields = ('execution_date',)
__mapper_args__ = {
'polymorphic_identity': 'ExternalTaskSensor'
}
@apply_defaults @apply_defaults
def __init__(self, external_dag_id, external_task_id, *args, **kwargs): def __init__(self, external_dag_id, external_task_id, *args, **kwargs):
@ -155,9 +148,6 @@ class HivePartitionSensor(BaseSensorOperator):
:type partition: string :type partition: string
""" """
template_fields = ('table', 'partition',) template_fields = ('table', 'partition',)
__mapper_args__ = {
'polymorphic_identity': 'HivePartitionSensor'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
@ -193,9 +183,6 @@ class HdfsSensor(BaseSensorOperator):
Waits for a file or folder to land in HDFS Waits for a file or folder to land in HDFS
""" """
template_fields = ('filepath',) template_fields = ('filepath',)
__mapper_args__ = {
'polymorphic_identity': 'HdfsSensor'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
@ -235,9 +222,6 @@ class S3KeySensor(BaseSensorOperator):
:type wildcard_match: bool :type wildcard_match: bool
""" """
template_fields = ('bucket_key', 'bucket_name') template_fields = ('bucket_key', 'bucket_name')
__mapper_args__ = {
'polymorphic_identity': 'S3KeySensor'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
@ -300,9 +284,6 @@ class S3PrefixSensor(BaseSensorOperator):
:type bucket_name: str :type bucket_name: str
""" """
template_fields = ('bucket_key', 'bucket_name') template_fields = ('bucket_key', 'bucket_name')
__mapper_args__ = {
'polymorphic_identity': 'S3PrefixSensor'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
@ -342,9 +323,6 @@ class TimeSensor(BaseSensorOperator):
:type target_time: datetime.time :type target_time: datetime.time
""" """
template_fields = tuple() template_fields = tuple()
__mapper_args__ = {
'polymorphic_identity': 'TimeSensor'
}
@apply_defaults @apply_defaults
def __init__(self, target_time, *args, **kwargs): def __init__(self, target_time, *args, **kwargs):

Просмотреть файл

@ -16,9 +16,6 @@ class SqliteOperator(BaseOperator):
a '.sql' extensions. a '.sql' extensions.
""" """
__mapper_args__ = {
'polymorphic_identity': 'sqliteOperator'
}
template_fields = ('sql',) template_fields = ('sql',)
template_ext = ('.sql',) template_ext = ('.sql',)
ui_color = '#cdaaed' ui_color = '#cdaaed'

Просмотреть файл

@ -9,10 +9,6 @@ class SubDagOperator(BaseOperator):
ui_color = '#555' ui_color = '#555'
ui_fgcolor = '#fff' ui_fgcolor = '#fff'
__mapper_args__ = {
'polymorphic_identity': 'SubDagOperator'
}
@apply_defaults @apply_defaults
def __init__( def __init__(
self, self,

Просмотреть файл

@ -1097,7 +1097,10 @@ class Airflow(BaseView):
for ti in dag.get_task_instances(session, dttm, dttm) for ti in dag.get_task_instances(session, dttm, dttm)
} }
tasks = { tasks = {
t.task_id: utils.alchemy_to_dict(t) t.task_id: {
'dag_id': t.dag_id,
'task_type': t.task_type,
}
for t in dag.tasks for t in dag.tasks
} }
if not tasks: if not tasks: