diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 6f941ef110..6064da822c 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -5,5 +5,6 @@ from presto_check_operator import PrestoCheckOperator
 from sensors import SqlSensor
 from sensors import ExternalTaskSensor
 from sensors import HivePartitionSensor
+from sensors import HdfsSensor
 from email_operator import EmailOperator
 from dummy_operator import DummyOperator
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index d88b9e7701..af3a90a1a9 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -11,6 +11,8 @@ from airflow.models import State
 from airflow.models import TaskInstance
 from airflow.utils import apply_defaults
 
+from snakebite.client import HAClient, Namenode
+
 
 class BaseSensorOperator(BaseOperator):
 
@@ -121,7 +123,7 @@ class ExternalTaskSensor(BaseSensorOperator):
 
 class HivePartitionSensor(BaseSensorOperator):
     """
-    Waits for the apparation of a partition in Hive
+    Waits for the apparition of a partition in Hive
     """
     template_fields = ('table', 'partition',)
     __mapper_args__ = {
@@ -150,3 +152,43 @@ class HivePartitionSensor(BaseSensorOperator):
             'partition {self.partition}'.format(**locals()))
         return self.hook.check_for_partition(
             self.schema, self.table, self.partition)
+
+
+class HdfsSensor(BaseSensorOperator):
+    """
+    Waits for a file or folder to land in HDFS
+    """
+    template_fields = ('filepath',)
+    __mapper_args__ = {
+        'polymorphic_identity': 'HdfsSensor'
+    }
+
+    @apply_defaults
+    def __init__(
+            self,
+            filepath,
+            hdfs_conn_id='hdfs_default',
+            *args, **kwargs):
+        super(HdfsSensor, self).__init__(*args, **kwargs)
+        self.filepath = filepath
+        session = settings.Session()
+        db = session.query(DB).filter(DB.conn_id == hdfs_conn_id).first()
+        if not db:
+            raise Exception("conn_id doesn't exist in the repository")
+        self.host = db.host
+        self.port = db.port
+        NAMENODES = [Namenode(self.host, self.port)]
+        self.sb = HAClient(NAMENODES)
+        session.commit()
+        session.close()
+
+    def poke(self):
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        logging.info(
+            'Poking for file {self.filepath} '.format(**locals()))
+        try:
+            files = [f for f in self.sb.ls([self.filepath])]
+        except Exception:
+            return False
+        logging.info(files)
+        return True
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 9083485072..709bbe3b5e 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -1061,12 +1061,13 @@ class ConnectionModelView(LoginMixin, ModelView):
     column_list = ('conn_id', 'conn_type', 'host', 'port')
     form_choices = {
         'conn_type': [
+            ('ftp', 'FTP',),
+            ('hdfs', 'HDFS',),
             ('hive', 'Hive',),
-            ('presto', 'Presto',),
             ('mysql', 'MySQL',),
             ('oracle', 'Oracle',),
+            ('presto', 'Presto',),
             ('samba', 'Samba',),
-            ('ftp', 'FTP',),
         ]
     }
 mv = ConnectionModelView(
diff --git a/requirements.txt b/requirements.txt
index 391e590703..f4af54b687 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -18,6 +18,7 @@ pyhive
 python-dateutil
 requests
 setproctitle
+snakebite
 sphinx
 sphinx_rtd_theme
 Sphinx-PyPI-upload
diff --git a/setup.py b/setup.py
index 1c292d2fe0..9fcd73dbc5 100644
--- a/setup.py
+++ b/setup.py
@@ -21,6 +21,7 @@ setup(
         'pandas', 'pygments', 'pyhive',
         'python-dateutil', 'requests', 'setproctitle',
+        'snakebite',
         'sphinx', 'sphinx-rtd-theme',
         'Sphinx-PyPI-upload',
         'sqlalchemy', 'thrift', 'tornado'
     ],
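
A minimal usage sketch for the new HdfsSensor follows. All DAG and task names
here are hypothetical, the filepath is an illustrative example of the templated
'filepath' field, and poke_interval/timeout are assumed to be inherited
BaseSensorOperator arguments; none of this is part of the patch itself.

    # Hypothetical DAG wiring -- a sketch, not part of the patch.
    from airflow import DAG
    from airflow.operators import HdfsSensor

    dag = DAG('hdfs_sensor_example')  # hypothetical DAG id

    wait_for_file = HdfsSensor(
        task_id='wait_for_success_marker',          # hypothetical task id
        filepath='/data/events/{{ ds }}/_SUCCESS',  # templated: 'filepath' is in template_fields
        hdfs_conn_id='hdfs_default',                # must match a conn_id in the connection repository
        poke_interval=60,      # assumed BaseSensorOperator arg: seconds between pokes
        timeout=60 * 60,       # assumed BaseSensorOperator arg: give up after an hour
        dag=dag)

Downstream tasks set to depend on this task would then only run once the file
or folder exists, since poke() returns True only after snakebite's ls succeeds.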
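
A connection row can also be smoke-tested outside Airflow with the same
snakebite client the sensor builds. The host and port below are placeholders
for whatever is stored on the 'hdfs_default' row; HAClient takes a list of
Namenode objects, so a standby namenode could be appended for failover, which
is presumably why the sensor uses HAClient rather than the plain Client.

    from snakebite.client import HAClient, Namenode

    # Placeholder namenode address -- substitute the values from the
    # connection row that the sensor would look up.
    sb = HAClient([Namenode('namenode.example.com', 8020)])
    # ls yields one dict per entry; 'path' is one of its keys.
    print([f['path'] for f in sb.ls(['/tmp'])])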