Adding Hive2FtpOperator
This commit is contained in:
Parent: 6f386e43f6
Commit: 068e351c63
@@ -1,3 +1,4 @@
from airflow.hooks.mysql_hook import MySqlHook
from airflow.hooks.hive_hook import HiveHook
from airflow.hooks.presto_hook import PrestoHook
from airflow.hooks.samba_hook import SambaHook
@@ -0,0 +1,41 @@
import ftplib
import os

from airflow import settings
from airflow.models import DatabaseConnection


class FtpHook(object):
    '''
    Allows for interaction with an ftp server.
    '''

    def __init__(self, ftp_dbid=None):
        session = settings.Session()
        ftp_conn = session.query(
            DatabaseConnection).filter(
                DatabaseConnection.db_id == ftp_dbid).first()
        if not ftp_conn:
            raise Exception("The ftp id you provided isn't defined")
        self.host = ftp_conn.host
        self.port = ftp_conn.port or 21
        self.login = ftp_conn.login
        self.psw = ftp_conn.password
        self.db = ftp_conn.schema
        session.commit()
        session.close()

    def get_conn(self):
        ftp = ftplib.FTP()
        ftp.connect(self.host, self.port)
        ftp.login(self.login, self.psw)
        return ftp

    def push_from_local(self, destination_filepath, local_filepath):
        ftp = self.get_conn()
        # move into the remote target directory before storing the file
        ftp.cwd(os.path.dirname(destination_filepath))
        f = open(local_filepath, 'rb')  # binary mode, as storbinary expects
        filename = os.path.basename(destination_filepath)
        ftp.storbinary("STOR " + filename, f)
        f.close()
        ftp.quit()
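A minimal usage sketch for the hook above. The connection id 'ftp_default' and both paths are hypothetical; the id has to exist as a DatabaseConnection row in the metadata database:

hook = FtpHook(ftp_dbid='ftp_default')  # hypothetical connection id
# pushes the local file to /exports/daily.csv on the ftp server
hook.push_from_local('/exports/daily.csv', '/tmp/daily.csv')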
@@ -0,0 +1,39 @@
import os
from smbclient import SambaClient

from airflow import settings
from airflow.models import DatabaseConnection


class SambaHook(object):
    '''
    Allows for interaction with a samba server.
    '''

    def __init__(self, samba_dbid=None):
        session = settings.Session()
        samba_conn = session.query(
            DatabaseConnection).filter(
                DatabaseConnection.db_id == samba_dbid).first()
        if not samba_conn:
            raise Exception("The samba id you provided isn't defined")
        self.host = samba_conn.host
        self.login = samba_conn.login
        self.psw = samba_conn.password
        self.db = samba_conn.schema
        session.commit()
        session.close()

    def get_conn(self):
        samba = SambaClient(
            server=self.host, share='', username=self.login, password=self.psw)
        return samba

    def push_from_local(self, destination_filepath, local_filepath):
        samba = self.get_conn()
        # SambaClient doesn't expose the ftplib-style cwd/storbinary/quit
        # calls; its upload method takes the local path, then the remote one
        samba.upload(local_filepath, destination_filepath)
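The SambaClient returned by get_conn can also be used directly; a sketch under the same assumptions (hypothetical connection id and paths), again with the local path first:

client = SambaHook(samba_dbid='samba_default').get_conn()
client.upload('/tmp/report.csv', '/shared/report.csv')  # local, then remote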
@@ -8,3 +8,4 @@ from sensors import HivePartitionSensor
from sensors import HdfsSensor
from email_operator import EmailOperator
from dummy_operator import DummyOperator
from hive2samba_operator import Hive2SambaOperator
@@ -0,0 +1,60 @@
import logging
import tempfile

from airflow.configuration import conf
from airflow.hooks import HiveHook, FtpHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class Hive2FtpOperator(BaseOperator):
    """
    Executes hql code in a specific Hive database and pushes the
    results to an ftp server.

    :param hql: the hql to be exported
    :type hql: string
    :param hive_dbid: reference to the Hive database
    :type hive_dbid: string
    :param ftp_dbid: reference to the ftp destination
    :type ftp_dbid: string
    :param destination_filepath: path of the target file on the ftp server
    :type destination_filepath: string
    """

    __mapper_args__ = {
        'polymorphic_identity': 'Hive2FtpOperator'
    }
    template_fields = ('hql',)
    template_ext = ('.hql', '.sql',)

    @apply_defaults
    def __init__(
            self, hql,
            ftp_dbid,
            destination_filepath,
            hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID'),
            *args, **kwargs):
        super(Hive2FtpOperator, self).__init__(*args, **kwargs)

        self.hive_dbid = hive_dbid
        self.ftp_dbid = ftp_dbid
        self.destination_filepath = destination_filepath
        self.ftp = FtpHook(ftp_dbid=ftp_dbid)
        self.hook = HiveHook(hive_dbid=hive_dbid)
        self.hql = hql.strip().rstrip(';')

    def execute(self, execution_date):
        tmpfile = tempfile.NamedTemporaryFile()
        # Wrap the query so Hive dumps its result set, comma-delimited,
        # into the local temporary location
        hql = """\
        INSERT OVERWRITE LOCAL DIRECTORY '{tmpfile.name}'
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        {self.hql};
        """.format(**locals())
        logging.info('Executing: ' + hql)
        self.hook.run_cli(hql=hql)

        self.ftp.push_from_local(self.destination_filepath, tmpfile.name)

        # Cleaning up: closing the NamedTemporaryFile removes the local copy
        tmpfile.close()
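A minimal sketch of wiring the operator into a DAG. The task id, query, and connection id are hypothetical, and a DAG object named dag is assumed to exist:

task = Hive2FtpOperator(
    task_id='export_events',      # hypothetical task id
    hql='SELECT * FROM events',   # hypothetical query
    ftp_dbid='ftp_default',       # hypothetical connection id
    destination_filepath='/exports/events.csv')
dag.add_task(task)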
@@ -0,0 +1,60 @@
import logging
import tempfile

from airflow.configuration import conf
from airflow.hooks import HiveHook, SambaHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults


class Hive2SambaOperator(BaseOperator):
    """
    Executes hql code in a specific Hive database and pushes the
    results to a samba server.

    :param hql: the hql to be exported
    :type hql: string
    :param hive_dbid: reference to the Hive database
    :type hive_dbid: string
    :param samba_dbid: reference to the samba destination
    :type samba_dbid: string
    :param destination_filepath: path of the target file on the samba share
    :type destination_filepath: string
    """

    __mapper_args__ = {
        'polymorphic_identity': 'Hive2SambaOperator'
    }
    template_fields = ('hql',)
    template_ext = ('.hql', '.sql',)

    @apply_defaults
    def __init__(
            self, hql,
            samba_dbid,
            destination_filepath,
            hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID'),
            *args, **kwargs):
        super(Hive2SambaOperator, self).__init__(*args, **kwargs)

        self.hive_dbid = hive_dbid
        self.samba_dbid = samba_dbid
        self.destination_filepath = destination_filepath
        self.samba = SambaHook(samba_dbid=samba_dbid)
        self.hook = HiveHook(hive_dbid=hive_dbid)
        self.hql = hql.strip().rstrip(';')

    def execute(self, execution_date):
        tmpfile = tempfile.NamedTemporaryFile()
        # Wrap the query so Hive dumps its result set, comma-delimited,
        # into the local temporary location
        hql = """\
        INSERT OVERWRITE LOCAL DIRECTORY '{tmpfile.name}'
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        {self.hql};
        """.format(**locals())
        logging.info('Executing: ' + hql)
        self.hook.run_cli(hql=hql)

        self.samba.push_from_local(self.destination_filepath, tmpfile.name)

        # Cleaning up: closing the NamedTemporaryFile removes the local copy
        tmpfile.close()
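For illustration, with a hypothetical query SELECT id, name FROM users and a temporary file named /tmp/tmpXyZ, the wrapper statement that either operator hands to run_cli comes out as:

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/tmpXyZ'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
SELECT id, name FROM users;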
@@ -15,6 +15,7 @@ mysql-python
pandas
pygments
pyhive
PySmbClient
python-dateutil
requests
setproctitle