Merge pull request #209 from mistercrunch/hdfs_hook
add hdfs hook and update hdfs sensor to work with HA configuration
This commit is contained in:
Коммит
77bcb1b8ea
|
@ -10,9 +10,10 @@ _hooks = {
|
|||
'HiveMetastoreHook',
|
||||
'HiveServer2Hook',
|
||||
],
|
||||
'presto_hook': ['PrestoHook'],
|
||||
'hdfs_hook': ['HDFSHook'],
|
||||
'mysql_hook': ['MySqlHook'],
|
||||
'postgres_hook': ['PostgresHook'],
|
||||
'presto_hook': ['PrestoHook'],
|
||||
'samba_hook': ['SambaHook'],
|
||||
'S3_hook': ['S3Hook'],
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
from airflow.hooks.base_hook import BaseHook
|
||||
from snakebite.client import Client, HAClient, Namenode
|
||||
|
||||
class HDFSHook(BaseHook):
|
||||
'''
|
||||
Interact with HDFS. This class is a wrapper around the snakebite library.
|
||||
'''
|
||||
def __init__(self, hdfs_conn_id='hdfs_default'):
|
||||
self.hdfs_conn_id = hdfs_conn_id
|
||||
|
||||
def get_conn(self):
|
||||
'''
|
||||
Returns a snakebite HDFSClient object.
|
||||
'''
|
||||
connections = self.get_connections(self.hdfs_conn_id)
|
||||
client = None
|
||||
if len(connections) == 1:
|
||||
client = Client(connections[0].host, connections[0].port)
|
||||
elif len(connections) > 1:
|
||||
nn = [Namenode(conn.host, conn.port) for conn in connections]
|
||||
client = HAClient(nn)
|
||||
else:
|
||||
raise Exception("conn_id doesn't exist in the repository")
|
||||
return client
|
|
@ -207,16 +207,7 @@ class HdfsSensor(BaseSensorOperator):
|
|||
*args, **kwargs):
|
||||
super(HdfsSensor, self).__init__(*args, **kwargs)
|
||||
self.filepath = filepath
|
||||
session = settings.Session()
|
||||
db = session.query(DB).filter(DB.conn_id == hdfs_conn_id).first()
|
||||
if not db:
|
||||
raise Exception("conn_id doesn't exist in the repository")
|
||||
self.host = db.host
|
||||
self.port = db.port
|
||||
NAMENODES = [Namenode(self.host, self.port)]
|
||||
self.sb = HAClient(NAMENODES)
|
||||
session.commit()
|
||||
session.close()
|
||||
self.sb = HDFSHook(hdfs_conn_id).get_conn()
|
||||
|
||||
def poke(self):
|
||||
logging.getLogger("snakebite").setLevel(logging.WARNING)
|
||||
|
|
Загрузка…
Ссылка в новой задаче