[AIRFLOW-2019] Update DataflowHook for updating Streaming type job
Closes #2965 from ivanwirawan/AIRFLOW-2019
This commit is contained in:
Родитель
97ca9791c3
Коммит
2794819687
|
@ -159,8 +159,12 @@ class DataFlowHook(GoogleCloudBaseHook):
|
|||
_DataflowJob(self.get_conn(), variables['project'],
|
||||
name, self.poll_sleep).wait_for_done()
|
||||
|
||||
def start_java_dataflow(self, task_id, variables, dataflow, job_class=None):
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
def start_java_dataflow(self, task_id, variables, dataflow, job_class=None,
|
||||
append_job_name=True):
|
||||
if append_job_name:
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
else:
|
||||
name = task_id
|
||||
variables['jobName'] = name
|
||||
|
||||
def label_formatter(labels_dict):
|
||||
|
@ -171,13 +175,21 @@ class DataFlowHook(GoogleCloudBaseHook):
|
|||
self._start_dataflow(task_id, variables, name,
|
||||
command_prefix, label_formatter)
|
||||
|
||||
def start_template_dataflow(self, task_id, variables, parameters, dataflow_template):
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
def start_template_dataflow(self, task_id, variables, parameters, dataflow_template,
|
||||
append_job_name=True):
|
||||
if append_job_name:
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
else:
|
||||
name = task_id
|
||||
self._start_template_dataflow(
|
||||
name, variables, parameters, dataflow_template)
|
||||
|
||||
def start_python_dataflow(self, task_id, variables, dataflow, py_options):
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
def start_python_dataflow(self, task_id, variables, dataflow, py_options,
|
||||
append_job_name=True):
|
||||
if append_job_name:
|
||||
name = task_id + "-" + str(uuid.uuid1())[:8]
|
||||
else:
|
||||
name = task_id
|
||||
variables["job_name"] = name
|
||||
|
||||
def label_formatter(labels_dict):
|
||||
|
@ -193,6 +205,8 @@ class DataFlowHook(GoogleCloudBaseHook):
|
|||
for attr, value in variables.items():
|
||||
if attr == 'labels':
|
||||
command += label_formatter(value)
|
||||
elif value is None or value.__len__() < 1:
|
||||
command.append("--" + attr)
|
||||
else:
|
||||
command.append("--" + attr + "=" + value)
|
||||
return command
|
||||
|
|
Загрузка…
Ссылка в новой задаче