Fix names + typos and load_file args
This commit is contained in:
Родитель
e56be2c046
Коммит
327fa2e76e
|
@ -2,7 +2,7 @@ from airflow.hooks.base_hook import BaseHook
|
|||
from airflow.configuration import conf
|
||||
import logging
|
||||
|
||||
from hdfs import InsecureClient, Hdfserror
|
||||
from hdfs import InsecureClient, HdfsError
|
||||
from airflow.utils import AirflowException
|
||||
|
||||
|
||||
|
@ -14,24 +14,24 @@ class WebHDFSHook(BaseHook):
|
|||
"""
|
||||
Interact with HDFS. This class is a wrapper around the hdfscli library.
|
||||
"""
|
||||
def __init__(self, hdfs_conn_id='hdfs_default'):
|
||||
self.hdfs_conn_id = hdfs_conn_id
|
||||
def __init__(self, webhdfs_conn_id='webhdfs_default'):
|
||||
self.webhdfs_conn_id = webhdfs_conn_id
|
||||
|
||||
def get_conn(self):
|
||||
"""
|
||||
Returns a hdfscli InsecureClient object.
|
||||
"""
|
||||
nn_connections = self.get_connections(self.hdfs_conn_id)
|
||||
nn_connections = self.get_connections(self.webhdfs_conn_id)
|
||||
for nn in nn_connections:
|
||||
try:
|
||||
logging.debug('Trying namenode {nn.host}'.format(nn))
|
||||
client = InsecureClient('http://{nn.host}:{nn.port}'.format(nn))
|
||||
logging.debug('Trying namenode {}'.format(nn.host))
|
||||
client = InsecureClient('http://{nn.host}:{nn.port}'.format(nn=nn))
|
||||
client.content('/')
|
||||
logging.debug('Using namenode {nn.host} for hook'.format(nn))
|
||||
logging.debug('Using namenode {} for hook'.format(nn.host))
|
||||
return client
|
||||
except Hdfserror as e:
|
||||
except HdfsError as e:
|
||||
logging.debug("Read operation on namenode {nn.host} failed with"
|
||||
" error: {e.messaage}".format(**locals()))
|
||||
" error: {e.message}".format(**locals()))
|
||||
nn_hosts = [c.host for c in nn_connections]
|
||||
no_nn_error = "Read operations failed on the namenodes below:\n{}".format("\n".join(nn_hosts))
|
||||
raise WebHDFSHookException(no_nn_error)
|
||||
|
@ -44,7 +44,7 @@ class WebHDFSHook(BaseHook):
|
|||
return bool(c.status(hdfs_path, strict=False))
|
||||
|
||||
def load_file(self, source, destination, overwrite=True, parallelism=1,
|
||||
tmp_dir=None, chunk_size=2 ** 16, progress=None, **kwargs):
|
||||
**kwargs):
|
||||
"""
|
||||
Uploads a file to HDFS
|
||||
|
||||
|
@ -77,8 +77,5 @@ class WebHDFSHook(BaseHook):
|
|||
local_path=source,
|
||||
overwrite=overwrite,
|
||||
n_threads=parallelism,
|
||||
tmp_dir=tmp_dir,
|
||||
chunk_size=chunk_size,
|
||||
progress=progress,
|
||||
**kwargs)
|
||||
logging.debug("Uploaded file {} to {}".format(source, destination))
|
||||
|
|
|
@ -287,14 +287,14 @@ class WebHdfsSensor(BaseSensorOperator):
|
|||
def __init__(
|
||||
self,
|
||||
filepath,
|
||||
hdfs_conn_id='hdfs_default',
|
||||
webhdfs_conn_id='webhdfs_default',
|
||||
*args, **kwargs):
|
||||
super(WebHdfsSensor, self).__init__(*args, **kwargs)
|
||||
self.filepath = filepath
|
||||
self.hdfs_conn_id = hdfs_conn_id
|
||||
self.hdfs_conn_id = webhdfs_conn_id
|
||||
|
||||
def poke(self, context):
|
||||
c = hooks.WebHDFSHook(self.hdfs_conn_id).get_conn()
|
||||
c = hooks.WebHDFSHook(self.webhdfs_conn_id).get_conn()
|
||||
logging.info(
|
||||
'Poking for file {self.filepath} '.format(**locals()))
|
||||
return c.check_for_path(hdfs_path=self.filepath)
|
||||
|
|
Загрузка…
Ссылка в новой задаче