Merge pull request #165 from mistercrunch/mysql2hive

MySQL 2 Hive operator
Maxime Beauchemin 2015-04-05 21:03:35 -07:00
Parents: 4c37c23f8f 32600dc08f
Commit: 7e55905857
7 changed files with 227 additions and 4 deletions

View file

@@ -43,7 +43,7 @@ class HiveCliHook(BaseHook):
f.write(hql)
f.flush()
fname = f.name
hive_cmd = ['hive' , '-f' , fname]
hive_cmd = ['hive', '-f', fname]
if self.hive_cli_params:
hive_params_list = self.hive_cli_params.split()
hive_cmd.extend(hive_params_list)
@@ -60,6 +60,69 @@ class HiveCliHook(BaseHook):
if sp.returncode:
raise Exception(all_err)
def load_file(
self,
filepath,
table,
delimiter="'",
field_dict=None,
create=True,
overwrite=True,
partition=None,
recreate=False):
"""
Loads a local file into Hive
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
large amount of data is loaded and/or if the table gets
queried considerably, you may want to use this operator only to
stage the data into a temporary table before loading it into its
final destination using a ``HiveOperator``.
:param table: target Hive table, use dot notation to target a
specific database
:type table: str
:param create: whether to create the table if it doesn't exist
:type create: bool
:param recreate: whether to drop and recreate the table at every
execution
:type recreate: bool
:param partition: target partition as a dict of partition columns
and values
:type partition: dict
:param delimiter: field delimiter in the file
:type delimiter: str
"""
hql = ''
if recreate:
hql += "DROP TABLE IF EXISTS {table};\n"
if create or recreate:
fields = ",\n ".join(
[k + ' ' + v for k, v in field_dict.items()])
hql += "CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n{fields})\n"
if partition:
pfields = ",\n ".join(
[p + " STRING" for p in partition])
hql += "PARTITIONED BY ({pfields})\n"
hql += "ROW FORMAT DELIMITED\n"
hql += "FIELDS TERMINATED BY '{delimiter}'\n"
hql += "STORED AS textfile;"
hql = hql.format(**locals())
logging.info(hql)
self.run_cli(hql)
hql = "LOAD DATA LOCAL INPATH '{filepath}' "
if overwrite:
hql += "OVERWRITE "
hql += "INTO TABLE {table} "
if partition:
pvals = ", ".join(
["{0}='{1}'".format(k, v) for k, v in partition.items()])
hql += "PARTITION ({pvals});"
hql = hql.format(**locals())
logging.info(hql)
self.run_cli(hql)
def kill(self):
if hasattr(self, 'sp'):
if self.sp.poll() is None:
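To make the new ``load_file`` method added above concrete, here is a rough usage sketch. The connection id, file path, columns, and partition value are assumptions for illustration, not part of this commit:

from airflow.hooks import HiveCliHook

# Assumed: a 'hive_cli_default' connection and a ctrl-A delimited dump
# at /tmp/users_dump.txt whose columns match field_dict below.
hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
hook.load_file(
    filepath='/tmp/users_dump.txt',
    table='staging.users',            # dot notation targets a database
    delimiter=chr(1),                 # ctrl-A field separator
    field_dict={'id': 'BIGINT', 'name': 'STRING'},
    partition={'ds': '2015-04-05'},   # rendered as PARTITION (ds='2015-04-05')
    recreate=True)                    # DROP, then CREATE, then LOAD

With ``recreate=True`` the hook first emits ``DROP TABLE IF EXISTS`` and ``CREATE EXTERNAL TABLE ... PARTITIONED BY (ds STRING)``, then issues a ``LOAD DATA LOCAL INPATH ... OVERWRITE INTO TABLE ... PARTITION (ds='2015-04-05')`` statement.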

View file

@@ -15,8 +15,9 @@ _operators = {
],
'dummy_operator': ['DummyOperator'],
'email_operator': ['EmailOperator'],
'hive2samba_operator': ['Hive2SambaOperator'],
'hive_to_samba_operator': ['Hive2SambaOperator'],
'mysql_operator': ['MySqlOperator'],
'mysql_to_hive': ['MySqlToHiveTransfer'],
'postgres_operator': ['PostgresOperator'],
'sensors': [
'SqlSensor',

View file

@@ -0,0 +1,111 @@
import csv
import logging
from tempfile import NamedTemporaryFile
import MySQLdb
from airflow.hooks import HiveCliHook, MySqlHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
class MySqlToHiveTransfer(BaseOperator):
"""
Moves data from MySql to Hive. The operator runs your query against
MySQL, stores the results in a local file, then loads it into a Hive table.
If the ``create`` or ``recreate`` arguments are set to ``True``,
``CREATE TABLE`` and ``DROP TABLE`` statements are generated.
Hive data types are inferred from the cursor's metadata.
Note that the table generated in Hive uses ``STORED AS textfile``
which isn't the most efficient serialization format. If a
large amount of data is loaded and/or if the table gets
queried considerably, you may want to use this operator only to
stage the data into a temporary table before loading it into its
final destination using a ``HiveOperator``.
:param hive_table: target Hive table, use dot notation to target a
specific database
:type hive_table: str
:param create: whether to create the table if it doesn't exist
:type create: bool
:param recreate: whether to drop and recreate the table at every
execution
:type recreate: bool
:param partition: target partition as a dict of partition columns
and values
:type partition: dict
:param delimiter: field delimiter in the file
:type delimiter: str
:param mysql_conn_id: source mysql connection
:type mysql_conn_id: str
:param hive_cli_conn_id: destination hive connection
:type hive_cli_conn_id: str
"""
__mapper_args__ = {
'polymorphic_identity': 'MySqlToHiveOperator'
}
template_fields = ('sql',)
template_ext = ('.sql',)
ui_color = '#a0e08c'
@apply_defaults
def __init__(
self,
sql,
hive_table,
create=True,
recreate=False,
partition=None,
delimiter=chr(1),
mysql_conn_id='mysql_default',
hive_cli_conn_id='hive_cli_default',
*args, **kwargs):
super(MySqlToHiveTransfer, self).__init__(*args, **kwargs)
self.sql = sql
self.hive_table = hive_table
self.partition = partition
self.create = create
self.recreate = recreate
self.delimiter = delimiter
self.hive = HiveCliHook(hive_cli_conn_id=hive_cli_conn_id)
self.mysql = MySqlHook(mysql_conn_id=mysql_conn_id)
@classmethod
def type_map(cls, mysql_type):
t = MySQLdb.constants.FIELD_TYPE
d = {
t.BIT: 'INT',
t.DECIMAL: 'DOUBLE',
t.DOUBLE: 'DOUBLE',
t.FLOAT: 'DOUBLE',
t.INT24: 'INT',
t.LONG: 'INT',
t.LONGLONG: 'BIGINT',
t.SHORT: 'INT',
t.YEAR: 'INT',
}
return d[mysql_type] if mysql_type in d else 'STRING'
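For instance, using the ``MySQLdb`` field type constants, the mapping above behaves roughly like this (a sketch; the specific constants are just examples):

from MySQLdb.constants import FIELD_TYPE
from airflow.operators import MySqlToHiveTransfer

MySqlToHiveTransfer.type_map(FIELD_TYPE.LONGLONG)    # 'BIGINT'
MySqlToHiveTransfer.type_map(FIELD_TYPE.DECIMAL)     # 'DOUBLE'
MySqlToHiveTransfer.type_map(FIELD_TYPE.VAR_STRING)  # 'STRING' (fallback)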
def execute(self, context):
logging.info("Dumping MySQL query results to local file")
conn = self.mysql.get_conn()
cursor = conn.cursor()
cursor.execute(self.sql)
with NamedTemporaryFile("w") as f:
csv_writer = csv.writer(f, delimiter=self.delimiter)
field_dict = {
i[0]: self.type_map(i[1]) for i in cursor.description}
csv_writer.writerows(cursor)
f.flush()
cursor.close()
conn.close()
logging.info("Loading file into Hive")
self.hive.load_file(
f.name,
self.hive_table,
field_dict=field_dict,
create=self.create,
partition=self.partition,
delimiter=self.delimiter,
recreate=self.recreate)
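A minimal sketch of wiring the new transfer into a DAG follows. The DAG id, connection ids, query, and table names are illustrative assumptions; only the operator arguments come from the code above:

from datetime import datetime

from airflow.models import DAG
from airflow.operators import MySqlToHiveTransfer

# Assumed connections: 'mysql_default' for the source database and
# 'hive_cli_default' for the Hive CLI.
dag = DAG(
    'mysql_to_hive_example',
    default_args={'owner': 'airflow', 'start_date': datetime(2015, 4, 1)})

stage_users = MySqlToHiveTransfer(
    task_id='stage_users',
    sql="SELECT id, name, created_at FROM users",
    hive_table='staging.users',        # dot notation targets a database
    partition={'ds': '2015-04-05'},    # loaded into a single partition
    create=True,
    recreate=False,
    delimiter=chr(1),                  # ctrl-A, the default separator
    mysql_conn_id='mysql_default',
    hive_cli_conn_id='hive_cli_default',
    dag=dag)

The tests added at the bottom of this commit exercise the same two code paths, with and without a target partition.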

View file

@@ -10,8 +10,9 @@ HEADER = """\
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
__ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/"""
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
"""
BASE_LOG_URL = '/admin/airflow/log'
AIRFLOW_HOME = os.path.expanduser(conf.get('core', 'AIRFLOW_HOME'))

View file

@@ -8,6 +8,17 @@ the DAG when instantiated. All operators derive from BaseOperator and
inherit a whole lot of attributes and methods that way. Refer to the
BaseOperator documentation for more details.
There are 3 main types of operators:
- Operators that perform an **action**, or tell another system to
perform an action
- **Transfer** operators move data from one system to another
- **Sensors** are a certain type of operator that will keep running until a
certain criterion is met, such as a specific file landing in HDFS or
S3, a partition appearing in Hive, or a specific time of the day. Sensors
are derived from ``BaseSensorOperator`` and run a poke
method at a specified ``poke_interval`` until it returns ``True`` (see the
sketch after this list).
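As a rough illustration of that sensor contract, a minimal sketch of a custom sensor follows. The class, the marker path, and the exact ``poke`` signature are assumptions for illustration, not part of this commit:

import os

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils import apply_defaults


class FileMarkerSensor(BaseSensorOperator):
    """Hypothetical sensor that waits for a local marker file to appear."""

    @apply_defaults
    def __init__(self, marker_path, *args, **kwargs):
        super(FileMarkerSensor, self).__init__(*args, **kwargs)
        self.marker_path = marker_path

    def poke(self):
        # Called every ``poke_interval`` seconds; returning False keeps
        # the sensor running, True lets the task succeed.
        # (The no-argument signature is an assumption for this sketch.)
        return os.path.exists(self.marker_path)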
.. automodule:: airflow.operators
:show-inheritance:
:members:
@@ -20,6 +31,7 @@ BaseOperator documentation for more details.
HiveOperator,
HivePartitionSensor,
MySqlOperator,
MySqlToHiveTransfer,
PostgresOperator,
PrestoCheckOperator,
PrestoIntervalCheckOperator,

View file

@@ -11,6 +11,41 @@ LOCAL_EXECUTOR = executors.LocalExecutor()
DEFAULT_DATE = datetime(2015, 1, 1)
configuration.test_mode()
class TransferTests(unittest.TestCase):
def setUp(self):
configuration.test_mode()
utils.initdb()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG('hive_test', default_args=args)
self.dag = dag
def test_mysql_to_hive(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive',
recreate=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_mysql_to_hive_partition(self):
sql = "SELECT * FROM task_instance LIMIT 1000;"
t = operators.MySqlToHiveTransfer(
task_id='test_m2h',
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive_part',
partition={'ds': '2015-01-02'},
recreate=False,
create=True,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
class HivePrestoTest(unittest.TestCase):
def setUp(self):