[AIRFLOW-2753] Add dataproc_job_id instance var holding actual DP jobId

Closes #3622 from jeffkpayne/master
Jeffrey Scott Keone Payne 2018-07-22 21:54:21 +01:00 committed by Kaxil Naik
Parent c1217c349f
Commit 03ac60dd67
2 changed files with 125 additions and 15 deletions
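For context, a minimal sketch (not part of this change) of how the new dataproc_job_id attribute could be read after an operator has run, for example to build a link to the job in the Cloud Console. The operator arguments and the console URL format below are illustrative assumptions, not taken from this commit.

from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

# Hypothetical task; the query, cluster and region values are placeholders.
pig_task = DataProcPigOperator(
    task_id='run_pig',
    query='sh echo hello',
    cluster_name='example-cluster',
    region='us-central1',
)

# After execute() has submitted the job, the operator keeps the exact "jobId"
# that was sent to the Dataproc API (the job name plus a random suffix).
def print_console_link(project_id, region='us-central1'):
    # Assumed Cloud Console URL format, shown only for illustration.
    print('https://console.cloud.google.com/dataproc/jobs/{}?region={}&project={}'
          .format(pig_task.dataproc_job_id, region, project_id))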

View file

@@ -669,6 +669,11 @@ class DataProcPigOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.pg', '.pig',)
@@ -716,7 +721,10 @@ class DataProcPigOperator(BaseOperator):
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataProcHiveOperator(BaseOperator):
@@ -749,6 +757,11 @@ class DataProcHiveOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
@@ -797,7 +810,10 @@ class DataProcHiveOperator(BaseOperator):
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataProcSparkSqlOperator(BaseOperator):
@@ -831,6 +847,11 @@ class DataProcSparkSqlOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
@@ -879,7 +900,10 @@ class DataProcSparkSqlOperator(BaseOperator):
job.add_jar_file_uris(self.dataproc_jars)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataProcSparkOperator(BaseOperator):
@@ -920,6 +944,11 @@ class DataProcSparkOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -970,7 +999,10 @@ class DataProcSparkOperator(BaseOperator):
job.add_file_uris(self.files)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataProcHadoopOperator(BaseOperator):
@@ -1011,6 +1043,11 @@ class DataProcHadoopOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1061,10 +1098,14 @@ class DataProcHadoopOperator(BaseOperator):
job.add_file_uris(self.files)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataProcPySparkOperator(BaseOperator):
# TODO Add docs around dataproc_job_id.
"""
Start a PySpark Job on a Cloud DataProc cluster.
@@ -1102,6 +1143,11 @@ class DataProcPySparkOperator(BaseOperator):
:type delegate_to: string
:param region: The specified region where the dataproc cluster is created.
:type region: string
:var dataproc_job_id: The actual "jobId" as submitted to the Dataproc API.
This is useful for identifying or linking to the job in the Google Cloud Console
Dataproc UI, as the actual "jobId" submitted to the Dataproc API is appended with
an 8-character random string.
:vartype dataproc_job_id: string
"""
template_fields = ['arguments', 'job_name', 'cluster_name', 'dataproc_jars']
@@ -1192,7 +1238,10 @@ class DataProcPySparkOperator(BaseOperator):
job.add_python_file_uris(self.pyfiles)
job.set_job_name(self.job_name)
hook.submit(hook.project_id, job.build(), self.region)
job_to_submit = job.build()
self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
hook.submit(hook.project_id, job_to_submit, self.region)
class DataprocWorkflowTemplateBaseOperator(BaseOperator):

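Each of the six operators above applies the same pattern in execute(). A condensed, standalone sketch of that pattern follows; the _build_and_submit helper is hypothetical (the commit inlines these lines in each operator), and the dict shape matches the test fixture added below.

def _build_and_submit(operator, hook, job_template, region):
    # Hypothetical helper illustrating the pattern each operator now follows.
    # job_template.build() returns a dict shaped like:
    # {'job': {'reference': {'projectId': ..., 'jobId': ...}, 'placement': {...}}}
    job_to_submit = job_template.build()
    # Record the exact "jobId" sent to the Dataproc API so it can be inspected
    # after execution, e.g. to locate the job in the Cloud Console UI.
    operator.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]
    hook.submit(hook.project_id, job_to_submit, region)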
View file

@@ -45,7 +45,7 @@ except ImportError:
except ImportError:
mock = None
from mock import Mock
from mock import MagicMock, Mock
from mock import patch
TASK_ID = 'test-dataproc-operator'
@@ -80,6 +80,27 @@ MAIN_URI = 'test-uri'
TEMPLATE_ID = 'template-id'
HOOK = 'airflow.contrib.operators.dataproc_operator.DataProcHook'
DATAPROC_JOB_ID = 'dataproc_job_id'
DATAPROC_JOB_TO_SUBMIT = {
'job': {
'reference': {
'projectId': PROJECT_ID,
'jobId': DATAPROC_JOB_ID,
},
'placement': {
'clusterName': CLUSTER_NAME
}
}
}
def _assert_dataproc_job_id(mock_hook, dataproc_task):
hook = mock_hook.return_value
job = MagicMock()
job.build.return_value = DATAPROC_JOB_TO_SUBMIT
hook.create_job_template.return_value = job
dataproc_task.execute(None)
assert dataproc_task.dataproc_job_id == DATAPROC_JOB_ID
class DataprocClusterCreateOperatorTest(unittest.TestCase):
@@ -434,31 +455,51 @@ class DataprocClusterDeleteOperatorTest(unittest.TestCase):
class DataProcHadoopOperatorTest(unittest.TestCase):
# Unit test for the DataProcHadoopOperator
def test_hook_correct_region(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
with patch(HOOK) as mock_hook:
dataproc_task = DataProcHadoopOperator(
task_id=TASK_ID,
region=REGION
)
dataproc_task.execute(None)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
REGION)
def test_dataproc_job_id_is_set(self):
with patch(HOOK) as mock_hook:
dataproc_task = DataProcHadoopOperator(
task_id=TASK_ID
)
_assert_dataproc_job_id(mock_hook, dataproc_task)
class DataProcHiveOperatorTest(unittest.TestCase):
# Unit test for the DataProcHiveOperator
def test_hook_correct_region(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
with patch(HOOK) as mock_hook:
dataproc_task = DataProcHiveOperator(
task_id=TASK_ID,
region=REGION
)
dataproc_task.execute(None)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
REGION)
def test_dataproc_job_id_is_set(self):
with patch(HOOK) as mock_hook:
dataproc_task = DataProcHiveOperator(
task_id=TASK_ID
)
_assert_dataproc_job_id(mock_hook, dataproc_task)
class DataProcPySparkOperatorTest(unittest.TestCase):
# Unit test for the DataProcPySparkOperator
def test_hook_correct_region(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
with patch(HOOK) as mock_hook:
dataproc_task = DataProcPySparkOperator(
task_id=TASK_ID,
main=MAIN_URI,
@@ -466,19 +507,39 @@ class DataProcPySparkOperatorTest(unittest.TestCase):
)
dataproc_task.execute(None)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
REGION)
def test_dataproc_job_id_is_set(self):
with patch(HOOK) as mock_hook:
dataproc_task = DataProcPySparkOperator(
task_id=TASK_ID,
main=MAIN_URI
)
_assert_dataproc_job_id(mock_hook, dataproc_task)
class DataProcSparkOperatorTest(unittest.TestCase):
# Unit test for the DataProcSparkOperator
def test_hook_correct_region(self):
with patch('airflow.contrib.operators.dataproc_operator.DataProcHook') as mock_hook:
with patch(HOOK) as mock_hook:
dataproc_task = DataProcSparkOperator(
task_id=TASK_ID,
region=REGION
)
dataproc_task.execute(None)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY, REGION)
mock_hook.return_value.submit.assert_called_once_with(mock.ANY, mock.ANY,
REGION)
def test_dataproc_job_id_is_set(self):
with patch(HOOK) as mock_hook:
dataproc_task = DataProcSparkOperator(
task_id=TASK_ID
)
_assert_dataproc_job_id(mock_hook, dataproc_task)
class DataprocWorkflowTemplateInstantiateOperatorTest(unittest.TestCase):