diff --git a/airflow/gcp/hooks/kubernetes_engine.py b/airflow/gcp/hooks/kubernetes_engine.py index 8c60c4a2a6..c1b91ae243 100644 --- a/airflow/gcp/hooks/kubernetes_engine.py +++ b/airflow/gcp/hooks/kubernetes_engine.py @@ -131,6 +131,7 @@ class GKEClusterHook(GoogleCloudBaseHook): cluster_proto.resource_labels.update({key: val}) return cluster_proto + @GoogleCloudBaseHook.fallback_to_default_project_id def delete_cluster( self, name: str, @@ -177,6 +178,7 @@ class GKEClusterHook(GoogleCloudBaseHook): self.log.info('Assuming Success: %s', error.message) return None + @GoogleCloudBaseHook.fallback_to_default_project_id def create_cluster( self, cluster: Union[Dict, Cluster], @@ -234,6 +236,7 @@ class GKEClusterHook(GoogleCloudBaseHook): self.log.info('Assuming Success: %s', error.message) return self.get_cluster(name=cluster.name).self_link + @GoogleCloudBaseHook.fallback_to_default_project_id def get_cluster( self, name: str, diff --git a/airflow/gcp/operators/kubernetes_engine.py b/airflow/gcp/operators/kubernetes_engine.py index 68b38de417..f7fd043933 100644 --- a/airflow/gcp/operators/kubernetes_engine.py +++ b/airflow/gcp/operators/kubernetes_engine.py @@ -71,9 +71,9 @@ class GKEClusterDeleteOperator(BaseOperator): @apply_defaults def __init__(self, - project_id: str, name: str, location: str, + project_id: str = None, gcp_conn_id: str = 'google_cloud_default', api_version: str = 'v2', *args, @@ -146,9 +146,9 @@ class GKEClusterCreateOperator(BaseOperator): @apply_defaults def __init__(self, - project_id: str, location: str, body: Optional[Union[Dict, Cluster]], + project_id: str = None, gcp_conn_id: str = 'google_cloud_default', api_version: str = 'v2', *args, diff --git a/tests/gcp/hooks/test_kubernetes_engine.py b/tests/gcp/hooks/test_kubernetes_engine.py index 8d4a4a4f6b..1e498c9b1c 100644 --- a/tests/gcp/hooks/test_kubernetes_engine.py +++ b/tests/gcp/hooks/test_kubernetes_engine.py @@ -98,7 +98,7 @@ class TestGKEClusterHookDelete(unittest.TestCase): message = 'Not Found' self.gke_hook._client.delete_cluster.side_effect = NotFound(message=message) - self.gke_hook.delete_cluster('not-existing') + self.gke_hook.delete_cluster(name='not-existing', project_id=TEST_GCP_PROJECT_ID) wait_mock.assert_not_called() convert_mock.assert_not_called() log_mock.info.assert_any_call("Assuming Success: %s", message) @@ -116,7 +116,7 @@ class TestGKEClusterHookDelete(unittest.TestCase): self.gke_hook._client.delete_cluster.side_effect = AirflowException('400') with self.assertRaises(AirflowException): - self.gke_hook.delete_cluster('a-cluster') + self.gke_hook.delete_cluster(name='a-cluster') wait_mock.assert_not_called() convert_mock.assert_not_called() @@ -142,7 +142,7 @@ class TestGKEClusterHookCreate(unittest.TestCase): client_create = self.gke_hook._client.create_cluster = mock.Mock() - self.gke_hook.create_cluster(mock_cluster_proto, + self.gke_hook.create_cluster(cluster=mock_cluster_proto, project_id=TEST_GCP_PROJECT_ID, retry=retry_mock, timeout=timeout_mock) @@ -169,7 +169,7 @@ class TestGKEClusterHookCreate(unittest.TestCase): client_create = self.gke_hook._client.create_cluster = mock.Mock() proto_mock = convert_mock.return_value = mock.Mock() - self.gke_hook.create_cluster(mock_cluster_dict, + self.gke_hook.create_cluster(cluster=mock_cluster_dict, project_id=TEST_GCP_PROJECT_ID, retry=retry_mock, timeout=timeout_mock) @@ -196,23 +196,18 @@ class TestGKEClusterHookCreate(unittest.TestCase): wait_mock.assert_not_called() convert_mock.assert_not_called() - @mock.patch( - 'airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook.project_id', - new_callable=PropertyMock, - return_value=None - ) @mock.patch( "airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.log") @mock.patch("airflow.gcp.hooks.kubernetes_engine.ParseDict") @mock.patch( "airflow.gcp.hooks.kubernetes_engine.GKEClusterHook.wait_for_operation") - def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock, mock_project_id): + def test_create_cluster_already_exists(self, wait_mock, convert_mock, log_mock): from google.api_core.exceptions import AlreadyExists # To force an error message = 'Already Exists' self.gke_hook._client.create_cluster.side_effect = AlreadyExists(message=message) - self.gke_hook.create_cluster({}) + self.gke_hook.create_cluster(cluster={}, project_id=TEST_GCP_PROJECT_ID) wait_mock.assert_not_called() self.assertEqual(convert_mock.call_count, 1) log_mock.info.assert_any_call("Assuming Success: %s", message)