[AIRFLOW-2016] Add support for Dataproc Workflow Templates

Closes #2958 from DanSedov/master
Dan Sedov 2018-01-24 07:23:48 -08:00 committed by Chris Riccomini
Parent 18d09a9481
Commit 0990ba8c02
3 changed files with 301 additions and 16 deletions
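For context, a minimal usage sketch of the two operators added by this change. The DAG arguments, project ID, template ID, zone, and inline template body below are illustrative, not part of the commit:

import datetime

from airflow import DAG
from airflow.contrib.operators.dataproc_operator import (
    DataprocWorkflowTemplateInstantiateInlineOperator,
    DataprocWorkflowTemplateInstantiateOperator)

dag = DAG('example_dataproc_workflow',
          start_date=datetime.datetime(2018, 1, 1),
          schedule_interval='@daily')

# Instantiate a workflow template that already exists in the project.
# The operator blocks until the workflow's Operation is done.
instantiate_task = DataprocWorkflowTemplateInstantiateOperator(
    task_id='instantiate_template',
    project_id='my-project-id',
    region='global',
    template_id='my-template-id',
    dag=dag)

# Instantiate a workflow template defined inline in the DAG file.
instantiate_inline_task = DataprocWorkflowTemplateInstantiateInlineOperator(
    task_id='instantiate_inline_template',
    project_id='my-project-id',
    region='global',
    template={
        'placement': {
            'managed_cluster': {
                'cluster_name': 'ephemeral-cluster',
                'config': {
                    'gce_cluster_config': {'zone_uri': 'us-central1-a'},
                },
            },
        },
        'jobs': [{
            'step_id': 'say-hello',
            'pig_job': {'query': 'sh echo hello'},
        }],
    },
    dag=dag)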

View file

@@ -141,24 +141,76 @@ class _DataProcJobBuilder:
return self.job
class _DataProcOperation(LoggingMixin):
"""Continuously polls Dataproc Operation until it completes."""
def __init__(self, dataproc_api, operation):
self.dataproc_api = dataproc_api
self.operation = operation
self.operation_name = self.operation['name']
def wait_for_done(self):
if self._check_done():
return True
self.log.info(
'Waiting for Dataproc Operation %s to finish', self.operation_name)
while True:
time.sleep(10)
self.operation = (
self.dataproc_api.projects()
.regions()
.operations()
.get(name=self.operation_name)
.execute(num_retries=5))
if self._check_done():
return True
def get(self):
return self.operation
def _check_done(self):
if 'done' in self.operation:
if 'error' in self.operation:
self.log.warning(
'Dataproc Operation %s failed with error: %s',
self.operation_name, self.operation['error']['message'])
self._raise_error()
else:
self.log.info(
'Dataproc Operation %s done', self.operation['name'])
return True
return False
def _raise_error(self):
raise Exception('Google Dataproc Operation %s failed: %s' %
(self.operation_name, self.operation['error']['message']))
class DataProcHook(GoogleCloudBaseHook):
"""Hook for Google Cloud Dataproc APIs."""
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None):
delegate_to=None,
api_version='v1'):
super(DataProcHook, self).__init__(gcp_conn_id, delegate_to)
self.api_version = api_version
def get_conn(self):
"""
Returns a Google Cloud DataProc service object.
"""
"""Returns a Google Cloud Dataproc service object."""
http_authorized = self._authorize()
return build('dataproc', 'v1', http=http_authorized)
return build('dataproc', self.api_version, http=http_authorized)
def submit(self, project_id, job, region='global'):
submitted = _DataProcJob(self.get_conn(), project_id, job, region)
if not submitted.wait_for_done():
submitted.raise_error("DataProcTask has errors")
submitted.raise_error('DataProcTask has errors')
def create_job_template(self, task_id, cluster_name, job_type, properties):
return _DataProcJobBuilder(self.project_id, task_id, cluster_name, job_type,
properties)
return _DataProcJobBuilder(self.project_id, task_id, cluster_name,
job_type, properties)
def await(self, operation):
"""Awaits for Google Cloud Dataproc Operation to complete."""
submitted = _DataProcOperation(self.get_conn(), operation)
submitted.wait_for_done()
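
For reference, the Operation dicts polled by _DataProcOperation (and passed to DataProcHook.await) follow the google.longrunning Operation shape; a sketch of the three states the code above distinguishes, with illustrative values:

# Still running: no 'done' key yet, so _check_done() returns False and polling continues.
running_op = {'name': 'projects/my-project/regions/global/operations/op-1'}

# Finished successfully: 'done' is present without 'error'.
finished_op = {'name': 'projects/my-project/regions/global/operations/op-1',
               'done': True}

# Failed: 'done' and 'error' are present; _check_done() logs the message and raises.
failed_op = {'name': 'projects/my-project/regions/global/operations/op-1',
             'done': True,
             'error': {'code': 3, 'message': 'Workflow failed'}}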

View file

@@ -14,8 +14,11 @@
#
import time
import uuid
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
@@ -92,7 +95,7 @@ class DataprocClusterCreateOperator(BaseOperator):
:type service_account_scopes: list[string]
"""
template_fields = ['cluster_name', ]
template_fields = ['cluster_name']
@apply_defaults
def __init__(self,
@@ -928,3 +931,113 @@ class DataProcPySparkOperator(BaseOperator):
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
class DataprocWorkflowTemplateBaseOperator(BaseOperator):
"""Base class for operators that instantiate Dataproc workflow templates."""
template_fields = ['template_id', 'template']
@apply_defaults
def __init__(self,
project_id,
region='global',
gcp_conn_id='google_cloud_default',
delegate_to=None,
*args,
**kwargs):
super(DataprocWorkflowTemplateBaseOperator, self).__init__(*args, **kwargs)
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.project_id = project_id
self.region = region
self.hook = DataProcHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version='v1beta2'
)
def execute(self, context):
self.hook.await(self.start())
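# Subclasses override start() to issue the API request and return the resulting
# Operation; execute() then blocks on that Operation via DataProcHook.await.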
def start(self):
raise AirflowException('Please start a workflow operation')
class DataprocWorkflowTemplateInstantiateOperator(DataprocWorkflowTemplateBaseOperator):
"""
Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
until the WorkflowTemplate is finished executing.
Please refer to:
https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate
:param template_id: The ID of the template.
:type template_id: string
:param project_id: The ID of the Google Cloud project in which
the template runs.
:type project_id: string
:param region: Leave as 'global'; might become relevant in the future.
:type region: string
:param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
"""
@apply_defaults
def __init__(self, template_id, *args, **kwargs):
(super(DataprocWorkflowTemplateInstantiateOperator, self)
.__init__(*args, **kwargs))
self.template_id = template_id
def start(self):
self.log.info('Instantiating Template: %s', self.template_id)
return (
self.hook.get_conn().projects().regions().workflowTemplates()
.instantiate(
name=('projects/%s/regions/%s/workflowTemplates/%s' %
(self.project_id, self.region, self.template_id)),
body={'instanceId': str(uuid.uuid1())})
.execute())
class DataprocWorkflowTemplateInstantiateInlineOperator(
DataprocWorkflowTemplateBaseOperator):
"""
Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc. The operator will
wait until the WorkflowTemplate is finished executing.
Please refer to:
https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline
:param template: The template contents.
:type template: dict
:param project_id: The ID of the Google Cloud project in which
the template runs.
:type project_id: string
:param region: Leave as 'global'; might become relevant in the future.
:type region: string
:param gcp_conn_id: The connection ID to use when connecting to Google Cloud Platform.
:type gcp_conn_id: string
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: string
"""
@apply_defaults
def __init__(self, template, *args, **kwargs):
(super(DataprocWorkflowTemplateInstantiateInlineOperator, self)
.__init__(*args, **kwargs))
self.template = template
def start(self):
self.log.info('Instantiating Inline Template')
return (
self.hook.get_conn().projects().regions().workflowTemplates()
.instantiateInline(
parent='projects/%s/regions/%s' % (self.project_id, self.region),
instanceId=str(uuid.uuid1()),
body=self.template)
.execute())

View file

@@ -18,12 +18,15 @@ import re
import unittest
from airflow import DAG
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator
from airflow.contrib.operators.dataproc_operator import DataprocClusterDeleteOperator
from airflow.contrib.operators.dataproc_operator import DataProcHadoopOperator
from airflow.contrib.operators.dataproc_operator import DataProcHiveOperator
from airflow.contrib.operators.dataproc_operator import DataProcPySparkOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
from airflow.contrib.operators.dataproc_operator import \
DataprocClusterCreateOperator,\
DataprocClusterDeleteOperator,\
DataProcHadoopOperator,\
DataProcHiveOperator,\
DataProcPySparkOperator,\
DataProcSparkOperator,\
DataprocWorkflowTemplateInstantiateInlineOperator,\
DataprocWorkflowTemplateInstantiateOperator
from airflow.version import version
from copy import deepcopy
@@ -55,7 +58,7 @@ WORKER_MACHINE_TYPE = 'n1-standard-2'
WORKER_DISK_SIZE = 100
NUM_PREEMPTIBLE_WORKERS = 2
LABEL1 = {}
LABEL2 = {'application':'test', 'year': 2017}
LABEL2 = {'application': 'test', 'year': 2017}
SERVICE_ACCOUNT_SCOPES = [
'https://www.googleapis.com/auth/bigquery',
'https://www.googleapis.com/auth/bigtable.data'
@@ -63,6 +66,10 @@ SERVICE_ACCOUNT_SCOPES = [
DEFAULT_DATE = datetime.datetime(2017, 6, 6)
REGION = 'test-region'
MAIN_URI = 'test-uri'
TEMPLATE_ID = 'template-id'
HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
class DataprocClusterCreateOperatorTest(unittest.TestCase):
# Unit test for the DataprocClusterCreateOperator
@@ -290,3 +297,116 @@ class DataProcSparkOperatorTest(unittest.TestCase):
dataproc_task.execute(None)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):
def setUp(self):
# Setup service.projects().regions().workflowTemplates().instantiate().execute()
self.operation = {'name': 'operation', 'done': True}
self.mock_execute = Mock()
self.mock_execute.execute.return_value = self.operation
self.mock_workflows = Mock()
self.mock_workflows.instantiate.return_value = self.mock_execute
self.mock_regions = Mock()
self.mock_regions.workflowTemplates.return_value = self.mock_workflows
self.mock_projects = Mock()
self.mock_projects.regions.return_value = self.mock_regions
self.mock_conn = Mock()
self.mock_conn.projects.return_value = self.mock_projects
self.dag = DAG(
'test_dag',
default_args={
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'end_date': DEFAULT_DATE,
},
schedule_interval='@daily')
def test_workflow(self):
with patch(HOOK) as MockHook:
hook = MockHook()
hook.get_conn.return_value = self.mock_conn
hook.await.return_value = None
dataproc_task = DataprocWorkflowTemplateInstantiateOperator(
task_id=TASK_ID,
project_id=PROJECT_ID,
region=REGION,
template_id=TEMPLATE_ID,
dag=self.dag
)
dataproc_task.execute(None)
template_name = (
'projects/test-project-id/regions/test-region/'
'workflowTemplates/template-id')
self.mock_workflows.instantiate.assert_called_once_with(
name=template_name,
body=mock.ANY)
hook.await.assert_called_once_with(self.operation)
class DataprocWorkflowTemplateInstantiateInlineOperatorTest(unittest.TestCase):
def setUp(self):
# Setup service.projects().regions().workflowTemplates().instantiateInline()
# .execute()
self.operation = {'name': 'operation', 'done': True}
self.mock_execute = Mock()
self.mock_execute.execute.return_value = self.operation
self.mock_workflows = Mock()
self.mock_workflows.instantiateInline.return_value = self.mock_execute
self.mock_regions = Mock()
self.mock_regions.workflowTemplates.return_value = self.mock_workflows
self.mock_projects = Mock()
self.mock_projects.regions.return_value = self.mock_regions
self.mock_conn = Mock()
self.mock_conn.projects.return_value = self.mock_projects
self.dag = DAG(
'test_dag',
default_args={
'owner': 'airflow',
'start_date': DEFAULT_DATE,
'end_date': DEFAULT_DATE,
},
schedule_interval='@daily')
def test_inline_workflow(self):
with patch(HOOK) as MockHook:
hook = MockHook()
hook.get_conn.return_value = self.mock_conn
hook.await.return_value = None
template = {
"placement": {
"managed_cluster": {
"cluster_name": CLUSTER_NAME,
"config": {
"gce_cluster_config": {
"zone_uri": ZONE,
}
}
}
},
"jobs": [
{
"step_id": "say-hello",
"pig_job": {
"query": "sh echo hello"
}
}],
}
dataproc_task = DataprocWorkflowTemplateInstantiateInlineOperator(
task_id=TASK_ID,
project_id=PROJECT_ID,
region=REGION,
template=template,
dag=self.dag
)
dataproc_task.execute(None)
self.mock_workflows.instantiateInline.assert_called_once_with(
parent='projects/test-project-id/regions/test-region',
instanceId=mock.ANY,
body=template)
hook.await.assert_called_once_with(self.operation)