Adding HdfsSensor operator
Parent: 6af313cdac
Commit: 0790c1ce54
@@ -5,5 +5,6 @@ from presto_check_operator import PrestoCheckOperator
 from sensors import SqlSensor
 from sensors import ExternalTaskSensor
 from sensors import HivePartitionSensor
+from sensors import HdfsSensor
 from email_operator import EmailOperator
 from dummy_operator import DummyOperator
@@ -11,6 +11,8 @@ from airflow.models import State
 from airflow.models import TaskInstance
 from airflow.utils import apply_defaults
 
+from snakebite.client import HAClient, Namenode
+
 
 class BaseSensorOperator(BaseOperator):
 
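For reference, a minimal sketch of the snakebite API these two classes provide (the namenode host and port below are illustrative): Namenode describes one endpoint, HAClient wraps a list of them, and ls() takes a list of paths and yields one dict per entry.

    from snakebite.client import HAClient, Namenode

    # Illustrative address; a standby namenode could be appended for HA setups.
    client = HAClient([Namenode('namenode.example.com', 8020)])
    for entry in client.ls(['/tmp']):
        print(entry['path'])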
@@ -121,7 +123,7 @@ class ExternalTaskSensor(BaseSensorOperator):
 
 class HivePartitionSensor(BaseSensorOperator):
     """
-    Waits for the apparation of a partition in Hive
+    Waits for the apparition of a partition in Hive
     """
     template_fields = ('table', 'partition',)
     __mapper_args__ = {
@@ -150,3 +152,43 @@ class HivePartitionSensor(BaseSensorOperator):
             'partition {self.partition}'.format(**locals()))
         return self.hook.check_for_partition(
             self.schema, self.table, self.partition)
+
+
+class HdfsSensor(BaseSensorOperator):
+    """
+    Waits for a file or folder to land in HDFS
+    """
+    template_fields = ('filepath',)
+    __mapper_args__ = {
+        'polymorphic_identity': 'HdfsSensor'
+    }
+
+    @apply_defaults
+    def __init__(
+            self,
+            filepath,
+            hdfs_conn_id='hdfs_default',
+            *args, **kwargs):
+        super(HdfsSensor, self).__init__(*args, **kwargs)
+        self.filepath = filepath
+        session = settings.Session()
+        db = session.query(DB).filter(DB.conn_id==hdfs_conn_id).first()
+        if not db:
+            raise Exception("conn_id doesn't exist in the repository")
+        self.host = db.host
+        self.port = db.port
+        NAMENODES = [Namenode(self.host, self.port)]
+        self.sb = HAClient(NAMENODES)
+        session.commit()
+        session.close()
+
+    def poke(self):
+        logging.getLogger("snakebite").setLevel(logging.WARNING)
+        logging.info(
+            'Poking for file {self.filepath} '.format(**locals()))
+        try:
+            files = [f for f in self.sb.ls([self.filepath])]
+        except:
+            return False
+        print([i for i in f])
+        return True
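As a usage sketch (the task_id and filepath below are illustrative): the sensor is importable through the operators package updated in the first hunk, and the constructor resolves hdfs_conn_id against the connection table at construction time, so the row must exist before the task can even be declared. Note that poke() treats any failure of ls() as "not there yet" and returns False, and that the trailing print([i for i in f]) relies on the comprehension variable f leaking into the enclosing scope, which only works on Python 2.

    from airflow.operators import HdfsSensor

    # Illustrative values; 'hdfs_default' must already exist as a conn_id,
    # because HdfsSensor.__init__ looks it up and raises when it is missing.
    wait_for_marker = HdfsSensor(
        task_id='wait_for_success_marker',
        filepath='/data/events/2015-01-01/_SUCCESS',
        hdfs_conn_id='hdfs_default')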
@@ -1061,12 +1061,13 @@ class ConnectionModelView(LoginMixin, ModelView):
     column_list = ('conn_id', 'conn_type', 'host', 'port')
     form_choices = {
         'conn_type': [
+            ('ftp', 'FTP',),
+            ('hdfs', 'HDFS',),
             ('hive', 'Hive',),
-            ('presto', 'Presto',),
             ('mysql', 'MySQL',),
             ('oracle', 'Oracle',),
+            ('presto', 'Presto',),
             ('samba', 'Samba',),
-            ('ftp', 'FTP',),
         ]
     }
 mv = ConnectionModelView(
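Since HdfsSensor queries the connection table through the model aliased as DB, the new 'hdfs' form choice pairs with a row like the one below. A sketch only: the model's import path is an assumption (this commit shows just the DB alias and the conn_id, conn_type, host, and port columns), and the host and port values are illustrative.

    from airflow import settings
    from airflow.models import DB  # assumed import path; sensors.py queries this model as DB

    session = settings.Session()
    session.add(DB(
        conn_id='hdfs_default',        # what HdfsSensor looks up by default
        conn_type='hdfs',              # matches the new form choice
        host='namenode.example.com',   # illustrative
        port=8020))                    # illustrative namenode RPC port
    session.commit()
    session.close()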
@@ -18,6 +18,7 @@ pyhive
 python-dateutil
 requests
 setproctitle
+snakebite
 sphinx
 sphinx_rtd_theme
 Sphinx-PyPI-upload
setup.py
@@ -21,6 +21,7 @@ setup(
         'pandas',
         'pygments', 'pyhive',
         'python-dateutil', 'requests', 'setproctitle',
+        'snakebite',
         'sphinx', 'sphinx-rtd-theme', 'Sphinx-PyPI-upload',
         'sqlalchemy', 'thrift', 'tornado'
     ],