diff --git a/.gitignore b/.gitignore index d005437890..114236d467 100644 --- a/.gitignore +++ b/.gitignore @@ -146,3 +146,9 @@ npm-debug.log* static/dist derby.log metastore_db + +# Airflow log files when airflow is run locally +airflow-*.err +airflow-*.out +airflow-*.log +airflow-*.pid diff --git a/airflow/contrib/example_dags/example_gcp_compute.py b/airflow/contrib/example_dags/example_gcp_compute.py index e4abe2e152..51a55b6a99 100644 --- a/airflow/contrib/example_dags/example_gcp_compute.py +++ b/airflow/contrib/example_dags/example_gcp_compute.py @@ -24,7 +24,7 @@ Engine instance. This DAG relies on the following Airflow variables https://airflow.apache.org/concepts.html#variables * PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists. -* LOCATION - Google Cloud Platform zone where the instance exists. +* ZONE - Google Cloud Platform zone where the instance exists. * INSTANCE - Name of the Compute Engine instance. * SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'. See https://cloud.google.com/compute/docs/machine-types @@ -37,19 +37,23 @@ from airflow import models from airflow.contrib.operators.gcp_compute_operator import GceInstanceStartOperator, \ GceInstanceStopOperator, GceSetMachineTypeOperator -# [START howto_operator_gce_args] -PROJECT_ID = models.Variable.get('PROJECT_ID', '') -LOCATION = models.Variable.get('LOCATION', '') -INSTANCE = models.Variable.get('INSTANCE', '') -SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '') -SET_MACHINE_TYPE_BODY = { - 'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME) -} +# [START howto_operator_gce_args_common] +PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow') +ZONE = models.Variable.get('ZONE', 'europe-west1-b') +INSTANCE = models.Variable.get('INSTANCE', 'test-instance') default_args = { 'start_date': airflow.utils.dates.days_ago(1) } -# [END howto_operator_gce_args] +# [END howto_operator_gce_args_common] + +# [START howto_operator_gce_args_set_machine_type] +SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1') +SET_MACHINE_TYPE_BODY = { + 'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME) +} +# [END howto_operator_gce_args_set_machine_type] + with models.DAG( 'example_gcp_compute', @@ -59,7 +63,7 @@ with models.DAG( # [START howto_operator_gce_start] gce_instance_start = GceInstanceStartOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, task_id='gcp_compute_start_task' ) @@ -67,14 +71,14 @@ with models.DAG( # Duplicate start for idempotence testing gce_instance_start2 = GceInstanceStartOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, task_id='gcp_compute_start_task2' ) # [START howto_operator_gce_stop] gce_instance_stop = GceInstanceStopOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, task_id='gcp_compute_stop_task' ) @@ -82,14 +86,14 @@ with models.DAG( # Duplicate stop for idempotence testing gce_instance_stop2 = GceInstanceStopOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, task_id='gcp_compute_stop_task2' ) # [START howto_operator_gce_set_machine_type] gce_set_machine_type = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, body=SET_MACHINE_TYPE_BODY, task_id='gcp_compute_set_machine_type' @@ -98,7 +102,7 @@ with models.DAG( # Duplicate set machine type for idempotence testing gce_set_machine_type2 = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=INSTANCE, body=SET_MACHINE_TYPE_BODY, task_id='gcp_compute_set_machine_type2' diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py b/airflow/contrib/example_dags/example_gcp_compute_igm.py new file mode 100644 index 0000000000..dc24259f9f --- /dev/null +++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that uses IGM-type compute operations: +* copy of Instance Template +* update template in Instance Group Manager + +This DAG relies on the following OS environment variables + +* PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists +* ZONE - the zone where the Compute Engine instance exists + +Variables for copy template operator: +* TEMPLATE_NAME - name of the template to copy +* NEW_TEMPLATE_NAME - name of the new template +* NEW_DESCRIPTION - description added to the template + +Variables for update template in Group Manager: + +* INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager +* SOURCE_TEMPLATE_URL - url of the template to replace in the Instance Group Manager +* DESTINATION_TEMPLATE_URL - url of the new template to set in the Instance Group Manager +""" + +import os +import datetime + +import airflow +from airflow import models +from airflow.contrib.operators.gcp_compute_operator import \ + GceInstanceTemplateCopyOperator, GceInstanceGroupManagerUpdateTemplateOperator + +# [START howto_operator_compute_igm_common_args] +PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project') +ZONE = os.environ.get('ZONE', 'europe-west1-b') + +default_args = { + 'start_date': airflow.utils.dates.days_ago(1) +} +# [END howto_operator_compute_igm_common_args] + +# [START howto_operator_compute_template_copy_args] +TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test') +NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME', + 'instance-template-test-new') +NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description') +GCE_INSTANCE_TEMPLATE_BODY_UPDATE = { + "name": NEW_TEMPLATE_NAME, + "description": NEW_DESCRIPTION, + "properties": { + "machineType": "n1-standard-2" + } +} +# [END howto_operator_compute_template_copy_args] + +# [START howto_operator_compute_igm_update_template_args] +INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME', + 'instance-group-test') + +SOURCE_TEMPLATE_URL = os.environ.get( + 'SOURCE_TEMPLATE_URL', + "https://www.googleapis.com/compute/beta/projects/" + "example-project/global/instanceTemplates/instance-template-test") + +DESTINATION_TEMPLATE_URL = os.environ.get( + 'DESTINATION_TEMPLATE_URL', + "https://www.googleapis.com/compute/beta/projects/" + "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME) + +UPDATE_POLICY = { + "type": "OPPORTUNISTIC", + "minimalAction": "RESTART", + "maxSurge": { + "fixed": 1 + }, + "minReadySec": 1800 +} + +# [END howto_operator_compute_igm_update_template_args] + + +with models.DAG( + 'example_gcp_compute_igm', + default_args=default_args, + schedule_interval=datetime.timedelta(days=1) +) as dag: + # [START howto_operator_gce_igm_copy_template] + gce_instance_template_copy = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=TEMPLATE_NAME, + body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE, + task_id='gcp_compute_igm_copy_template_task' + ) + # [END howto_operator_gce_igm_copy_template] + # Added to check for idempotence + gce_instance_template_copy2 = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=TEMPLATE_NAME, + body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE, + task_id='gcp_compute_igm_copy_template_task_2' + ) + # [START howto_operator_gce_igm_update_template] + gce_instance_group_manager_update_template = \ + GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + resource_id=INSTANCE_GROUP_MANAGER_NAME, + zone=ZONE, + source_template=SOURCE_TEMPLATE_URL, + destination_template=DESTINATION_TEMPLATE_URL, + update_policy=UPDATE_POLICY, + task_id='gcp_compute_igm_group_manager_update_template' + ) + # [END howto_operator_gce_igm_update_template] + # Added to check for idempotence (and without UPDATE_POLICY) + gce_instance_group_manager_update_template2 = \ + GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + resource_id=INSTANCE_GROUP_MANAGER_NAME, + zone=ZONE, + source_template=SOURCE_TEMPLATE_URL, + destination_template=DESTINATION_TEMPLATE_URL, + task_id='gcp_compute_igm_group_manager_update_template_2' + ) + gce_instance_template_copy >> gce_instance_template_copy2 >> \ + gce_instance_group_manager_update_template >> \ + gce_instance_group_manager_update_template2 diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py index 30f5369af6..d87eed39c5 100644 --- a/airflow/contrib/example_dags/example_gcp_function_delete.py +++ b/airflow/contrib/example_dags/example_gcp_function_delete.py @@ -33,9 +33,9 @@ from airflow import models from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator # [START howto_operator_gcf_delete_args] -PROJECT_ID = models.Variable.get('PROJECT_ID', '') -LOCATION = models.Variable.get('LOCATION', '') -ENTRYPOINT = models.Variable.get('ENTRYPOINT', '') +PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow') +LOCATION = models.Variable.get('LOCATION', 'europe-west1') +ENTRYPOINT = models.Variable.get('ENTRYPOINT', 'helloWorld') # A fully-qualified name of the function to delete FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py index a0e44957b9..606cc181b0 100644 --- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py +++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py @@ -45,11 +45,14 @@ from airflow.contrib.operators.gcp_function_operator \ from airflow.utils import dates # [START howto_operator_gcf_deploy_variables] -PROJECT_ID = models.Variable.get('PROJECT_ID', '') -LOCATION = models.Variable.get('LOCATION', '') +PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow') +LOCATION = models.Variable.get('LOCATION', 'europe-west1') SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '') SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '') -SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '') +SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', + 'https://source.developers.google.com/' + 'projects/example-airflow/' + 'repos/hello-world/moveable-aliases/master') ZIP_PATH = models.Variable.get('ZIP_PATH', '') ENTRYPOINT = models.Variable.get('ENTRYPOINT', '') FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION, @@ -70,11 +73,7 @@ body = { # [START howto_operator_gcf_deploy_args] default_args = { - 'start_date': dates.days_ago(1), - 'project_id': PROJECT_ID, - 'location': LOCATION, - 'body': body, - 'validate_body': VALIDATE_BODY + 'start_date': dates.days_ago(1) } # [END howto_operator_gcf_deploy_args] @@ -103,11 +102,15 @@ with models.DAG( # [START howto_operator_gcf_deploy] deploy_task = GcfFunctionDeployOperator( task_id="gcf_deploy_task", - name=FUNCTION_NAME + name=FUNCTION_NAME, + project_id=PROJECT_ID, + location=LOCATION, + body=body, + validate_body=VALIDATE_BODY ) # [END howto_operator_gcf_deploy] delete_task = GcfFunctionDeleteOperator( task_id="gcf_delete_task", - name=FUNCTION_NAME + name=FUNCTION_NAME, ) deploy_task >> delete_task diff --git a/airflow/contrib/hooks/gcp_compute_hook.py b/airflow/contrib/hooks/gcp_compute_hook.py index 5fa088942b..617e39cb40 100644 --- a/airflow/contrib/hooks/gcp_compute_hook.py +++ b/airflow/contrib/hooks/gcp_compute_hook.py @@ -68,14 +68,14 @@ class GceHook(GoogleCloudBaseHook): """ Starts an existing instance defined by project_id, zone and resource_id. - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance exists :type project_id: str - :param zone: Google Cloud Platform zone where the instance exists. + :param zone: Google Cloud Platform zone where the instance exists :type zone: str - :param resource_id: Name of the Compute Engine instance resource. + :param resource_id: Name of the Compute Engine instance resource :type resource_id: str - :return: True if the operation succeeded, raises an error otherwise + :return: True if the operation succeeded, raises an error otherwise. :rtype: bool """ response = self.get_conn().instances().start( @@ -83,21 +83,26 @@ class GceHook(GoogleCloudBaseHook): zone=zone, instance=resource_id ).execute(num_retries=NUM_RETRIES) - operation_name = response["name"] - return self._wait_for_operation_to_complete(project_id, zone, operation_name) + try: + operation_name = response["name"] + except KeyError: + raise AirflowException( + "Wrong response '{}' returned - it should contain " + "'name' field".format(response)) + return self._wait_for_operation_to_complete(project_id, operation_name, zone) def stop_instance(self, project_id, zone, resource_id): """ - Stops an instance defined by project_id, zone and resource_id. + Stops an instance defined by project_id, zone and resource_id - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance exists :type project_id: str - :param zone: Google Cloud Platform zone where the instance exists. + :param zone: Google Cloud Platform zone where the instance exists :type zone: str - :param resource_id: Name of the Compute Engine instance resource. + :param resource_id: Name of the Compute Engine instance resource :type resource_id: str - :return: True if the operation succeeded, raises an error otherwise + :return: True if the operation succeeded, raises an error otherwise. :rtype: bool """ response = self.get_conn().instances().stop( @@ -105,50 +110,178 @@ class GceHook(GoogleCloudBaseHook): zone=zone, instance=resource_id ).execute(num_retries=NUM_RETRIES) - operation_name = response["name"] - return self._wait_for_operation_to_complete(project_id, zone, operation_name) + try: + operation_name = response["name"] + except KeyError: + raise AirflowException( + "Wrong response '{}' returned - it should contain " + "'name' field".format(response)) + return self._wait_for_operation_to_complete(project_id, operation_name, zone) def set_machine_type(self, project_id, zone, resource_id, body): """ Sets machine type of an instance defined by project_id, zone and resource_id. - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance exists :type project_id: str :param zone: Google Cloud Platform zone where the instance exists. :type zone: str - :param resource_id: Name of the Compute Engine instance resource. + :param resource_id: Name of the Compute Engine instance resource :type resource_id: str :param body: Body required by the Compute Engine setMachineType API, - as described in - https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType + as described in + https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType :type body: dict - :return: True if the operation succeeded, raises an error otherwise + :return: True if the operation succeeded, raises an error otherwise. :rtype: bool """ response = self._execute_set_machine_type(project_id, zone, resource_id, body) - operation_name = response["name"] - return self._wait_for_operation_to_complete(project_id, zone, operation_name) + try: + operation_name = response["name"] + except KeyError: + raise AirflowException( + "Wrong response '{}' returned - it should contain " + "'name' field".format(response)) + return self._wait_for_operation_to_complete(project_id, operation_name, zone) def _execute_set_machine_type(self, project_id, zone, resource_id, body): return self.get_conn().instances().setMachineType( project=project_id, zone=zone, instance=resource_id, body=body)\ .execute(num_retries=NUM_RETRIES) - def _wait_for_operation_to_complete(self, project_id, zone, operation_name): + def get_instance_template(self, project_id, resource_id): """ - Waits for the named operation to complete - checks status of the - asynchronous call. + Retrieves instance template by project_id and resource_id. + + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance template exists + :type project_id: str + :param resource_id: Name of the instance template + :type resource_id: str + :return: Instance template representation as object according to + https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates + :rtype: dict + """ + response = self.get_conn().instanceTemplates().get( + project=project_id, + instanceTemplate=resource_id + ).execute(num_retries=NUM_RETRIES) + return response + + def insert_instance_template(self, project_id, body, request_id=None): + """ + Inserts instance template using body specified + + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance exists + :type project_id: str + :param body: Instance template representation as object according to + https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates + :type body: dict + :param request_id: Optional, unique request_id that you might add to achieve + full idempotence (for example when client call times out repeating the request + with the same request id will not create a new instance template again) + It should be in UUID format as defined in RFC 4122 + :type request_id: str + :return: True if the operation succeeded + :rtype: bool + """ + response = self.get_conn().instanceTemplates().insert( + project=project_id, + body=body, + requestId=request_id + ).execute(num_retries=NUM_RETRIES) + try: + operation_name = response["name"] + except KeyError: + raise AirflowException( + "Wrong response '{}' returned - it should contain " + "'name' field".format(response)) + return self._wait_for_operation_to_complete(project_id, operation_name) + + def get_instance_group_manager(self, project_id, zone, resource_id): + """ + Retrieves Instance Group Manager by project_id, zone and resource_id. + + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance Group Manager exists + :type project_id: str + :param zone: Google Cloud Platform zone where the Instance Group Manager exists + :type zone: str + :param resource_id: Name of the Instance Group Manager + :type resource_id: str + :return: Instance group manager representation as object according to + https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers + :rtype: dict + """ + response = self.get_conn().instanceGroupManagers().get( + project=project_id, + zone=zone, + instanceGroupManager=resource_id + ).execute(num_retries=NUM_RETRIES) + return response + + def patch_instance_group_manager(self, project_id, zone, resource_id, + body, request_id=None): + """ + Patches Instance Group Manager with the specified body. + + :param project_id: Google Cloud Platform project ID where the Compute Engine + Instance Group Manager exists + :type project_id: str + :param zone: Google Cloud Platform zone where the Instance Group Manager exists + :type zone: str + :param resource_id: Name of the Instance Group Manager + :type resource_id: str + :param body: Instance Group Manager representation as json-merge-patch object + according to + https://cloud.google.com/compute/docs/reference/rest/beta/instanceTemplates/patch + :type body: dict + :param request_id: Optional, unique request_id that you might add to achieve + full idempotence (for example when client call times out repeating the request + with the same request id will not create a new instance template again). + It should be in UUID format as defined in RFC 4122 + :type request_id: str + :return: True if the operation succeeded + :rtype: bool + """ + response = self.get_conn().instanceGroupManagers().patch( + project=project_id, + zone=zone, + instanceGroupManager=resource_id, + body=body, + requestId=request_id + ).execute(num_retries=NUM_RETRIES) + try: + operation_name = response["name"] + except KeyError: + raise AirflowException( + "Wrong response '{}' returned - it should contain " + "'name' field".format(response)) + return self._wait_for_operation_to_complete(project_id, operation_name, zone) + + def _wait_for_operation_to_complete(self, project_id, operation_name, zone=None): + """ + Waits for the named operation to complete - checks status of the async call. :param operation_name: name of the operation :type operation_name: str + :param zone: optional region of the request (might be None for global operations) + :type zone: str :return: True if the operation succeeded, raises an error otherwise :rtype: bool """ service = self.get_conn() while True: - operation_response = self._check_operation_status( - service, operation_name, project_id, zone) + if zone is None: + # noinspection PyTypeChecker + operation_response = self._check_global_operation_status( + service, operation_name, project_id) + else: + # noinspection PyTypeChecker + operation_response = self._check_zone_operation_status( + service, operation_name, project_id, zone) if operation_response.get("status") == GceOperationStatus.DONE: error = operation_response.get("error") if error: @@ -161,7 +294,14 @@ class GceHook(GoogleCloudBaseHook): return True time.sleep(TIME_TO_SLEEP_IN_SECONDS) - def _check_operation_status(self, service, operation_name, project_id, zone): + @staticmethod + def _check_zone_operation_status(service, operation_name, project_id, zone): return service.zoneOperations().get( project=project_id, zone=zone, operation=operation_name).execute( num_retries=NUM_RETRIES) + + @staticmethod + def _check_global_operation_status(service, operation_name, project_id): + return service.globalOperations().get( + project=project_id, operation=operation_name).execute( + num_retries=NUM_RETRIES) diff --git a/airflow/contrib/hooks/gcp_function_hook.py b/airflow/contrib/hooks/gcp_function_hook.py index d89b5b0ec8..29cef1716c 100644 --- a/airflow/contrib/hooks/gcp_function_hook.py +++ b/airflow/contrib/hooks/gcp_function_hook.py @@ -65,7 +65,7 @@ class GcfHook(GoogleCloudBaseHook): :param name: name of the function :type name: str - :return: a CloudFunction object representing the function + :return: a Cloud Functions object representing the function :rtype: dict """ return self.get_conn().projects().locations().functions().get( @@ -78,7 +78,7 @@ class GcfHook(GoogleCloudBaseHook): :param full_location: full location including the project in the form of of /projects//location/ :type full_location: str - :return: array of CloudFunction objects - representing functions in the location + :return: array of Cloud Functions objects - representing functions in the location :rtype: [dict] """ list_response = self.get_conn().projects().locations().functions().list( diff --git a/airflow/contrib/operators/gcp_compute_operator.py b/airflow/contrib/operators/gcp_compute_operator.py index a2fd545294..a872c17227 100644 --- a/airflow/contrib/operators/gcp_compute_operator.py +++ b/airflow/contrib/operators/gcp_compute_operator.py @@ -16,18 +16,24 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from copy import deepcopy + +from googleapiclient.errors import HttpError from airflow import AirflowException from airflow.contrib.hooks.gcp_compute_hook import GceHook +from airflow.contrib.utils.gcp_field_sanitizer import GcpBodyFieldSanitizer from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults +from json_merge_patch import merge class GceBaseOperator(BaseOperator): """ Abstract base operator for Google Compute Engine operators to inherit from. """ + @apply_defaults def __init__(self, project_id, @@ -61,10 +67,10 @@ class GceBaseOperator(BaseOperator): class GceInstanceStartOperator(GceBaseOperator): """ - Start an instance in Google Compute Engine. + Starts an instance in Google Compute Engine. - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform Project ID where the Compute Engine + Instance exists. :type project_id: str :param zone: Google Cloud Platform zone where the instance exists. :type zone: str @@ -72,10 +78,12 @@ class GceInstanceStartOperator(GceBaseOperator): :type resource_id: str :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (for example v1 or beta). :type api_version: str """ + # [START gce_instance_start_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') + # [END gce_instance_start_template_fields] @apply_defaults def __init__(self, @@ -95,10 +103,10 @@ class GceInstanceStartOperator(GceBaseOperator): class GceInstanceStopOperator(GceBaseOperator): """ - Stop an instance in Google Compute Engine. + Stops an instance in Google Compute Engine. - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform Project ID where the Compute Engine + Instance exists. :type project_id: str :param zone: Google Cloud Platform zone where the instance exists. :type zone: str @@ -106,10 +114,12 @@ class GceInstanceStopOperator(GceBaseOperator): :type resource_id: str :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (for example v1 or beta). :type api_version: str """ + # [START gce_instance_stop_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') + # [END gce_instance_stop_template_fields] @apply_defaults def __init__(self, @@ -135,10 +145,10 @@ SET_MACHINE_TYPE_VALIDATION_SPECIFICATION = [ class GceSetMachineTypeOperator(GceBaseOperator): """ Changes the machine type for a stopped instance to the machine type specified in - the request. + the request. - :param project_id: Google Cloud Platform project where the Compute Engine - instance exists. + :param project_id: Google Cloud Platform Project ID where the Compute Engine + Instance exists. :type project_id: str :param zone: Google Cloud Platform zone where the instance exists. :type zone: str @@ -149,10 +159,14 @@ class GceSetMachineTypeOperator(GceBaseOperator): :type body: dict :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str - :param api_version: API version used (e.g. v1). + :param api_version: API version used (for example v1 or beta). :type api_version: str + :param validate_body: If set to False, body validation is not performed. + :type validate_body: bool """ + # [START gce_instance_set_machine_type_template_fields] template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version') + # [END gce_instance_set_machine_type_template_fields] @apply_defaults def __init__(self, @@ -181,3 +195,241 @@ class GceSetMachineTypeOperator(GceBaseOperator): self._validate_all_body_fields() return self._hook.set_machine_type(self.project_id, self.zone, self.resource_id, self.body) + + +GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [ + dict(name="name", regexp="^.+$"), + dict(name="description", optional=True), + dict(name="properties", type='dict', optional=True, fields=[ + dict(name="description", optional=True), + dict(name="tags", optional=True, fields=[ + dict(name="items", optional=True) + ]), + dict(name="machineType", optional=True), + dict(name="canIpForward", optional=True), + dict(name="networkInterfaces", optional=True), # not validating deeper + dict(name="disks", optional=True), # not validating the array deeper + dict(name="metadata", optional=True, fields=[ + dict(name="fingerprint", optional=True), + dict(name="items", optional=True), + dict(name="kind", optional=True), + ]), + dict(name="serviceAccounts", optional=True), # not validating deeper + dict(name="scheduling", optional=True, fields=[ + dict(name="onHostMaintenance", optional=True), + dict(name="automaticRestart", optional=True), + dict(name="preemptible", optional=True), + dict(name="nodeAffinitites", optional=True), # not validating deeper + ]), + dict(name="labels", optional=True), + dict(name="guestAccelerators", optional=True), # not validating deeper + dict(name="minCpuPlatform", optional=True), + ]), +] + +GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [ + "kind", + "id", + "name", + "creationTimestamp", + "properties.disks.sha256", + "properties.disks.kind", + "properties.disks.sourceImageEncryptionKey.sha256", + "properties.disks.index", + "properties.disks.licenses", + "properties.networkInterfaces.kind", + "properties.networkInterfaces.accessConfigs.kind", + "properties.networkInterfaces.name", + "properties.metadata.kind", + "selfLink" +] + + +class GceInstanceTemplateCopyOperator(GceBaseOperator): + """ + Copies the instance template, applying specified changes. + + :param project_id: Google Cloud Platform Project ID where the Compute Engine + instance exists. + :type project_id: str + :param resource_id: Name of the Instance Template + :type resource_id: str + :param body_patch: Patch to the body of instanceTemplates object following rfc7386 + PATCH semantics. The body_patch content follows + https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates + Name field is required as we need to rename the template, + all the other fields are optional. It is important to follow PATCH semantics + - arrays are replaced fully, so if you need to update an array you should + provide the whole target array as patch element. + :type body_patch: dict + :param request_id: Optional, unique request_id that you might add to achieve + full idempotence (for example when client call times out repeating the request + with the same request id will not create a new instance template again). + It should be in UUID format as defined in RFC 4122. + :type request_id: str + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: API version used (for example v1 or beta). + :type api_version: str + :param validate_body: If set to False, body validation is not performed. + :type validate_body: bool + """ + # [START gce_instance_template_copy_operator_template_fields] + template_fields = ('project_id', 'resource_id', 'request_id', + 'gcp_conn_id', 'api_version') + # [END gce_instance_template_copy_operator_template_fields] + + @apply_defaults + def __init__(self, + project_id, + resource_id, + body_patch, + request_id=None, + gcp_conn_id='google_cloud_default', + api_version='v1', + validate_body=True, + *args, **kwargs): + self.body_patch = body_patch + self.request_id = request_id + self._field_validator = None + if 'name' not in self.body_patch: + raise AirflowException("The body '{}' should contain at least " + "name for the new operator in the 'name' field". + format(body_patch)) + if validate_body: + self._field_validator = GcpBodyFieldValidator( + GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION, api_version=api_version) + self._field_sanitizer = GcpBodyFieldSanitizer( + GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE) + super(GceInstanceTemplateCopyOperator, self).__init__( + project_id=project_id, zone='global', resource_id=resource_id, + gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) + + def _validate_all_body_fields(self): + if self._field_validator: + self._field_validator.validate(self.body_patch) + + def execute(self, context): + self._validate_all_body_fields() + try: + # Idempotence check (sort of) - we want to check if the new template + # is already created and if is, then we assume it was created by previous run + # of CopyTemplate operator - we do not check if content of the template + # is as expected. Templates are immutable so we cannot update it anyway + # and deleting/recreating is not worth the hassle especially + # that we cannot delete template if it is already used in some Instance + # Group Manager. We assume success if the template is simply present + existing_template = self._hook.get_instance_template( + project_id=self.project_id, resource_id=self.body_patch['name']) + self.log.info("The {} template already existed. It was likely " + "created by previous run of the operator. Assuming success.") + return existing_template + except HttpError as e: + # We actually expect to get 404 / Not Found here as the template should + # not yet exist + if not e.resp.status == 404: + raise e + old_body = self._hook.get_instance_template(project_id=self.project_id, + resource_id=self.resource_id) + new_body = deepcopy(old_body) + self._field_sanitizer.sanitize(new_body) + new_body = merge(new_body, self.body_patch) + self.log.info("Calling insert instance template with updated body: {}". + format(new_body)) + self._hook.insert_instance_template(project_id=self.project_id, + body=new_body, + request_id=self.request_id) + return self._hook.get_instance_template(project_id=self.project_id, + resource_id=self.body_patch['name']) + + +class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator): + """ + Patches the Instance Group Manager, replacing source template URL with the + destination one. API V1 does not have update/patch operations for Instance + Group Manager, so you must use beta or newer API version. Beta is the default. + + :param project_id: Google Cloud Platform Project ID where the Compute Engine + Instance exists. + :type project_id: str + :param resource_id: Name of the Instance Group Manager + :type resource_id: str + :param zone: Google Cloud Platform zone where the Instance Group Manager exists. + :type zone: str + :param request_id: Optional, unique request_id that you might add to achieve + full idempotence (for example when client call times out repeating the request + with the same request id will not create a new instance template again). + It should be in UUID format as defined in RFC 4122 + :type request_id: str + :param update_policy: The update policy for this managed instance group. See + https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch + for details of the updatePolicy fields. It's an optional field. + :type dict + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + :param api_version: API version used (for example beta). + :type api_version: str + """ + # [START gce_igm_update_template_operator_template_fields] + template_fields = ('project_id', 'resource_id', 'zone', 'request_id', + 'source_template', 'destination_template', + 'gcp_conn_id', 'api_version') + # [END gce_igm_update_template_operator_template_fields] + + @apply_defaults + def __init__(self, + project_id, + resource_id, + zone, + source_template, + destination_template, + update_policy=None, + request_id=None, + gcp_conn_id='google_cloud_default', + api_version='beta', + *args, **kwargs): + self.zone = zone + self.source_template = source_template + self.destination_template = destination_template + self.request_id = request_id + self.update_policy = update_policy + self._change_performed = False + if api_version == 'v1': + raise AirflowException("Api version v1 does not have update/patch " + "operations for Instance Group Managers. Use beta" + " api version or above") + super(GceInstanceGroupManagerUpdateTemplateOperator, self).__init__( + project_id=project_id, zone=self.zone, resource_id=resource_id, + gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs) + + def _possibly_replace_template(self, dictionary): + # type: (dict) -> None + if dictionary.get('instanceTemplate') == self.source_template: + dictionary['instanceTemplate'] = self.destination_template + self._change_performed = True + + def execute(self, context): + old_instance_group_manager = self._hook.get_instance_group_manager( + project_id=self.project_id, + zone=self.zone, + resource_id=self.resource_id) + patch_body = {} + if 'versions' in old_instance_group_manager: + patch_body['versions'] = old_instance_group_manager['versions'] + if 'instanceTemplate' in old_instance_group_manager: + patch_body['instanceTemplate'] = old_instance_group_manager['instanceTemplate'] + if self.update_policy: + patch_body['updatePolicy'] = self.update_policy + self._possibly_replace_template(patch_body) + if 'versions' in patch_body: + for version in patch_body['versions']: + self._possibly_replace_template(version) + if self._change_performed or self.update_policy: + self.log.info("Calling patch instance template with updated body: {}". + format(patch_body)) + return self._hook.patch_instance_group_manager( + project_id=self.project_id, zone=self.zone, resource_id=self.resource_id, + body=patch_body, request_id=self.request_id) + else: + # Idempotence achieved + return True diff --git a/airflow/contrib/operators/gcp_function_operator.py b/airflow/contrib/operators/gcp_function_operator.py index c0013aaea9..7f7da1d3ec 100644 --- a/airflow/contrib/operators/gcp_function_operator.py +++ b/airflow/contrib/operators/gcp_function_operator.py @@ -93,7 +93,7 @@ class GcfFunctionDeployOperator(BaseOperator): . Different API versions require different variants of the Cloud Functions dictionary. :type body: dict or google.cloud.functions.v1.CloudFunction - :param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform. + :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str :param api_version: API version used (for example v1 or v1beta1). :type api_version: str @@ -105,6 +105,9 @@ class GcfFunctionDeployOperator(BaseOperator): :param validate_body: If set to False, body validation is not performed. :type validate_body: bool """ + # [START gce_function_deploy_template_operator_template_fields] + template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version') + # [END gce_function_deploy_template_operator_template_fields] @apply_defaults def __init__(self, @@ -276,6 +279,9 @@ class GcfFunctionDeleteOperator(BaseOperator): :param api_version: API version used (for example v1 or v1beta1). :type api_version: str """ + # [START gce_function_delete_template_operator_template_fields] + template_fields = ('name', 'gcp_conn_id', 'api_version') + # [END gce_function_delete_template_operator_template_fields] @apply_defaults def __init__(self, diff --git a/airflow/contrib/utils/gcp_field_sanitizer.py b/airflow/contrib/utils/gcp_field_sanitizer.py new file mode 100644 index 0000000000..c0a8985281 --- /dev/null +++ b/airflow/contrib/utils/gcp_field_sanitizer.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Sanitizer for body fields sent via GCP API. + +The sanitizer removes fields specified from the body. + +Context +------- +In some cases where GCP operation requires modification of existing resources (such +as instances or instance templates) we need to sanitize body of the resources returned +via GCP APIs. This is in the case when we retrieve information from GCP first, +modify the body and either update the existing resource or create a new one with the +modified body. Usually when you retrieve resource from GCP you get some extra fields which +are Output-only, and we need to delete those fields if we want to use +the body as input for subsequent create/insert type operation. + + +Field specification +------------------- + +Specification of fields is an array of strings which denote names of fields to be removed. +The field can be either direct field name to remove from the body or the full +specification of the path you should delete - separated with '.' + + +>>> FIELDS_TO_SANITIZE = [ +>>> "kind", +>>> "properties.disks.kind", +>>> "properties.metadata.kind", +>>>] +>>> body = { +>>> "kind": "compute#instanceTemplate", +>>> "name": "instance", +>>> "properties": { +>>> "disks": [ +>>> { +>>> "name": "a", +>>> "kind": "compute#attachedDisk", +>>> "type": "PERSISTENT", +>>> "mode": "READ_WRITE", +>>> }, +>>> { +>>> "name": "b", +>>> "kind": "compute#attachedDisk", +>>> "type": "PERSISTENT", +>>> "mode": "READ_WRITE", +>>> } +>>> ], +>>> "metadata": { +>>> "kind": "compute#metadata", +>>> "fingerprint": "GDPUYxlwHe4=" +>>> }, +>>> } +>>> } +>>> sanitizer=GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE) +>>> SANITIZED_BODY = sanitizer.sanitize(body) +>>> json.dumps(SANITIZED_BODY, indent=2) +{ + "name": "instance", + "properties": { + "disks": [ + { + "name": "a", + "type": "PERSISTENT", + "mode": "READ_WRITE", + }, + { + "name": "b", + "type": "PERSISTENT", + "mode": "READ_WRITE", + } + ], + "metadata": { + "fingerprint": "GDPUYxlwHe4=" + }, + } +} + +Note that the components of the path can be either dictionaries or arrays of dictionaries. +In case they are dictionaries, subsequent component names key of the field, in case of +arrays - the sanitizer iterates through all dictionaries in the array and searches +components in all elements of the array. +""" + +from airflow import LoggingMixin, AirflowException + + +class GcpFieldSanitizerException(AirflowException): + """Thrown when sanitizer finds unexpected field type in the path + (other than dict or array). + """ + + def __init__(self, message): + super(GcpFieldSanitizerException, self).__init__(message) + + +class GcpBodyFieldSanitizer(LoggingMixin): + """Sanitizes the body according to specification. + + :param sanitize_specs: array of strings that specifies which fields to remove + :type sanitize_specs: [string] + + """ + def __init__(self, sanitize_specs): + # type: ([str]) -> None + super(GcpBodyFieldSanitizer, self).__init__() + self._sanitize_specs = sanitize_specs + + def _sanitize(self, dictionary, remaining_field_spec, current_path): + field_split = remaining_field_spec.split(".", 1) + if len(field_split) == 1: + field_name = field_split[0] + if field_name in dictionary: + self.log.info("Deleted {} [{}]".format(field_name, current_path)) + del dictionary[field_name] + else: + self.log.debug("The field {} is missing in {} at the path {}.". + format(field_name, dictionary, current_path)) + else: + field_name = field_split[0] + remaining_path = field_split[1] + child = dictionary.get(field_name) + if child is None: + self.log.debug("The field {} is missing in {} at the path {}. ". + format(field_name, dictionary, current_path)) + elif isinstance(child, dict): + self._sanitize(child, remaining_path, "{}.{}".format( + current_path, field_name)) + elif isinstance(child, list): + for index, elem in enumerate(child): + if not isinstance(elem, dict): + self.log.warn( + "The field {} element at index {} is of wrong type. " + "It should be dict and is {}. Skipping it.". + format(current_path, index, elem)) + self._sanitize(elem, remaining_path, "{}.{}[{}]".format( + current_path, field_name, index)) + else: + self.log.warn( + "The field {} is of wrong type. " + "It should be dict or list and it is {}. Skipping it.". + format(current_path, child)) + + def sanitize(self, body): + for elem in self._sanitize_specs: + self._sanitize(body, elem, "") diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst index 6333e32dd7..806eaa99d9 100644 --- a/docs/howto/operator.rst +++ b/docs/howto/operator.rst @@ -86,8 +86,8 @@ template variables ` and a ``templates_dict`` argument. The ``templates_dict`` argument is templated, so each value in the dictionary is evaluated as a :ref:`Jinja template `. -Google Cloud Platform Operators -------------------------------- +Google Cloud Storage Operators +------------------------------ GoogleCloudStorageToBigQueryOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -102,22 +102,31 @@ to execute a BigQuery load job. :start-after: [START howto_operator_gcs_to_bq] :end-before: [END howto_operator_gcs_to_bq] + +Google Compute Engine Operators +------------------------------- + GceInstanceStartOperator ^^^^^^^^^^^^^^^^^^^^^^^^ -Allows to start an existing Google Compute Engine instance. +Use the +:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator` +to start an existing Google Compute Engine instance. -In this example parameter values are extracted from Airflow variables. -Moreover, the ``default_args`` dict is used to pass common arguments to all operators in a single DAG. + +Arguments +""""""""" + +The following examples of OS environment variables show how you can build function name +to use in the operator and build default args to pass them to multiple tasks: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py :language: python - :start-after: [START howto_operator_gce_args] - :end-before: [END howto_operator_gce_args] + :start-after: [START howto_operator_gce_args_common] + :end-before: [END howto_operator_gce_args_common] - -Define the :class:`~airflow.contrib.operators.gcp_compute_operator -.GceInstanceStartOperator` by passing the required arguments to the constructor. +Using the operator +"""""""""""""""""" .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py :language: python @@ -125,15 +134,42 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator :start-after: [START howto_operator_gce_start] :end-before: [END howto_operator_gce_start] +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_instance_start_template_fields] + :end-before: [END gce_instance_start_template_fields] + +More information +"""""""""""""""" + +See `Google Compute Engine API documentation `_ + + GceInstanceStopOperator ^^^^^^^^^^^^^^^^^^^^^^^ -Allows to stop an existing Google Compute Engine instance. +Use the operator to stop Google Compute Engine instance. -For parameter definition take a look at :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator` above. +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator` -Define the :class:`~airflow.contrib.operators.gcp_compute_operator -.GceInstanceStopOperator` by passing the required arguments to the constructor. +Arguments +""""""""" + +The following examples of OS environment variables show how you can build function name +to use in the operator and build default args to pass them to multiple tasks: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py + :language: python + :start-after: [START howto_operator_gce_args_common] + :end-before: [END howto_operator_gce_args_common] + +Using the operator +"""""""""""""""""" .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py :language: python @@ -141,15 +177,48 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator :start-after: [START howto_operator_gce_stop] :end-before: [END howto_operator_gce_stop] +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_instance_stop_template_fields] + :end-before: [END gce_instance_stop_template_fields] + +More information +"""""""""""""""" + +See `Google Compute Engine API documentation `_ + + GceSetMachineTypeOperator ^^^^^^^^^^^^^^^^^^^^^^^^^ -Allows to change the machine type for a stopped instance to the specified machine type. +Use the operator to change machine type of a Google Compute Engine instance. -For parameter definition take a look at :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator` above. +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator` -Define the :class:`~airflow.contrib.operators.gcp_compute_operator -.GceSetMachineTypeOperator` by passing the required arguments to the constructor. +Arguments +""""""""" + +The following examples of OS environment variables show how you can build function name +to use in the operator and build default args to pass them to multiple tasks: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py + :language: python + :start-after: [START howto_operator_gce_args_common] + :end-before: [END howto_operator_gce_args_common] + + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py + :language: python + :start-after: [START howto_operator_gce_args_set_machine_type] + :end-before: [END howto_operator_gce_args_set_machine_type] + +Using the operator +"""""""""""""""""" .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py :language: python @@ -157,26 +226,163 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator :start-after: [START howto_operator_gce_set_machine_type] :end-before: [END howto_operator_gce_set_machine_type] +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_instance_set_machine_type_template_fields] + :end-before: [END gce_instance_set_machine_type_template_fields] + +More information +"""""""""""""""" + +See `Google Compute Engine API documentation `_ + + +GceInstanceTemplateCopyOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the operator to copy an existing Google Compute Engine instance template +applying a patch to it. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator`. + +Arguments +""""""""" + +The following examples of OS environment variables show how you can build parameters +passed to the operator and build default args to pass them to multiple tasks: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :start-after: [START howto_operator_compute_igm_common_args] + :end-before: [END howto_operator_compute_igm_common_args] + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :start-after: [START howto_operator_compute_template_copy_args] + :end-before: [END howto_operator_compute_template_copy_args] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gce_igm_copy_template] + :end-before: [END howto_operator_gce_igm_copy_template] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_instance_template_copy_operator_template_fields] + :end-before: [END gce_instance_template_copy_operator_template_fields] + +More information +"""""""""""""""" + +See `Google Compute Engine API documentation `_ + +GceInstanceGroupManagerUpdateTemplateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the operator to update template in Google Compute Engine Instance Group Manager. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator`. + +Arguments +""""""""" + +The following examples of OS environment variables show how you can build parameters +passed to the operator and build default args to pass them to multiple tasks: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :start-after: [START howto_operator_compute_igm_common_args] + :end-before: [END howto_operator_compute_igm_common_args] + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :start-after: [START howto_operator_compute_igm_update_template_args] + :end-before: [END howto_operator_compute_igm_update_template_args] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gce_igm_update_template] + :end-before: [END howto_operator_gce_igm_update_template] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_igm_update_template_operator_template_fields] + :end-before: [END gce_igm_update_template_operator_template_fields] + +Troubleshooting +""""""""""""""" + +You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails with +missing permissions. The service account has to have Service Account User role assigned +via IAM permissions in order to execute the operation. + +More information +"""""""""""""""" + +See `Google Compute Engine API documentation `_ + +Google Cloud Functions Operators +-------------------------------- GcfFunctionDeleteOperator ^^^^^^^^^^^^^^^^^^^^^^^^^ -Use the ``default_args`` dict to pass arguments to the operator. +Use the operator to delete a function from Google Cloud Functions. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`. + +Arguments +""""""""" + +The following examples of OS environment variables show how you can build function name +to use in the operator and build default args to pass them to multiple tasks: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py :language: python :start-after: [START howto_operator_gcf_delete_args] :end-before: [END howto_operator_gcf_delete_args] - -Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator` -to delete a function from Google Cloud Functions. +Using the operator +"""""""""""""""""" .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py :language: python + :dedent: 4 :start-after: [START howto_operator_gcf_delete] :end-before: [END howto_operator_gcf_delete] +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_function_delete_template_operator_template_fields] + :end-before: [END gce_function_delete_template_operator_template_fields] + Troubleshooting """"""""""""""" If you want to run or deploy an operator using a service account and get “forbidden 403” @@ -191,7 +397,6 @@ The typical way of assigning Cloud IAM permissions with `gcloud` is shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email ID of your service account. - .. code-block:: bash gcloud iam service-accounts add-iam-policy-binding \ @@ -202,13 +407,24 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account. See `Adding the IAM service agent user role to the runtime service `_ for details +More information +"""""""""""""""" + +See `Google Cloud Functions API documentation `_ + GcfFunctionDeployOperator ^^^^^^^^^^^^^^^^^^^^^^^^^ -Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator` -to deploy a function from Google Cloud Functions. +Use the operator to deploy a function to Google Cloud Functions. -The following examples of Airflow variables show various variants and combinations +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`. + + +Arguments +""""""""" + +The following examples of OS environment variables show various variants and combinations of default_args that you can use. The variables are defined as follows: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py @@ -223,11 +439,12 @@ With those variables you can define the body of the request: :start-after: [START howto_operator_gcf_deploy_body] :end-before: [END howto_operator_gcf_deploy_body] -When you create a DAG, the default_args dictionary can be used to pass the body and -other arguments: +When you create a DAG, the default_args dictionary can be used to pass +arguments common with other tasks: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py :language: python + :dedent: 4 :start-after: [START howto_operator_gcf_deploy_args] :end-before: [END howto_operator_gcf_deploy_args] @@ -235,11 +452,14 @@ Note that the neither the body nor the default args are complete in the above ex Depending on the set variables, there might be different variants on how to pass source code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository or sourceUploadUrl as described in the -`CloudFunction API specification `_. +`Cloud Functions API specification `_. Additionally, default_args might contain zip_path parameter to run the extra step of uploading the source code before deploying it. In the last case, you also need to provide an empty `sourceUploadUrl` parameter in the body. +Using the operator +"""""""""""""""""" + Based on the variables defined above, example logic of setting the source code related fields is shown here: @@ -252,9 +472,20 @@ The code to create the operator: .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py :language: python + :dedent: 4 :start-after: [START howto_operator_gcf_deploy] :end-before: [END howto_operator_gcf_deploy] +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py + :language: python + :dedent: 4 + :start-after: [START gce_function_deploy_template_operator_template_fields] + :end-before: [END gce_function_deploy_template_operator_template_fields] + + Troubleshooting """"""""""""""" @@ -277,13 +508,20 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account. --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \ --role="roles/iam.serviceAccountUser" - See `Adding the IAM service agent user role to the runtime service `_ for details If the source code for your function is in Google Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary. +More information +"""""""""""""""" + +See `Google Cloud Functions API documentation `_ + +Google Cloud Sql Operators +-------------------------- + CloudSqlInstanceDatabaseCreateOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/integration.rst b/docs/integration.rst index a0445409e7..46ea436bd7 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -100,28 +100,28 @@ field (see connection `wasb_default` for an example). .. _WasbBlobSensor: WasbBlobSensor -""""""""""""""" +"""""""""""""" .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor .. _WasbPrefixSensor: WasbPrefixSensor -""""""""""""""""" +"""""""""""""""" .. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor .. _FileToWasbOperator: FileToWasbOperator -""""""""""""""""""" +"""""""""""""""""" .. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator .. _WasbHook: WasbHook -""""""""" +"""""""" .. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook @@ -582,6 +582,16 @@ Compute Engine Operators - :ref:`GceInstanceStartOperator` : start an existing Google Compute Engine instance. - :ref:`GceInstanceStopOperator` : stop an existing Google Compute Engine instance. - :ref:`GceSetMachineTypeOperator` : change the machine type for a stopped instance. +- :ref:`GceInstanceTemplateCopyOperator` : copy the Instance Template, applying + specified changes. +- :ref:`GceInstanceGroupManagerUpdateTemplateOperator` : patch the Instance Group Manager, + replacing source Instance Template URL with the destination one. + +The operators have common base operator: + +.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator + +They also use :ref:`GceHook` hook to communicate with Google Cloud Platform. .. _GceInstanceStartOperator: @@ -604,6 +614,28 @@ GceSetMachineTypeOperator .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator +.. _GceInstanceTemplateCopyOperator: + +GceInstanceTemplateCopyOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator + +.. _GceInstanceGroupManagerUpdateTemplateOperator: + +GceInstanceGroupManagerUpdateTemplateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator + +.. _GceHook: + +Compute Engine Hook +""""""""""""""""""" + +.. autoclass:: airflow.contrib.hooks.gcp_compute_hook.GceHook +:members: + Cloud Functions ''''''''''''''' @@ -616,6 +648,8 @@ Cloud Functions Operators .. autoclass:: airflow.contrib.operators.gcp_operator.GCP +They also use :ref:`GcfHook` hook to communicate with Google Cloud Platform. + .. _GcfFunctionDeployOperator: GcfFunctionDeployOperator @@ -632,6 +666,8 @@ GcfFunctionDeleteOperator .. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator +.. _GcfHook: + Cloud Functions Hook """""""""""""""""""" @@ -741,7 +777,7 @@ DataprocClusterCreateOperator .. _DataprocClusterScaleOperator: DataprocClusterScaleOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator diff --git a/setup.py b/setup.py index 172b2aac70..7aeb5b59c9 100644 --- a/setup.py +++ b/setup.py @@ -309,6 +309,7 @@ def do_setup(): 'gitpython>=2.0.2', 'gunicorn>=19.4.0, <20.0', 'iso8601>=0.1.12', + 'json-merge-patch==0.2', 'jinja2>=2.7.3, <2.9.0', 'lxml>=4.0.0', 'markdown>=2.5.2, <3.0', diff --git a/tests/contrib/operators/test_gcp_compute_operator.py b/tests/contrib/operators/test_gcp_compute_operator.py index 449c4e015f..e8f9bf0165 100644 --- a/tests/contrib/operators/test_gcp_compute_operator.py +++ b/tests/contrib/operators/test_gcp_compute_operator.py @@ -18,10 +18,15 @@ # under the License. import ast import unittest +from copy import deepcopy + +import httplib2 +from googleapiclient.errors import HttpError from airflow import AirflowException, configuration from airflow.contrib.operators.gcp_compute_operator import GceInstanceStartOperator, \ - GceInstanceStopOperator, GceSetMachineTypeOperator + GceInstanceStopOperator, GceSetMachineTypeOperator, GceInstanceTemplateCopyOperator, \ + GceInstanceGroupManagerUpdateTemplateOperator from airflow.models import TaskInstance, DAG from airflow.utils import timezone @@ -34,12 +39,14 @@ except ImportError: except ImportError: mock = None +EMPTY_CONTENT = ''.encode('utf8') + PROJECT_ID = 'project-id' -LOCATION = 'zone' +ZONE = 'zone' RESOURCE_ID = 'resource-id' SHORT_MACHINE_TYPE_NAME = 'n1-machine-type' SET_MACHINE_TYPE_BODY = { - 'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME) + 'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME) } DEFAULT_DATE = timezone.datetime(2017, 1, 1) @@ -51,7 +58,7 @@ class GceInstanceStartTest(unittest.TestCase): mock_hook.return_value.start_instance.return_value = True op = GceInstanceStartOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, task_id='id' ) @@ -59,11 +66,11 @@ class GceInstanceStartTest(unittest.TestCase): mock_hook.assert_called_once_with(api_version='v1', gcp_conn_id='google_cloud_default') mock_hook.return_value.start_instance.assert_called_once_with( - PROJECT_ID, LOCATION, RESOURCE_ID + PROJECT_ID, ZONE, RESOURCE_ID ) self.assertTrue(result) - # Setting all of the operator's input parameters as templated dag_ids + # Setting all of the operator's input parameters as template dag_ids # (could be anything else) just to test if the templating works for all fields @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') def test_instance_start_with_templates(self, mock_hook): @@ -95,7 +102,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceInstanceStartOperator( project_id="", - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, task_id='id' ) @@ -123,7 +130,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceInstanceStartOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id="", task_id='id' ) @@ -132,12 +139,14 @@ class GceInstanceStartTest(unittest.TestCase): self.assertIn("The required parameter 'resource_id' is missing", str(err)) mock_hook.assert_not_called() + +class GceInstanceStopTest(unittest.TestCase): @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') def test_instance_stop(self, mock_hook): mock_hook.return_value.stop_instance.return_value = True op = GceInstanceStopOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, task_id='id' ) @@ -145,7 +154,7 @@ class GceInstanceStartTest(unittest.TestCase): mock_hook.assert_called_once_with(api_version='v1', gcp_conn_id='google_cloud_default') mock_hook.return_value.stop_instance.assert_called_once_with( - PROJECT_ID, LOCATION, RESOURCE_ID + PROJECT_ID, ZONE, RESOURCE_ID ) self.assertTrue(result) @@ -181,7 +190,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceInstanceStopOperator( project_id="", - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, task_id='id' ) @@ -209,7 +218,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceInstanceStopOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id="", task_id='id' ) @@ -218,12 +227,14 @@ class GceInstanceStartTest(unittest.TestCase): self.assertIn("The required parameter 'resource_id' is missing", str(err)) mock_hook.assert_not_called() + +class GceInstanceSetMachineTypeTest(unittest.TestCase): @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') def test_set_machine_type(self, mock_hook): mock_hook.return_value.set_machine_type.return_value = True op = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, body=SET_MACHINE_TYPE_BODY, task_id='id' @@ -232,7 +243,7 @@ class GceInstanceStartTest(unittest.TestCase): mock_hook.assert_called_once_with(api_version='v1', gcp_conn_id='google_cloud_default') mock_hook.return_value.set_machine_type.assert_called_once_with( - PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY + PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY ) self.assertTrue(result) @@ -269,7 +280,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceSetMachineTypeOperator( project_id="", - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, body=SET_MACHINE_TYPE_BODY, task_id='id' @@ -299,7 +310,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id="", body=SET_MACHINE_TYPE_BODY, task_id='id' @@ -314,7 +325,7 @@ class GceInstanceStartTest(unittest.TestCase): with self.assertRaises(AirflowException) as cm: op = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, body={}, task_id='id' @@ -332,10 +343,10 @@ class GceInstanceStartTest(unittest.TestCase): "'zone': 'https://www.googleapis.com/compute/v1/projects/polidea" \ "-airflow/zones/europe-west3-b', 'operationType': " \ "'setMachineType', 'targetLink': " \ - "'https://www.googleapis.com/compute/v1/projects/polidea-airflow" \ + "'https://www.googleapis.com/compute/v1/projects/example-airflow" \ "/zones/europe-west3-b/instances/pa-1', 'targetId': " \ "'2480086944131075860', 'status': 'DONE', 'user': " \ - "'uberdarek@polidea-airflow.iam.gserviceaccount.com', " \ + "'uberdarek@example-airflow.iam.gserviceaccount.com', " \ "'progress': 100, 'insertTime': '2018-10-03T07:50:07.951-07:00', "\ "'startTime': '2018-10-03T07:50:08.324-07:00', 'endTime': " \ "'2018-10-03T07:50:08.484-07:00', 'error': {'errors': [{'code': " \ @@ -343,35 +354,688 @@ class GceInstanceStartTest(unittest.TestCase): "'machine-type-1' does not exist in zone 'europe-west3-b'.\"}]}, "\ "'httpErrorStatusCode': 400, 'httpErrorMessage': 'BAD REQUEST', " \ "'selfLink': " \ - "'https://www.googleapis.com/compute/v1/projects/polidea-airflow" \ + "'https://www.googleapis.com/compute/v1/projects/example-airflow" \ "/zones/europe-west3-b/operations/operation-1538578207537" \ "-577542784f769-7999ab71-94f9ec1d'} " @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook' - '._check_operation_status') + '._check_zone_operation_status') @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook' '._execute_set_machine_type') @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook.get_conn') def test_set_machine_type_should_handle_and_trim_gce_error( - self, get_conn, _execute_set_machine_type, _check_operation_status): + self, get_conn, _execute_set_machine_type, _check_zone_operation_status): get_conn.return_value = {} _execute_set_machine_type.return_value = {"name": "test-operation"} - _check_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE) + _check_zone_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE) with self.assertRaises(AirflowException) as cm: op = GceSetMachineTypeOperator( project_id=PROJECT_ID, - zone=LOCATION, + zone=ZONE, resource_id=RESOURCE_ID, body=SET_MACHINE_TYPE_BODY, task_id='id' ) op.execute(None) err = cm.exception - _check_operation_status.assert_called_once_with( - {}, "test-operation", PROJECT_ID, LOCATION) + _check_zone_operation_status.assert_called_once_with( + {}, "test-operation", PROJECT_ID, ZONE) _execute_set_machine_type.assert_called_once_with( - PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY) + PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY) # Checking the full message was sometimes failing due to different order # of keys in the serialized JSON self.assertIn("400 BAD REQUEST: {", str(err)) # checking the square bracket trim self.assertIn("UNSUPPORTED_OPERATION", str(err)) + + +GCE_INSTANCE_TEMPLATE_NAME = "instance-template-test" +GCE_INSTANCE_TEMPLATE_NEW_NAME = "instance-template-test-new" +GCE_INSTANCE_TEMPLATE_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6" + +GCE_INSTANCE_TEMPLATE_BODY_GET = { + "kind": "compute#instanceTemplate", + "id": "6950321349997439715", + "creationTimestamp": "2018-10-15T06:20:12.777-07:00", + "name": GCE_INSTANCE_TEMPLATE_NAME, + "description": "", + "properties": { + "machineType": "n1-standard-1", + "networkInterfaces": [ + { + "kind": "compute#networkInterface", + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "kind": "compute#accessConfig", + "type": "ONE_TO_ONE_NAT", + } + ] + }, + { + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "kind": "compute#accessConfig", + "networkTier": "PREMIUM" + } + ] + } + ], + "disks": [ + { + "kind": "compute#attachedDisk", + "type": "PERSISTENT", + "licenses": [ + "A String", + ] + } + ], + "metadata": { + "kind": "compute#metadata", + "fingerprint": "GDPUYxlwHe4=" + }, + }, + "selfLink": "https://www.googleapis.com/compute/v1/projects/project" + "/global/instanceTemplates/instance-template-test" +} + +GCE_INSTANCE_TEMPLATE_BODY_INSERT = { + "name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "description": "", + "properties": { + "machineType": "n1-standard-1", + "networkInterfaces": [ + { + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "type": "ONE_TO_ONE_NAT", + } + ] + }, + { + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "networkTier": "PREMIUM" + } + ] + } + ], + "disks": [ + { + "type": "PERSISTENT", + } + ], + "metadata": { + "fingerprint": "GDPUYxlwHe4=" + }, + }, +} + +GCE_INSTANCE_TEMPLATE_BODY_GET_NEW = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_GET) +GCE_INSTANCE_TEMPLATE_BODY_GET_NEW['name'] = GCE_INSTANCE_TEMPLATE_NEW_NAME + + +class GceInstanceTemplateCopyTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME} + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=GCE_INSTANCE_TEMPLATE_BODY_INSERT, + request_id=None + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_idempotent_copy_template_when_already_copied(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME} + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.insert_instance_template.assert_not_called() + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template_with_request_id(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID, + task_id='id', + body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME} + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=GCE_INSTANCE_TEMPLATE_BODY_INSERT, + request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID, + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template_with_description_fields(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID, + task_id='id', + body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "description": "New description"} + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT) + body_insert["description"] = "New description" + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=body_insert, + request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID, + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_copy_with_some_validation_warnings(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "some_wrong_field": "test", + "properties": { + "some_other_wrong_field": "test" + }} + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT) + body_insert["some_wrong_field"] = "test" + body_insert["properties"]["some_other_wrong_field"] = "test" + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=body_insert, + request_id=None, + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template_with_updated_nested_fields(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={ + "name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "properties": { + "machineType": "n1-standard-2", + } + } + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT) + body_insert["properties"]["machineType"] = "n1-standard-2" + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=body_insert, + request_id=None + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template_with_smaller_array_fields(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={ + "name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "properties": { + "machineType": "n1-standard-1", + "networkInterfaces": [ + { + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "type": "ONE_TO_ONE_NAT", + "natIP": "8.8.8.8" + } + ] + } + ] + } + } + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT) + body_insert["properties"]["networkInterfaces"] = [ + { + "network": "https://www.googleapis.com/compute/v1/" + "projects/project/global/networks/default", + "accessConfigs": [ + { + "type": "ONE_TO_ONE_NAT", + "natIP": "8.8.8.8" + } + ] + } + ] + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=body_insert, + request_id=None + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_copy_template_with_bigger_array_fields(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + task_id='id', + body_patch={ + "name": GCE_INSTANCE_TEMPLATE_NEW_NAME, + "properties": { + "disks": [ + { + "kind": "compute#attachedDisk", + "type": "SCRATCH", + "licenses": [ + "Updated String", + ] + }, + { + "kind": "compute#attachedDisk", + "type": "PERSISTENT", + "licenses": [ + "Another String", + ] + } + ], + } + } + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='v1', + gcp_conn_id='google_cloud_default') + + body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT) + body_insert["properties"]["disks"] = [ + { + "kind": "compute#attachedDisk", + "type": "SCRATCH", + "licenses": [ + "Updated String", + ] + }, + { + "kind": "compute#attachedDisk", + "type": "PERSISTENT", + "licenses": [ + "Another String", + ] + } + ] + mock_hook.return_value.insert_instance_template.assert_called_once_with( + project_id=PROJECT_ID, + body=body_insert, + request_id=None, + ) + self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_missing_name(self, mock_hook): + mock_hook.return_value.get_instance_template.side_effect = [ + HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT), + GCE_INSTANCE_TEMPLATE_BODY_GET, + GCE_INSTANCE_TEMPLATE_BODY_GET_NEW + ] + with self.assertRaises(AirflowException) as cm: + op = GceInstanceTemplateCopyOperator( + project_id=PROJECT_ID, + resource_id=GCE_INSTANCE_TEMPLATE_NAME, + request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID, + task_id='id', + body_patch={"description": "New description"} + ) + op.execute(None) + err = cm.exception + self.assertIn("should contain at least name for the new operator " + "in the 'name' field", str(err)) + mock_hook.assert_not_called() + + +GCE_INSTANCE_GROUP_MANAGER_NAME = "instance-group-test" +GCE_INSTANCE_TEMPLATE_SOURCE_URL = \ + "https://www.googleapis.com/compute/beta/projects/project" \ + "/global/instanceTemplates/instance-template-test" + +GCE_INSTANCE_TEMPLATE_OTHER_URL = \ + "https://www.googleapis.com/compute/beta/projects/project" \ + "/global/instanceTemplates/instance-template-other" + +GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL = \ + "https://www.googleapis.com/compute/beta/projects/project" \ + "/global/instanceTemplates/instance-template-non-existing" + +GCE_INSTANCE_TEMPLATE_DESTINATION_URL = \ + "https://www.googleapis.com/compute/beta/projects/project" \ + "/global/instanceTemplates/instance-template-new" + +GCE_INSTANCE_GROUP_MANAGER_GET = { + "kind": "compute#instanceGroupManager", + "id": "2822359583810032488", + "creationTimestamp": "2018-10-17T05:39:35.793-07:00", + "name": GCE_INSTANCE_GROUP_MANAGER_NAME, + "zone": "https://www.googleapis.com/compute/beta/projects/project/zones/zone", + "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL, + "versions": [ + { + "name": "v1", + "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL, + "targetSize": { + "calculated": 1 + } + }, + { + "name": "v2", + "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL, + } + ], + "instanceGroup": GCE_INSTANCE_TEMPLATE_SOURCE_URL, + "baseInstanceName": GCE_INSTANCE_GROUP_MANAGER_NAME, + "fingerprint": "BKWB_igCNbQ=", + "currentActions": { + "none": 1, + "creating": 0, + "creatingWithoutRetries": 0, + "verifying": 0, + "recreating": 0, + "deleting": 0, + "abandoning": 0, + "restarting": 0, + "refreshing": 0 + }, + "pendingActions": { + "creating": 0, + "deleting": 0, + "recreating": 0, + "restarting": 0 + }, + "targetSize": 1, + "selfLink": "https://www.googleapis.com/compute/beta/projects/project/zones/" + "zone/instanceGroupManagers/" + GCE_INSTANCE_GROUP_MANAGER_NAME, + "autoHealingPolicies": [ + { + "initialDelaySec": 300 + } + ], + "serviceAccount": "198907790164@cloudservices.gserviceaccount.com" +} + +GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH = { + "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL, + "versions": [ + { + "name": "v1", + "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL, + "targetSize": { + "calculated": 1 + } + }, + { + "name": "v2", + "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL, + } + ], +} + +GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6" + +GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY = { + "type": "OPPORTUNISTIC", + "minimalAction": "RESTART", + "maxSurge": { + "fixed": 1 + }, + "maxUnavailable": { + "percent": 10 + }, + "minReadySec": 1800 +} + + +class GceInstanceGroupManagerUpdateTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_instance_group_update(self, mock_hook): + mock_hook.return_value.get_instance_group_manager.return_value = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.patch_instance_group_manager.assert_called_once_with( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH, + request_id=None + ) + self.assertTrue(result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_instance_group_update_no_instance_template_field(self, mock_hook): + instance_group_manager_no_template = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + del instance_group_manager_no_template['instanceTemplate'] + mock_hook.return_value.get_instance_group_manager.return_value = \ + instance_group_manager_no_template + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + expected_patch_no_instance_template = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH) + del expected_patch_no_instance_template['instanceTemplate'] + mock_hook.return_value.patch_instance_group_manager.assert_called_once_with( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + body=expected_patch_no_instance_template, + request_id=None + ) + self.assertTrue(result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_instance_group_update_no_versions_field(self, mock_hook): + instance_group_manager_no_versions = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + del instance_group_manager_no_versions['versions'] + mock_hook.return_value.get_instance_group_manager.return_value = \ + instance_group_manager_no_versions + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + expected_patch_no_versions = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH) + del expected_patch_no_versions['versions'] + mock_hook.return_value.patch_instance_group_manager.assert_called_once_with( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + body=expected_patch_no_versions, + request_id=None + ) + self.assertTrue(result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_instance_group_update_with_update_policy(self, mock_hook): + mock_hook.return_value.get_instance_group_manager.return_value = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + update_policy=GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY, + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + expected_patch_with_update_policy = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH) + expected_patch_with_update_policy['updatePolicy'] = \ + GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY + mock_hook.return_value.patch_instance_group_manager.assert_called_once_with( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + body=expected_patch_with_update_policy, + request_id=None + ) + self.assertTrue(result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_successful_instance_group_update_with_request_id(self, mock_hook): + mock_hook.return_value.get_instance_group_manager.return_value = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.patch_instance_group_manager.assert_called_once_with( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH, + request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID + ) + self.assertTrue(result) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_try_to_use_api_v1(self, mock_hook): + with self.assertRaises(AirflowException) as cm: + GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + api_version='v1', + source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + err = cm.exception + self.assertIn("Use beta api version or above", str(err)) + + @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook') + def test_try_to_use_non_existing_template(self, mock_hook): + mock_hook.return_value.get_instance_group_manager.return_value = \ + deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET) + op = GceInstanceGroupManagerUpdateTemplateOperator( + project_id=PROJECT_ID, + zone=ZONE, + resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME, + task_id='id', + source_template=GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL, + destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL + ) + result = op.execute(None) + mock_hook.assert_called_once_with(api_version='beta', + gcp_conn_id='google_cloud_default') + mock_hook.return_value.patch_instance_group_manager.assert_not_called() + self.assertTrue(result)