Merge pull request #227 from mistercrunch/mysql_preoperator

Adding a preoperator to HiveToMySqloperator to allow idempotence
This commit is contained in:
Maxime Beauchemin 2015-05-21 08:50:26 -04:00
Родитель 8e3539db29 0cfc4b89b6
Коммит 89df0b34e4
2 изменённых файлов: 11 добавлений и 0 удалений

Просмотреть файл

@ -20,6 +20,11 @@ class HiveToMySqlTransfer(BaseOperator):
:type mysql_conn_id: str
:param hive_conn_id: desctination hive connection
:type hive_conn_id: str
:param mysql_preoperator: sql statement to run against mysql prior to
import, typically use to truncate of delete in place of the data
coming in, allowing the task to be idempotent (running the task
twice won't double load data)
:type mysql_preoperator: str
"""
__mapper_args__ = {
@ -36,12 +41,14 @@ class HiveToMySqlTransfer(BaseOperator):
mysql_table,
hive_cli_conn_id='hiveserver2_default',
mysql_conn_id='mysql_default',
mysql_preoperator=None,
*args, **kwargs):
super(HiveToMySqlTransfer, self).__init__(*args, **kwargs)
self.sql = sql
self.mysql_table = mysql_table
self.mysql_conn_id = mysql_conn_id
self.hive_cli_conn_id = hive_cli_conn_id
self.mysql_preoperator = mysql_preoperator
def execute(self, context):
hive = HiveServer2Hook(hiveserver2_conn_id=self.hive_cli_conn_id)
@ -49,5 +56,8 @@ class HiveToMySqlTransfer(BaseOperator):
results = hive.get_records(self.sql)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
logging.info("Running MySQL preoperator")
mysql.run(self.mysql_preoperator)
logging.info("Inserting rows into MySQL")
mysql.insert_rows(table=self.mysql_table, rows=results)

Просмотреть файл

@ -129,6 +129,7 @@ class HivePrestoTest(unittest.TestCase):
GROUP BY name
""",
mysql_table='test_static_babynames',
mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)