[AIRFLOW-1192] Some enhancements to qubole_operator

1. Upgrade qds_sdk version to latest
2. Add support to run Zeppelin Notebooks
3. Move out initialization of QuboleHook from __init__()

Closes #2322 from msumit/AIRFLOW-1192
Sumit Maheshwari 2017-06-07 09:09:50 +02:00, committed by Bolke de Bruin
Parent 6b890d157c
Commit 6be02475f8
5 changed files with 115 additions and 16 deletions
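Taken together, the changes let a DAG run a Zeppelin notebook through the Spark command type by passing a note_id. A minimal usage sketch, assuming Airflow with the Qubole contrib operator installed; the notebook id, cluster label and DAG name below are illustrative placeholders, not values from this commit:

from datetime import datetime

from airflow.models import DAG
from airflow.contrib.operators.qubole_operator import QuboleOperator

dag = DAG('qubole_notebook_example', start_date=datetime(2017, 6, 1),
          schedule_interval=None)

with dag:
    run_notebook = QuboleOperator(
        task_id='run_zeppelin_notebook',
        command_type='sparkcmd',          # note_id is accepted for sparkcmd
        note_id='123',                    # passed to QDS as --note-id=123
        cluster_label='spark-cluster',    # placeholder cluster label
        qubole_conn_id='qubole_default',  # default Qubole connection
    )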

View file

@@ -42,7 +42,7 @@ COMMAND_CLASSES = {
     "dbimportcmd": DbImportCommand
 }
 
-HYPHEN_ARGS = ['cluster_label', 'app_id']
+HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
 
 POSITIONAL_ARGS = ['sub_command', 'parameters']
@@ -57,7 +57,7 @@ COMMAND_ARGS = {
                'name'],
     'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
     'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
-                 'cluster_label', 'language', 'app_id', 'name', 'arguments',
+                 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
                  'user_program_arguments'],
     'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
                     'db_update_mode', 'db_update_keys', 'export_dir',
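Because note_id is now listed in HYPHEN_ARGS, the hook renders it as a hyphenated command-line flag. A simplified sketch of that naming convention (the real logic lives in QuboleHook.create_cmd_args; this only illustrates the transformation the new test asserts):

def render_hyphen_arg(key, value):
    # hyphen-style args swap underscores for hyphens: 'note_id' -> '--note-id=123'
    return "--{0}={1}".format(key.replace('_', '-'), value)

print(render_hyphen_arg('note_id', '123'))  # prints: --note-id=123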

View file

@@ -102,8 +102,9 @@ class QuboleOperator(BaseOperator):
         ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
         ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,
         ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,
-        ``split_column``, ``db_update_keys``, ``export_dir``, ``partition_spec``. You
-        can also use ``.txt`` files for template driven use cases.
+        ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,
+        ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.
+        You can also use ``.txt`` files for template driven use cases.
 
     .. note:: In QuboleOperator there is a default handler for task failures and retries,
         which generally kills the command running at QDS for the corresponding task
@@ -114,8 +115,10 @@ class QuboleOperator(BaseOperator):
     template_fields = ('query', 'script_location', 'sub_command', 'script', 'files',
                        'archives', 'program', 'cmdline', 'sql', 'where_clause', 'tags',
                        'extract_query', 'boundary_query', 'macros', 'name', 'parameters',
-                       'dbtap_id', 'hive_table', 'db_table', 'split_column',
-                       'db_update_keys', 'export_dir', 'partition_spec')
+                       'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id',
+                       'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id',
+                       'arguments', 'user_program_arguments')
     template_ext = ('.txt',)
 
     ui_color = '#3064A1'
     ui_fgcolor = '#fff'
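With note_id, qubole_conn_id, arguments and user_program_arguments added to template_fields (and template_ext still covering .txt), these values can now be Jinja-templated or loaded from a file. A rough sketch, assuming a dag as in the earlier example; the .txt file name and dag_run.conf keys are illustrative assumptions:

run_templated = QuboleOperator(
    task_id='templated_spark_sql',
    command_type='sparkcmd',
    sql='spark_query.txt',                                   # loaded and rendered via template_ext ('.txt',)
    note_id="{{ dag_run.conf['note_id'] }}",                 # note_id is now templatable
    qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}",   # so is qubole_conn_id
    dag=dag,
)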
@@ -125,7 +128,6 @@ class QuboleOperator(BaseOperator):
         self.args = args
         self.kwargs = kwargs
         self.kwargs['qubole_conn_id'] = qubole_conn_id
-        self.hook = QuboleHook(*self.args, **self.kwargs)
         super(QuboleOperator, self).__init__(*args, **kwargs)
 
         if self.on_failure_callback is None:
@@ -135,21 +137,23 @@ class QuboleOperator(BaseOperator):
             self.on_retry_callback = QuboleHook.handle_failure_retry
 
     def execute(self, context):
-        # Reinitiating the hook, as some template fields might have changed
-        self.hook = QuboleHook(*self.args, **self.kwargs)
-        return self.hook.execute(context)
+        return self.get_hook().execute(context)
 
-    def on_kill(self, ti):
-        self.hook.kill(ti)
+    def on_kill(self, ti=None):
+        self.get_hook().kill(ti)
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
-        return self.hook.get_results(ti, fp, inline, delim, fetch)
+        return self.get_hook().get_results(ti, fp, inline, delim, fetch)
 
     def get_log(self, ti):
-        return self.hook.get_log(ti)
+        return self.get_hook().get_log(ti)
 
     def get_jobs_id(self, ti):
-        return self.hook.get_jobs_id(ti)
+        return self.get_hook().get_jobs_id(ti)
 
+    def get_hook(self):
+        # Reinitiating the hook, as some template fields might have changed
+        return QuboleHook(*self.args, **self.kwargs)
+
     def __getattribute__(self, name):
         if name in QuboleOperator.template_fields:
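The QuboleHook is no longer created in __init__; execute() and the helper methods build it on demand via get_hook(). Since Airflow renders the operator's template fields before execute() runs, the freshly built hook sees the rendered values, for example a qubole_conn_id pulled from dag_run.conf as the new test exercises. A hedged sketch of that flow, with an illustrative task id and query, assuming a dag as in the earlier sketch:

op = QuboleOperator(
    task_id='lazy_hook_demo',
    command_type='hivecmd',
    query='show tables',
    qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}",
    dag=dag,
)

# No QuboleHook is instantiated at DAG-parse time any more; execute() (and
# helpers such as get_results or on_kill) call get_hook(), which builds a
# fresh hook from the operator's current, possibly rendered, kwargs.
hook = op.get_hook()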

View file

@@ -73,6 +73,7 @@ PyOpenSSL
 PySmbClient
 python-daemon
 python-dateutil
+qds-sdk>=1.9.6
 redis
 rednose
 requests

View file

@@ -179,7 +179,7 @@ password = [
     'flask-bcrypt>=0.7.1',
 ]
 github_enterprise = ['Flask-OAuthlib>=0.9.1']
-qds = ['qds-sdk>=1.9.0']
+qds = ['qds-sdk>=1.9.6']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 redis = ['redis>=2.10.5']

View file

@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from datetime import datetime

from airflow.models import DAG, Connection
from airflow.utils import db
from airflow.contrib.hooks.qubole_hook import QuboleHook
from airflow.contrib.operators.qubole_operator import QuboleOperator

try:
    from unittest import mock
except ImportError:
    try:
        import mock
    except ImportError:
        mock = None

DAG_ID="qubole_test_dag"
TASK_ID="test_task"
DEFAULT_CONN="qubole_default"
TEMPLATE_CONN = "my_conn_id"
DEFAULT_DATE = datetime(2017, 1, 1)


class QuboleOperatorTest(unittest.TestCase):
    def setUp(self):
        db.merge_conn(
            Connection(conn_id=DEFAULT_CONN, conn_type='HTTP'))

    def test_init_with_default_connection(self):
        op = QuboleOperator(task_id=TASK_ID)
        self.assertEqual(op.task_id, TASK_ID)
        self.assertEqual(op.qubole_conn_id, DEFAULT_CONN)

    def test_init_with_template_connection(self):
        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
        with dag:
            task = QuboleOperator(task_id=TASK_ID, dag=dag,
                                  qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}")
        result = task.render_template('qubole_conn_id', "{{ qubole_conn_id }}",
                                      {'qubole_conn_id' : TEMPLATE_CONN})
        self.assertEqual(task.task_id, TASK_ID)
        self.assertEqual(result, TEMPLATE_CONN)

    def test_get_hook(self):
        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
        with dag:
            task = QuboleOperator(task_id=TASK_ID, command_type='hivecmd', dag=dag)
        hook = task.get_hook()
        self.assertEqual(hook.__class__, QuboleHook)

    def test_hyphen_args_note_id(self):
        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
        with dag:
            task = QuboleOperator(task_id=TASK_ID, command_type='sparkcmd',
                                  note_id="123", dag=dag)
        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[0],
                         "--note-id=123")

    def test_position_args_parameters(self):
        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
        with dag:
            task = QuboleOperator(task_id=TASK_ID, command_type='pigcmd',
                                  parameters="key1=value1 key2=value2", dag=dag)
        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[1],
                         "key1=value1")
        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[2],
                         "key2=value2")