diff --git a/airflow/models.py b/airflow/models.py
index 945885a7b4..9fc8be4845 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -894,7 +894,7 @@ class Log(Base):
         self.owner = task_instance.task.owner
 
 
-class BaseOperator(Base):
+class BaseOperator(object):
     """
     Abstract base class for all operators. Since operators create objects that
     become node in the dag, BaseOperator contains many recursive methods for
@@ -969,21 +969,6 @@ class BaseOperator(Base):
     ui_color = '#fff'
     ui_fgcolor = '#000'
 
-    __tablename__ = "task"
-
-    dag_id = Column(String(ID_LEN), primary_key=True)
-    task_id = Column(String(ID_LEN), primary_key=True)
-    owner = Column(String(500))
-    task_type = Column(String(20))
-    start_date = Column(DateTime())
-    end_date = Column(DateTime())
-    depends_on_past = Column(Integer())
-
-    __mapper_args__ = {
-        'polymorphic_on': task_type,
-        'polymorphic_identity': 'BaseOperator'
-    }
-
     @apply_defaults
     def __init__(
             self,
@@ -1241,7 +1226,11 @@ class BaseOperator(Base):
         return self.downstream_list
 
     def __repr__(self):
-        return "<Task({self.task_type}): {self.task_id}>".format(self=self)
+        return "<Task({self.__class__.__name__}): {self.task_id}>".format(self=self)
+
+    @property
+    def task_type(self):
+        return self.__class__.__name__
 
     def append_only_new(self, l, item):
         if any([item is t for t in l]):
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index ef275add59..86fc9b2052 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -18,10 +18,6 @@ class BashOperator(BaseOperator):
     template_ext = ('.sh', '.bash',)
     ui_color = '#f0ede4'
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'BashOperator'
-    }
-
     @apply_defaults
     def __init__(self, bash_command, *args, **kwargs):
         super(BashOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/operators/dummy_operator.py b/airflow/operators/dummy_operator.py
index f5574993b7..6b69115e6b 100644
--- a/airflow/operators/dummy_operator.py
+++ b/airflow/operators/dummy_operator.py
@@ -11,10 +11,6 @@ class DummyOperator(BaseOperator):
     template_fields = tuple()
     ui_color = '#e8f7e4'
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'DummyOperator'
-    }
-
     @apply_defaults
     def __init__(self, *args, **kwargs):
         super(DummyOperator, self).__init__(*args, **kwargs)
diff --git a/airflow/operators/email_operator.py b/airflow/operators/email_operator.py
index d5d5b75115..8a5d162e85 100644
--- a/airflow/operators/email_operator.py
+++ b/airflow/operators/email_operator.py
@@ -20,10 +20,6 @@ class EmailOperator(BaseOperator):
     template_ext = ('.html',)
     ui_color = '#e6faf9'
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'EmailOperator'
-    }
-
     @apply_defaults
     def __init__(
             self,
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index f54f7430c3..4a19fa6db9 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -22,9 +22,6 @@ class HiveOperator(BaseOperator):
     :type script_begin_tag: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'HiveOperator'
-    }
     template_fields = ('hql', 'schema')
     template_ext = ('.hql', '.sql',)
     ui_color = '#f0e4ec'
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index e73ec526d9..e7e9c8ec63 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -40,9 +40,6 @@ class HiveStatsCollectionOperator(BaseOperator):
     :type assignment_func: function
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'HiveStatsCollectionOperator'
-    }
     template_fields = ('table', 'partition', 'ds', 'dttm')
     ui_color = '#aff7a6'
 
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index 775a1509b7..43071deb92 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -27,9 +27,6 @@ class HiveToMySqlTransfer(BaseOperator):
     :type mysql_preoperator: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'HiveToMySqlTransfer'
-    }
     template_fields = ('sql', 'mysql_table')
     template_ext = ('.sql',)
     ui_color = '#a0e08c'
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index 49c8586b1c..cfa98142ff 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -19,9 +19,6 @@ class Hive2SambaOperator(BaseOperator):
     :type samba_conn_id: string
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'Hive2SambaOperator'
-    }
     template_fields = ('hql', 'destination_filepath')
     template_ext = ('.hql', '.sql',)
 
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 7fbeb38450..6e7a73dd41 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -16,9 +16,6 @@ class MySqlOperator(BaseOperator):
     a '.sql' extensions.
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'MySqlOperator'
-    }
     template_fields = ('sql',)
     template_ext = ('.sql',)
     ui_color = '#ededed'
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index a474783aae..a0d7fbf71e 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -45,9 +45,6 @@ class MySqlToHiveTransfer(BaseOperator):
     :type hive_conn_id: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'MySqlToHiveOperator'
-    }
     template_fields = ('sql', 'partition', 'hive_table')
     template_ext = ('.sql',)
     ui_color = '#a0e08c'
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index ea8f0f6eeb..d4edc2b5be 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -16,9 +16,6 @@ class PostgresOperator(BaseOperator):
     a '.sql' extensions.
""" - __mapper_args__ = { - 'polymorphic_identity': 'PostgresOperator' - } template_fields = ('sql',) template_ext = ('.sql',) ui_color = '#ededed' diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py index 2e24083073..26fe5e7347 100644 --- a/airflow/operators/presto_check_operator.py +++ b/airflow/operators/presto_check_operator.py @@ -15,9 +15,6 @@ class PrestoCheckOperator(BaseOperator): :type presto_conn_id: string """ - __mapper_args__ = { - 'polymorphic_identity': 'PrestoCheckOperator' - } template_fields = ('sql',) template_ext = ('.hql', '.sql',) ui_color = '#fff7e6' diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 3058090898..87022d19cc 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -26,10 +26,6 @@ class PythonOperator(BaseOperator): template_fields = tuple() ui_color = '#ffefeb' - __mapper_args__ = { - 'polymorphic_identity': 'PythonOperator' - } - @apply_defaults def __init__( self, diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index fbde1218a3..857f3f7e8c 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -1,7 +1,6 @@ import logging from tempfile import NamedTemporaryFile import subprocess -import os from airflow.hooks import S3Hook from airflow.models import BaseOperator @@ -9,14 +8,14 @@ from airflow.utils import apply_defaults class S3FileTransformOperator(BaseOperator): """ - Copies data from a source S3 location to a temporary location on the - local filesystem. Runs a transformation on this file as specified by + Copies data from a source S3 location to a temporary location on the + local filesystem. Runs a transformation on this file as specified by the transformation script and uploads the output to a destination S3 location. - The locations of the source and the destination files in the local filesystem + The locations of the source and the destination files in the local filesystem is provided as an first and second arguments to the transformation script. - The transformation script is expected to read the data from source , transform it - and write the output to the local destination file. The operator then takes over + The transformation script is expected to read the data from source , transform it + and write the output to the local destination file. The operator then takes over control and uploads the local destination file to S3. 
     :param source_s3_key: The key to be retrieved from S3
