[AIRFLOW-5401] Add support for project_id from connection in GKE
This change adds support for reading project_id value from connection configuration GKE operators.
This commit is contained in:
Родитель
004f353342
Коммит
ae0d03eba7
|
@ -131,6 +131,7 @@ class GKEClusterHook(GoogleCloudBaseHook):
|
||||||
cluster_proto.resource_labels.update({key: val})
|
cluster_proto.resource_labels.update({key: val})
|
||||||
return cluster_proto
|
return cluster_proto
|
||||||
|
|
||||||
|
@GoogleCloudBaseHook.fallback_to_default_project_id
|
||||||
def delete_cluster(
|
def delete_cluster(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -177,6 +178,7 @@ class GKEClusterHook(GoogleCloudBaseHook):
|
||||||
self.log.info('Assuming Success: %s', error.message)
|
self.log.info('Assuming Success: %s', error.message)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@GoogleCloudBaseHook.fallback_to_default_project_id
|
||||||
def create_cluster(
|
def create_cluster(
|
||||||
self,
|
self,
|
||||||
cluster: Union[Dict, Cluster],
|
cluster: Union[Dict, Cluster],
|
||||||
|
@ -234,6 +236,7 @@ class GKEClusterHook(GoogleCloudBaseHook):
|
||||||
self.log.info('Assuming Success: %s', error.message)
|
self.log.info('Assuming Success: %s', error.message)
|
||||||
return self.get_cluster(name=cluster.name).self_link
|
return self.get_cluster(name=cluster.name).self_link
|
||||||
|
|
||||||
|
@GoogleCloudBaseHook.fallback_to_default_project_id
|
||||||
def get_cluster(
|
def get_cluster(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|
|
@ -71,9 +71,9 @@ class GKEClusterDeleteOperator(BaseOperator):
|
||||||
|
|
||||||
@apply_defaults
|
@apply_defaults
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
project_id: str,
|
|
||||||
name: str,
|
name: str,
|
||||||
location: str,
|
location: str,
|
||||||
|
project_id: str = None,
|
||||||
gcp_conn_id: str = 'google_cloud_default',
|
gcp_conn_id: str = 'google_cloud_default',
|
||||||
api_version: str = 'v2',
|
api_version: str = 'v2',
|
||||||
*args,
|
*args,
|
||||||
|
@ -146,9 +146,9 @@ class GKEClusterCreateOperator(BaseOperator):
|
||||||
|
|
||||||
@apply_defaults
|
@apply_defaults
|
||||||
def __init__(self,
|
def __init__(self,
|
||||||
project_id: str,
|
|
||||||
location: str,
|
location: str,
|
||||||
body: Optional[Union[Dict, Cluster]],
|
body: Optional[Union[Dict, Cluster]],
|
||||||
|
project_id: str = None,
|
||||||
gcp_conn_id: str = 'google_cloud_default',
|
gcp_conn_id: str = 'google_cloud_default',
|
||||||
api_version: str = 'v2',
|
api_version: str = 'v2',
|
||||||
*args,
|
*args,
|
||||||
|
|
|
@ -98,7 +98,7 @@ class TestGKEClusterHookDelete(unittest.TestCase):
|
||||||
message = 'Not Found'
|
message = 'Not Found'
|
||||||
self.gke_hook._client.delete_cluster.side_effect = NotFound(message=message)
|
self.gke_hook._client.delete_cluster.side_effect = NotFound(message=message)
|
||||||
|
|
||||||
self.gke_hook.delete_cluster('not-existing')
|
self.gke_hook.delete_cluster(name='not-existing', project_id=TEST_GCP_PROJECT_ID)
|
||||||
wait_mock.assert_not_called()
|
wait_mock.assert_not_called()
|
||||||
convert_mock.assert_not_called()
|
convert_mock.assert_not_called()
|
||||||
log_mock.info.assert_any_call("Assuming Success: %s", message)
|
log_mock.info.assert_any_call("Assuming Success: %s", message)
|
||||||
|
@ -116,7 +116,7 @@ class TestGKEClusterHookDelete(unittest.TestCase):
|
||||||
self.gke_hook._client.delete_cluster.side_effect = AirflowException('400')
|
self.gke_hook._client.delete_cluster.side_effect = AirflowException('400')
|
||||||
|
|
||||||
with self.assertRaises(AirflowException):
|
with self.assertRaises(AirflowException):
|
||||||
self.gke_hook.delete_cluster('a-cluster')
|
self.gke_hook.delete_cluster(name='a-cluster')
|
||||||
wait_mock.assert_not_called()
|
wait_mock.assert_not_called()
|
||||||
convert_mock.assert_not_called()
|
convert_mock.assert_not_called()
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ class TestGKEClusterHookCreate(unittest.TestCase):
|
||||||
|
|
||||||
client_create = self.gke_hook._client.create_cluster = mock.Mock()
|
client_create = self.gke_hook._client.create_cluster = mock.Mock()
|
||||||
|
|
||||||
self.gke_hook.create_cluster(mock_cluster_proto,
|
self.gke_hook.create_cluster(cluster=mock_cluster_proto,
|
||||||
project_id=TEST_GCP_PROJECT_ID,
|
project_id=TEST_GCP_PROJECT_ID,
|
||||||
retry=retry_mock,
|
retry=retry_mock,
|
||||||
timeout=timeout_mock)
|
timeout=timeout_mock)
|
||||||
|
@ -169,7 +169,7 @@ class TestGKEClusterHookCreate(unittest.TestCase):
|
||||||
client_create = self.gke_hook._client.create_cluster = mock.Mock()
|
client_create = self.gke_hook._client.create_cluster = mock.Mock()
|
||||||
proto_mock = convert_mock.return_value = mock.Mock()
|
proto_mock = convert_mock.return_value = mock.Mock()
|
||||||
|
|
||||||
self.gke_hook.create_cluster(mock_cluster_dict,
|
self.gke_hook.create_cluster(cluster=mock_cluster_dict,
|
||||||
project_id=TEST_GCP_PROJECT_ID,
|
project_id=TEST_GCP_PROJECT_ID,
|
||||||
retry=retry_mock,
|
retry=retry_mock,
|
||||||
timeout=timeout_mock)
|
timeout=timeout_mock)
|
||||||
|
@ -196,23 +196,18 @@ class TestGKEClusterHookCreate(unittest.TestCase):
|
||||||
wait_mock.assert_not_called()
|
wait_mock.assert_not_called()
|
||||||
convert_mock.assert_not_called()
|
convert_mock.assert_not_called()
|
||||||
|
|
||||||
@mock.patch(
|
|
||||||
'airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook.project_id',
|
|
||||||
new_callable=PropertyMock,
|
|
||||||
return_value=None
|
|
||||||
)
|
|
||||||
@mock.patch(
|
@mock.patch(
|
||||||
"airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.log")
|
"airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.log")
|
||||||
@mock.patch("airflow.gcp.hooks.kubernetes_engine.ParseDict")
|
@mock.patch("airflow.gcp.hooks.kubernetes_engine.ParseDict")
|
||||||
@mock.patch(
|
@mock.patch(
|
||||||
"airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.wait_for_operation")
|
"airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.wait_for_operation")
|
||||||
def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock, mock_project_id):
|
def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock):
|
||||||
from google.api_core.exceptions import AlreadyExists
|
from google.api_core.exceptions import AlreadyExists
|
||||||
# To force an error
|
# To force an error
|
||||||
message = 'Already Exists'
|
message = 'Already Exists'
|
||||||
self.gke_hook._client.create_cluster.side_effect = AlreadyExists(message=message)
|
self.gke_hook._client.create_cluster.side_effect = AlreadyExists(message=message)
|
||||||
|
|
||||||
self.gke_hook.create_cluster({})
|
self.gke_hook.create_cluster(cluster={}, project_id=TEST_GCP_PROJECT_ID)
|
||||||
wait_mock.assert_not_called()
|
wait_mock.assert_not_called()
|
||||||
self.assertEqual(convert_mock.call_count, 1)
|
self.assertEqual(convert_mock.call_count, 1)
|
||||||
log_mock.info.assert_any_call("Assuming Success: %s", message)
|
log_mock.info.assert_any_call("Assuming Success: %s", message)
|
||||||
|
|
Загрузка…
Ссылка в новой задаче