Adding a preoperator to HiveToMySqloperator to allow idempotence

This commit is contained in:
Maxime 2015-05-21 12:29:07 +00:00
Родитель 8e3539db29
Коммит 0cfc4b89b6
2 изменённых файлов: 11 добавлений и 0 удалений

Просмотреть файл

@ -20,6 +20,11 @@ class HiveToMySqlTransfer(BaseOperator):
:type mysql_conn_id: str
:param hive_conn_id: desctination hive connection
:type hive_conn_id: str
:param mysql_preoperator: sql statement to run against mysql prior to
import, typically use to truncate of delete in place of the data
coming in, allowing the task to be idempotent (running the task
twice won't double load data)
:type mysql_preoperator: str
"""
__mapper_args__ = {
@ -36,12 +41,14 @@ class HiveToMySqlTransfer(BaseOperator):
mysql_table,
hive_cli_conn_id='hiveserver2_default',
mysql_conn_id='mysql_default',
mysql_preoperator=None,
*args, **kwargs):
super(HiveToMySqlTransfer, self).__init__(*args, **kwargs)
self.sql = sql
self.mysql_table = mysql_table
self.mysql_conn_id = mysql_conn_id
self.hive_cli_conn_id = hive_cli_conn_id
self.mysql_preoperator = mysql_preoperator
def execute(self, context):
hive = HiveServer2Hook(hiveserver2_conn_id=self.hive_cli_conn_id)
@ -49,5 +56,8 @@ class HiveToMySqlTransfer(BaseOperator):
results = hive.get_records(self.sql)
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
if self.mysql_preoperator:
logging.info("Running MySQL preoperator")
mysql.run(self.mysql_preoperator)
logging.info("Inserting rows into MySQL")
mysql.insert_rows(table=self.mysql_table, rows=results)

Просмотреть файл

@ -129,6 +129,7 @@ class HivePrestoTest(unittest.TestCase):
GROUP BY name
""",
mysql_table='test_static_babynames',
mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)