Fixes around HDFSSensor
This commit is contained in:
Родитель
4fb8bff70f
Коммит
d8d66c786d
|
@ -11,8 +11,6 @@ from airflow.models import State
|
|||
from airflow.models import TaskInstance
|
||||
from airflow.utils import apply_defaults
|
||||
|
||||
from snakebite.client import HAClient, Namenode
|
||||
|
||||
|
||||
class BaseSensorOperator(BaseOperator):
|
||||
'''
|
||||
|
@ -207,14 +205,15 @@ class HdfsSensor(BaseSensorOperator):
|
|||
*args, **kwargs):
|
||||
super(HdfsSensor, self).__init__(*args, **kwargs)
|
||||
self.filepath = filepath
|
||||
self.sb = HDFSHook(hdfs_conn_id).get_conn()
|
||||
self.hdfs_conn_id = hdfs_conn_id
|
||||
|
||||
def poke(self):
|
||||
sb = hooks.HDFSHook(self.hdfs_conn_id).get_conn()
|
||||
logging.getLogger("snakebite").setLevel(logging.WARNING)
|
||||
logging.info(
|
||||
'Poking for file {self.filepath} '.format(**locals()))
|
||||
try:
|
||||
files = [f for f in self.sb.ls([self.filepath])]
|
||||
files = [f for f in sb.ls([self.filepath])]
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
|
Загрузка…
Ссылка в новой задаче