[AIRFLOW-5671] rename DataProcHook to DataprocHook (#6404)

mislo 2019-10-28 17:01:46 +01:00 committed by Kamil Breguła
Parent ffe7ba9819
Commit a9fc1a2814
7 changed files with 47 additions and 33 deletions

View file

@@ -202,7 +202,7 @@ The following table shows changes in import paths.
 |airflow.contrib.hooks.gcp_compute_hook.GceHook |airflow.gcp.hooks.compute.ComputeEngineHook |
 |airflow.contrib.hooks.gcp_container_hook.GKEClusterHook |airflow.gcp.hooks.kubernetes_engine.GKEClusterHook |
 |airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook |airflow.gcp.hooks.dataflow.DataFlowHook |
-|airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook |airflow.gcp.hooks.dataproc.DataProcHook |
+|airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook |airflow.gcp.hooks.dataproc.DataprocHook |
 |airflow.contrib.hooks.gcp_dlp_hook.CloudDLPHook |airflow.gcp.hooks.dlp.CloudDLPHook |
 |airflow.contrib.hooks.gcp_function_hook.GcfHook |airflow.gcp.hooks.functions.GcfHook |
 |airflow.contrib.hooks.gcp_kms_hook.GoogleCloudKMSHook |airflow.gcp.hooks.kms.GoogleCloudKMSHook |

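This table row is the user-visible contract of the rename: the deprecated contrib path keeps the old casing, while the new `airflow.gcp` path exposes only `DataprocHook`. A minimal migration sketch for DAG code (the connection id is just a placeholder):

```python
# Before: deprecated contrib path with the old casing (still importable, but warns).
from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook

# After: new module path and renamed class.
from airflow.gcp.hooks.dataproc import DataprocHook

hook = DataprocHook(gcp_conn_id="google_cloud_default")  # placeholder connection id
```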
View file

@@ -21,9 +21,23 @@
 import warnings
 # pylint: disable=unused-import
-from airflow.gcp.hooks.dataproc import DataProcHook, DataprocJobStatus  # noqa
+from airflow.gcp.hooks.dataproc import DataprocHook, DataprocJobStatus  # noqa
 warnings.warn(
     "This module is deprecated. Please use `airflow.gcp.hooks.dataproc`.",
     DeprecationWarning, stacklevel=2
 )
+class DataProcHook(DataprocHook):
+    """
+    This class is deprecated. Please use `airflow.gcp.hooks.dataproc.DataprocHook`.
+    """
+    def __init__(self, *args, **kwargs):
+        warnings.warn(
+            "This class is deprecated. Please use `airflow.gcp.hooks.dataproc.DataprocHook`.",
+            DeprecationWarning, stacklevel=2
+        )
+        super().__init__(*args, **kwargs)

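The shim keeps both spellings importable: the module-level `warnings.warn` fires when the old path is imported, and the `DataProcHook` subclass warns a second time at instantiation before delegating to the renamed class. A quick sketch of how the shim can be exercised (assuming credentials are not needed merely to construct the hook):

```python
import warnings

from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook  # warns on import
from airflow.gcp.hooks.dataproc import DataprocHook

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    hook = DataProcHook()  # warns again at instantiation

assert isinstance(hook, DataprocHook)  # the alias is just a thin subclass
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```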
View file

@@ -427,7 +427,7 @@ class _DataProcOperation(LoggingMixin):
                         (self.operation_name, self.operation['error']['message']))
 
 
-class DataProcHook(GoogleCloudBaseHook):
+class DataprocHook(GoogleCloudBaseHook):
     """
     Hook for Google Cloud Dataproc APIs.
@@ -576,7 +576,7 @@ class DataProcHook(GoogleCloudBaseHook):
         :return:
         """
         while True:
-            state = DataProcHook.get_cluster_state(self.get_conn(), project_id, region, cluster_name)
+            state = DataprocHook.get_cluster_state(self.get_conn(), project_id, region, cluster_name)
             if state is None:
                 logger.info("No state for cluster '%s'", cluster_name)
                 time.sleep(15)
@@ -595,7 +595,7 @@ class DataProcHook(GoogleCloudBaseHook):
         :param cluster_name:
         :return:
         """
-        cluster = DataProcHook.find_cluster(service, project_id, region, cluster_name)
+        cluster = DataprocHook.find_cluster(service, project_id, region, cluster_name)
         if cluster and 'status' in cluster:
             return cluster['status']['state']
         else:
@@ -612,7 +612,7 @@ class DataProcHook(GoogleCloudBaseHook):
         :param cluster_name:
         :return:
         """
-        cluster_list = DataProcHook.get_cluster_list_for_project(service, project_id, region)
+        cluster_list = DataprocHook.get_cluster_list_for_project(service, project_id, region)
         cluster = [c for c in cluster_list if c['clusterName'] == cluster_name]
         if cluster:
             return cluster[0]
@@ -702,7 +702,7 @@ class DataProcHook(GoogleCloudBaseHook):
         :param operation_name:
         :return:
         """
-        response = DataProcHook.wait_for_operation_done(service, operation_name)
+        response = DataprocHook.wait_for_operation_done(service, operation_name)
         if response.get('done'):
             if 'error' in response:
                 raise AirflowException(str(response['error']))
@@ -711,9 +711,9 @@ class DataProcHook(GoogleCloudBaseHook):
 setattr(
-    DataProcHook,
+    DataprocHook,
     "await",
     deprecation.deprecated(
-        DataProcHook.wait, "renamed to 'wait' for Python3.7 compatibility"
+        DataprocHook.wait, "renamed to 'wait' for Python3.7 compatibility"
     ),
 )

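The `setattr` block survives the rename because it works around a language constraint: `await` became a reserved keyword in Python 3.7, so the legacy method name can neither be defined with `def` nor called with plain attribute syntax. A self-contained sketch of the pattern, with a hypothetical `deprecated_alias` standing in for `deprecation.deprecated`:

```python
import warnings


class Example:
    def wait(self, operation):
        """The renamed method that does the real work."""
        return "waited on {}".format(operation)


def deprecated_alias(func, reason):
    """Stand-in for deprecation.deprecated: warn, then delegate."""
    def wrapper(*args, **kwargs):
        warnings.warn(reason, DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)
    return wrapper


# `def await(self): ...` is a SyntaxError on Python 3.7+, so the legacy
# name must be attached to the class reflectively:
setattr(Example, "await", deprecated_alias(Example.wait, "renamed to 'wait'"))

# Callers likewise cannot write `example.await(...)`; they go through getattr:
print(getattr(Example(), "await")("op-123"))
```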
View file

@@ -31,7 +31,7 @@ from datetime import datetime, timedelta
 from typing import Dict, List, Optional, Set
 
 from airflow.exceptions import AirflowException
-from airflow.gcp.hooks.dataproc import DataProcHook
+from airflow.gcp.hooks.dataproc import DataprocHook
 from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
 from airflow.models import BaseOperator
 from airflow.utils import timezone
@@ -56,7 +56,7 @@ class DataprocOperationBaseOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.project_id = project_id
         self.region = region
-        self.hook = DataProcHook(
+        self.hook = DataprocHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to,
             api_version='v1beta2'
@@ -282,7 +282,7 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
         if state == 'DELETING':
             raise Exception('Tried to create a cluster but it\'s in DELETING, something went wrong.')
         if state == 'ERROR':
-            cluster = DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
+            cluster = DataprocHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
             try:
                 error_details = cluster['status']['details']
             except KeyError:
@@ -292,9 +292,9 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
             self.log.info('Dataproc cluster creation resulted in an ERROR state running diagnostics')
             self.log.info(error_details)
             diagnose_operation_name = \
-                DataProcHook.execute_dataproc_diagnose(service, self.project_id,
+                DataprocHook.execute_dataproc_diagnose(service, self.project_id,
                                                        self.region, self.cluster_name)
-            diagnose_result = DataProcHook.wait_for_operation_done(service, diagnose_operation_name)
+            diagnose_result = DataprocHook.wait_for_operation_done(service, diagnose_operation_name)
             if diagnose_result.get('response') and diagnose_result.get('response').get('outputUri'):
                 output_uri = diagnose_result.get('response').get('outputUri')
                 self.log.info('Diagnostic information for ERROR cluster available at [%s]', output_uri)
@@ -474,7 +474,7 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
         return cluster_data
 
     def _usable_existing_cluster_present(self, service):
-        existing_cluster = DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
+        existing_cluster = DataprocHook.find_cluster(service, self.project_id, self.region, self.cluster_name)
         if existing_cluster:
             self.log.info(
                 'Cluster %s already exists... Checking status...',
@@ -488,8 +488,8 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
                 return True
             elif existing_status == 'DELETING':
-                while DataProcHook.find_cluster(service, self.project_id, self.region, self.cluster_name) \
-                        and DataProcHook.get_cluster_state(service, self.project_id,
+                while DataprocHook.find_cluster(service, self.project_id, self.region, self.cluster_name) \
+                        and DataprocHook.get_cluster_state(service, self.project_id,
                                                            self.region, self.cluster_name) == 'DELETING':
                     self.log.info('Existing cluster is deleting, waiting for it to finish')
                     time.sleep(15)
@@ -497,10 +497,10 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
             elif existing_status == 'ERROR':
                 self.log.info('Existing cluster in ERROR state, deleting it first')
-                operation_name = DataProcHook.execute_delete(service, self.project_id,
+                operation_name = DataprocHook.execute_delete(service, self.project_id,
                                                              self.region, self.cluster_name)
                 self.log.info("Cluster delete operation name: %s", operation_name)
-                DataProcHook.wait_for_operation_done_or_error(service, operation_name)
+                DataprocHook.wait_for_operation_done_or_error(service, operation_name)
         return False
@@ -509,7 +509,7 @@ class DataprocClusterCreateOperator(DataprocOperationBaseOperator):
         Create a new cluster on Google Cloud Dataproc.
         """
         self.log.info('Creating cluster: %s', self.cluster_name)
-        hook = DataProcHook(
+        hook = DataprocHook(
             gcp_conn_id=self.gcp_conn_id,
             delegate_to=self.delegate_to
         )
@@ -765,7 +765,7 @@ class DataProcJobBaseOperator(BaseOperator):
         self.region = region
         self.job_error_states = job_error_states if job_error_states is not None else {'ERROR'}
-        self.hook = DataProcHook(gcp_conn_id=gcp_conn_id,
+        self.hook = DataprocHook(gcp_conn_id=gcp_conn_id,
                                  delegate_to=delegate_to)
         self.job_template = None
         self.job = None

View file

@@ -22,7 +22,7 @@ import unittest
 
 from mock import MagicMock
 
-from airflow.gcp.hooks.dataproc import DataProcHook, _DataProcJob
+from airflow.gcp.hooks.dataproc import DataprocHook, _DataProcJob
 from tests.compat import mock
 from tests.gcp.utils.base_gcp_mock import GCP_PROJECT_ID_HOOK_UNIT_TEST
@@ -42,9 +42,9 @@ class TestDataProcHook(unittest.TestCase):
     def setUp(self):
         with mock.patch(BASE_STRING.format('GoogleCloudBaseHook.__init__'),
                         new=mock_init):
-            self.dataproc_hook = DataProcHook()
+            self.dataproc_hook = DataprocHook()
 
-    @mock.patch("airflow.gcp.hooks.dataproc.DataProcHook._authorize")
+    @mock.patch("airflow.gcp.hooks.dataproc.DataprocHook._authorize")
     @mock.patch("airflow.gcp.hooks.dataproc.build")
     def test_dataproc_client_creation(self, mock_build, mock_authorize):
         result = self.dataproc_hook.get_conn()
@@ -55,7 +55,7 @@ class TestDataProcHook(unittest.TestCase):
     @mock.patch(DATAPROC_STRING.format('_DataProcJob'))
     def test_submit(self, job_mock):
-        with mock.patch(DATAPROC_STRING.format('DataProcHook.get_conn',
+        with mock.patch(DATAPROC_STRING.format('DataprocHook.get_conn',
                                                return_value=None)):
             self.dataproc_hook.submit(GCP_PROJECT_ID_HOOK_UNIT_TEST, JOB)
             job_mock.assert_called_once_with(mock.ANY, GCP_PROJECT_ID_HOOK_UNIT_TEST, JOB, GCP_REGION,

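Every change in this test file is a string, not an import: `mock.patch` resolves its dotted target only when the patch starts, so a stale target fails at test run time rather than at collection. A hedged sketch of the failure mode these updates avoid (assuming this Airflow tree is importable):

```python
from unittest import mock

# The target string is resolved at start(), not at construction, so the
# pre-rename name only blows up once the test actually runs:
patcher = mock.patch("airflow.gcp.hooks.dataproc.DataProcHook._authorize")
try:
    patcher.start()
except AttributeError:
    # After this commit the module defines only DataprocHook, so the old
    # dotted path no longer resolves.
    print("stale patch target")
else:
    patcher.stop()
```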
View file

@@ -90,7 +90,7 @@ LABELS = {
     'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')
 }
 
-HOOK = 'airflow.gcp.operators.dataproc.DataProcHook'
+HOOK = 'airflow.gcp.operators.dataproc.DataprocHook'
 DATAPROC_JOB_ID = 'dataproc_job_id'
 DATAPROC_JOB_TO_SUBMIT = {
     'job': {
@@ -798,7 +798,7 @@ class TestDataProcJobBaseOperator(unittest.TestCase):
     def test_dataproc_job_base(self):
         with patch(
-            'airflow.gcp.operators.dataproc.DataProcHook.project_id',
+            'airflow.gcp.operators.dataproc.DataprocHook.project_id',
             new_callable=PropertyMock) as mock_project_id:
             mock_project_id.return_value = GCP_PROJECT_ID
             task = DataProcJobBaseOperator(
@@ -829,7 +829,7 @@ class TestDataProcHadoopOperator(unittest.TestCase):
             schedule_interval='@daily')
 
     @mock.patch(
-        'airflow.gcp.hooks.dataproc.DataProcHook.project_id',
+        'airflow.gcp.hooks.dataproc.DataprocHook.project_id',
         new_callable=PropertyMock,
         return_value=GCP_PROJECT_ID
     )
@@ -914,7 +914,7 @@ class TestDataProcHiveOperator(unittest.TestCase):
             schedule_interval='@daily')
 
     @mock.patch(
-        'airflow.gcp.hooks.dataproc.DataProcHook.project_id',
+        'airflow.gcp.hooks.dataproc.DataprocHook.project_id',
         new_callable=PropertyMock,
         return_value=GCP_PROJECT_ID
     )
@@ -999,7 +999,7 @@ class TestDataProcPigOperator(unittest.TestCase):
             schedule_interval='@daily')
 
     @mock.patch(
-        'airflow.gcp.hooks.dataproc.DataProcHook.project_id',
+        'airflow.gcp.hooks.dataproc.DataprocHook.project_id',
         new_callable=PropertyMock,
         return_value=GCP_PROJECT_ID
     )
@@ -1089,7 +1089,7 @@ class TestDataProcPySparkOperator(unittest.TestCase):
             schedule_interval='@daily')
 
     @mock.patch(
-        'airflow.gcp.hooks.dataproc.DataProcHook.project_id',
+        'airflow.gcp.hooks.dataproc.DataprocHook.project_id',
         new_callable=PropertyMock,
         return_value=GCP_PROJECT_ID
     )
@@ -1177,7 +1177,7 @@ class TestDataProcSparkOperator(unittest.TestCase):
             schedule_interval='@daily')
 
     @mock.patch(
-        'airflow.gcp.hooks.dataproc.DataProcHook.project_id',
+        'airflow.gcp.hooks.dataproc.DataprocHook.project_id',
         new_callable=PropertyMock,
         return_value=GCP_PROJECT_ID
     )

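The operator tests repeatedly pair the renamed patch target with `new_callable=PropertyMock`, because `project_id` is a property on the hook class and a plain attribute replacement would not intercept instance attribute access. A generic, stdlib-only sketch of that pattern (the `Hook` class here is hypothetical):

```python
from unittest import mock
from unittest.mock import PropertyMock


class Hook:
    @property
    def project_id(self):
        raise RuntimeError("would hit real credentials")


# Patching with new_callable=PropertyMock installs the mock on the class,
# so reading the attribute on any instance goes through it:
with mock.patch.object(Hook, "project_id", new_callable=PropertyMock,
                       return_value="example-project") as mock_project_id:
    assert Hook().project_id == "example-project"

mock_project_id.assert_called_once_with()
```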
View file

@@ -37,7 +37,7 @@ HOOK = [
         "airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook",
     ),
     (
-        "airflow.gcp.hooks.dataproc.DataProcHook",
+        "airflow.gcp.hooks.dataproc.DataprocHook",
         "airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook",
     ),
     (