@@ -33,12 +32,9 @@ class S3FileTransformOperator(BaseOperator):
     :type transform_script: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'S3FileTransformOperator'
-    }
     template_fields = ('source_s3_key', 'dest_s3_key')
     template_ext = ()
-    ui_color = '#f9c915' 
+    ui_color = '#f9c915'
 
     @apply_defaults
     def __init__(
@@ -81,8 +77,8 @@ class S3FileTransformOperator(BaseOperator):
             logging.info("Uploading transformed file to S3")
             f_dest.flush()
             self.dest_s3.load_file(
-                filename=f_dest.name, 
-                key=self.dest_s3_key, 
+                filename=f_dest.name,
+                key=self.dest_s3_key,
                 replace=self.replace
             )
             logging.info("Upload successful")
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 63322fb088..da641bec33 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -54,9 +54,6 @@ class S3ToHiveTransfer(BaseOperator):
     :type hive_conn_id: str
     """
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'S3ToHiveOperator'
-    }
     template_fields = ('s3_key', 'partition', 'hive_table')
     template_ext = ()
     ui_color = '#a0e08c'
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index e7b20ab731..c14b6fda07 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -66,10 +66,6 @@ class SqlSensor(BaseSensorOperator):
     template_fields = ('sql',)
     template_ext = ('.hql', '.sql',)
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'SqlSensor'
-    }
-
     @apply_defaults
     def __init__(self, conn_id, sql, *args, **kwargs):
 
@@ -111,9 +107,6 @@ class ExternalTaskSensor(BaseSensorOperator):
     :type external_task_id: string
     """
     template_fields = ('execution_date',)
-    __mapper_args__ = {
-        'polymorphic_identity': 'ExternalTaskSensor'
-    }
 
     @apply_defaults
     def __init__(self, external_dag_id, external_task_id, *args, **kwargs):
@@ -155,9 +148,6 @@ class HivePartitionSensor(BaseSensorOperator):
     :type partition: string
     """
     template_fields = ('table', 'partition',)
-    __mapper_args__ = {
-        'polymorphic_identity': 'HivePartitionSensor'
-    }
 
     @apply_defaults
     def __init__(
@@ -193,9 +183,6 @@ class HdfsSensor(BaseSensorOperator):
     Waits for a file or folder to land in HDFS
     """
     template_fields = ('filepath',)
-    __mapper_args__ = {
-        'polymorphic_identity': 'HdfsSensor'
-    }
 
     @apply_defaults
     def __init__(
@@ -235,9 +222,6 @@ class S3KeySensor(BaseSensorOperator):
     :type wildcard_match: bool
     """
     template_fields = ('bucket_key', 'bucket_name')
-    __mapper_args__ = {
-        'polymorphic_identity': 'S3KeySensor'
-    }
 
     @apply_defaults
     def __init__(
@@ -300,9 +284,6 @@ class S3PrefixSensor(BaseSensorOperator):
     :type bucket_name: str
     """
     template_fields = ('bucket_key', 'bucket_name')
-    __mapper_args__ = {
-        'polymorphic_identity': 'S3PrefixSensor'
-    }
 
     @apply_defaults
     def __init__(
@@ -342,9 +323,6 @@ class TimeSensor(BaseSensorOperator):
     :type target_time: datetime.time
     """
     template_fields = tuple()
-    __mapper_args__ = {
-        'polymorphic_identity': 'TimeSensor'
-    }
 
     @apply_defaults
     def __init__(self, target_time, *args, **kwargs):
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 36b3a2ab3b..5abe539184 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -16,9 +16,6 @@ class SqliteOperator(BaseOperator):
     a '.sql' extensions.
""" - __mapper_args__ = { - 'polymorphic_identity': 'sqliteOperator' - } template_fields = ('sql',) template_ext = ('.sql',) ui_color = '#cdaaed' diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py index 93a35bfbea..aaed3f29bb 100644 --- a/airflow/operators/subdag_operator.py +++ b/airflow/operators/subdag_operator.py @@ -9,10 +9,6 @@ class SubDagOperator(BaseOperator): ui_color = '#555' ui_fgcolor = '#fff' - __mapper_args__ = { - 'polymorphic_identity': 'SubDagOperator' - } - @apply_defaults def __init__( self, diff --git a/airflow/www/app.py b/airflow/www/app.py index 0c03d1ad7f..47ed93be90 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -1097,7 +1097,10 @@ class Airflow(BaseView): for ti in dag.get_task_instances(session, dttm, dttm) } tasks = { - t.task_id: utils.alchemy_to_dict(t) + t.task_id: { + 'dag_id': t.dag_id, + 'task_type': t.task_type, + } for t in dag.tasks } if not tasks: