[AIRFLOW-230] [HiveServer2Hook] adding multi statements support
Switching the library from pyhive to impyla broke support for running multiple statements, including statements that don't return results: impyla raises an exception if any of the statements does not return a result. We have tasks that run multiple statements, including DDL, and want to run them atomically. Closes #1583 from mistercrunch/hooks_hive_presto [AIRFLOW-230] [HiveServer2Hook] adding multi statements support
This commit is contained in:
Родитель
901e8f2a95
Коммит
a599167c43
|
@ -465,6 +465,7 @@ class HiveServer2Hook(BaseHook):
|
|||
database=db.schema or 'default')
|
||||
|
||||
def get_results(self, hql, schema='default', arraysize=1000):
|
||||
from impala.error import ProgrammingError
|
||||
with self.get_conn() as conn:
|
||||
if isinstance(hql, basestring):
|
||||
hql = [hql]
|
||||
|
@ -475,7 +476,14 @@ class HiveServer2Hook(BaseHook):
|
|||
for statement in hql:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(statement)
|
||||
records = cur.fetchall()
|
||||
records = []
|
||||
try:
|
||||
# impala Lib raises when no results are returned
|
||||
# we're silencing here as some statements in the list
|
||||
# may be `SET` or DDL
|
||||
records = cur.fetchall()
|
||||
except ProgrammingError:
|
||||
logging.debug("get_results returned no records")
|
||||
if records:
|
||||
results = {
|
||||
'data': records,
|
||||
|
|
|
@ -1474,6 +1474,15 @@ if 'HiveOperator' in dir(operators):
|
|||
hook = HiveServer2Hook()
|
||||
hook.get_records(sql)
|
||||
|
||||
def test_multi_statements(self):
    """Send a list of two DDL statements through HiveServer2Hook.get_records.

    The test makes no assertion on the returned value; it passes as long
    as the call completes without raising.
    """
    from airflow.hooks.hive_hooks import HiveServer2Hook

    create_stmt = "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)"
    drop_stmt = "DROP TABLE test_multi_statements"
    # Create first, then drop, so the table does not outlive the test.
    HiveServer2Hook().get_records([create_stmt, drop_stmt])
|
||||
|
||||
def test_get_metastore_databases(self):
|
||||
if six.PY2:
|
||||
from airflow.hooks.hive_hooks import HiveMetastoreHook
|
||||
|
|
Загрузка…
Ссылка в новой задаче