[AIRFLOW-3220] Add Instance Group Manager Operators for GCE (#4167)

This commit is contained in:
Jarek Potiuk 2018-11-13 00:32:19 +01:00, committed by Kaxil Naik
Parent 34b310e56f
Commit 7ee30b6fac
14 changed files: 1790 additions and 135 deletions

6
.gitignore поставляемый
Просмотреть файл

@ -146,3 +146,9 @@ npm-debug.log*
static/dist
derby.log
metastore_db
# Airflow log files when airflow is run locally
airflow-*.err
airflow-*.out
airflow-*.log
airflow-*.pid

Просмотреть файл

@ -24,7 +24,7 @@ Engine instance.
This DAG relies on the following Airflow variables
https://airflow.apache.org/concepts.html#variables
* PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
* LOCATION - Google Cloud Platform zone where the instance exists.
* ZONE - Google Cloud Platform zone where the instance exists.
* INSTANCE - Name of the Compute Engine instance.
* SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
See https://cloud.google.com/compute/docs/machine-types
@ -37,19 +37,23 @@ from airflow import models
from airflow.contrib.operators.gcp_compute_operator import GceInstanceStartOperator, \
GceInstanceStopOperator, GceSetMachineTypeOperator
# [START howto_operator_gce_args]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
INSTANCE = models.Variable.get('INSTANCE', '')
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
SET_MACHINE_TYPE_BODY = {
'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
}
# [START howto_operator_gce_args_common]
PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
ZONE = models.Variable.get('ZONE', 'europe-west1-b')
INSTANCE = models.Variable.get('INSTANCE', 'test-instance')
default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}
# [END howto_operator_gce_args]
# [END howto_operator_gce_args_common]
# [START howto_operator_gce_args_set_machine_type]
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
SET_MACHINE_TYPE_BODY = {
'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
}
# [END howto_operator_gce_args_set_machine_type]
with models.DAG(
'example_gcp_compute',
@ -59,7 +63,7 @@ with models.DAG(
# [START howto_operator_gce_start]
gce_instance_start = GceInstanceStartOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
task_id='gcp_compute_start_task'
)
@ -67,14 +71,14 @@ with models.DAG(
# Duplicate start for idempotence testing
gce_instance_start2 = GceInstanceStartOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
task_id='gcp_compute_start_task2'
)
# [START howto_operator_gce_stop]
gce_instance_stop = GceInstanceStopOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
task_id='gcp_compute_stop_task'
)
@ -82,14 +86,14 @@ with models.DAG(
# Duplicate stop for idempotence testing
gce_instance_stop2 = GceInstanceStopOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
task_id='gcp_compute_stop_task2'
)
# [START howto_operator_gce_set_machine_type]
gce_set_machine_type = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
body=SET_MACHINE_TYPE_BODY,
task_id='gcp_compute_set_machine_type'
@ -98,7 +102,7 @@ with models.DAG(
# Duplicate set machine type for idempotence testing
gce_set_machine_type2 = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=INSTANCE,
body=SET_MACHINE_TYPE_BODY,
task_id='gcp_compute_set_machine_type2'

Просмотреть файл

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that uses IGM-type compute operations:
* copy of Instance Template
* update template in Instance Group Manager
This DAG relies on the following OS environment variables
* PROJECT_ID - the Google Cloud Platform project where the Compute Engine instance exists
* ZONE - the zone where the Compute Engine instance exists
Variables for copy template operator:
* TEMPLATE_NAME - name of the template to copy
* NEW_TEMPLATE_NAME - name of the new template
* NEW_DESCRIPTION - description added to the template
Variables for update template in Group Manager:
* INSTANCE_GROUP_MANAGER_NAME - name of the Instance Group Manager
* SOURCE_TEMPLATE_URL - url of the template to replace in the Instance Group Manager
* DESTINATION_TEMPLATE_URL - url of the new template to set in the Instance Group Manager
"""
import os
import datetime
import airflow
from airflow import models
from airflow.contrib.operators.gcp_compute_operator import \
GceInstanceTemplateCopyOperator, GceInstanceGroupManagerUpdateTemplateOperator
# [START howto_operator_compute_igm_common_args]
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
ZONE = os.environ.get('ZONE', 'europe-west1-b')
default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}
# [END howto_operator_compute_igm_common_args]
# [START howto_operator_compute_template_copy_args]
TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
'instance-template-test-new')
NEW_DESCRIPTION = os.environ.get('NEW_DESCRIPTION', 'Test new description')
GCE_INSTANCE_TEMPLATE_BODY_UPDATE = {
"name": NEW_TEMPLATE_NAME,
"description": NEW_DESCRIPTION,
"properties": {
"machineType": "n1-standard-2"
}
}
# [END howto_operator_compute_template_copy_args]
# [START howto_operator_compute_igm_update_template_args]
INSTANCE_GROUP_MANAGER_NAME = os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
'instance-group-test')
SOURCE_TEMPLATE_URL = os.environ.get(
'SOURCE_TEMPLATE_URL',
"https://www.googleapis.com/compute/beta/projects/"
"example-project/global/instanceTemplates/instance-template-test")
DESTINATION_TEMPLATE_URL = os.environ.get(
'DESTINATION_TEMPLATE_URL',
"https://www.googleapis.com/compute/beta/projects/"
"example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
UPDATE_POLICY = {
"type": "OPPORTUNISTIC",
"minimalAction": "RESTART",
"maxSurge": {
"fixed": 1
},
"minReadySec": 1800
}
# [END howto_operator_compute_igm_update_template_args]
with models.DAG(
'example_gcp_compute_igm',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)
) as dag:
# [START howto_operator_gce_igm_copy_template]
gce_instance_template_copy = GceInstanceTemplateCopyOperator(
project_id=PROJECT_ID,
resource_id=TEMPLATE_NAME,
body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
task_id='gcp_compute_igm_copy_template_task'
)
# [END howto_operator_gce_igm_copy_template]
# Added to check for idempotence
gce_instance_template_copy2 = GceInstanceTemplateCopyOperator(
project_id=PROJECT_ID,
resource_id=TEMPLATE_NAME,
body_patch=GCE_INSTANCE_TEMPLATE_BODY_UPDATE,
task_id='gcp_compute_igm_copy_template_task_2'
)
# [START howto_operator_gce_igm_update_template]
gce_instance_group_manager_update_template = \
GceInstanceGroupManagerUpdateTemplateOperator(
project_id=PROJECT_ID,
resource_id=INSTANCE_GROUP_MANAGER_NAME,
zone=ZONE,
source_template=SOURCE_TEMPLATE_URL,
destination_template=DESTINATION_TEMPLATE_URL,
update_policy=UPDATE_POLICY,
task_id='gcp_compute_igm_group_manager_update_template'
)
# [END howto_operator_gce_igm_update_template]
# Added to check for idempotence (and without UPDATE_POLICY)
gce_instance_group_manager_update_template2 = \
GceInstanceGroupManagerUpdateTemplateOperator(
project_id=PROJECT_ID,
resource_id=INSTANCE_GROUP_MANAGER_NAME,
zone=ZONE,
source_template=SOURCE_TEMPLATE_URL,
destination_template=DESTINATION_TEMPLATE_URL,
task_id='gcp_compute_igm_group_manager_update_template_2'
)
gce_instance_template_copy >> gce_instance_template_copy2 >> \
gce_instance_group_manager_update_template >> \
gce_instance_group_manager_update_template2

Просмотреть файл

@ -33,9 +33,9 @@ from airflow import models
from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
# [START howto_operator_gcf_delete_args]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
LOCATION = models.Variable.get('LOCATION', 'europe-west1')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', 'helloWorld')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,

Просмотреть файл

@ -45,11 +45,14 @@ from airflow.contrib.operators.gcp_function_operator \
from airflow.utils import dates
# [START howto_operator_gcf_deploy_variables]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
LOCATION = models.Variable.get('LOCATION', 'europe-west1')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY',
'https://source.developers.google.com/'
'projects/example-airflow/'
'repos/hello-world/moveable-aliases/master')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
@ -70,11 +73,7 @@ body = {
# [START howto_operator_gcf_deploy_args]
default_args = {
'start_date': dates.days_ago(1),
'project_id': PROJECT_ID,
'location': LOCATION,
'body': body,
'validate_body': VALIDATE_BODY
'start_date': dates.days_ago(1)
}
# [END howto_operator_gcf_deploy_args]
@ -103,11 +102,15 @@ with models.DAG(
# [START howto_operator_gcf_deploy]
deploy_task = GcfFunctionDeployOperator(
task_id="gcf_deploy_task",
name=FUNCTION_NAME
name=FUNCTION_NAME,
project_id=PROJECT_ID,
location=LOCATION,
body=body,
validate_body=VALIDATE_BODY
)
# [END howto_operator_gcf_deploy]
delete_task = GcfFunctionDeleteOperator(
task_id="gcf_delete_task",
name=FUNCTION_NAME
name=FUNCTION_NAME,
)
deploy_task >> delete_task

Просмотреть файл

@ -68,14 +68,14 @@ class GceHook(GoogleCloudBaseHook):
"""
Starts an existing instance defined by project_id, zone and resource_id.
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform project ID where the Compute Engine
Instance exists
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:param zone: Google Cloud Platform zone where the instance exists
:type zone: str
:param resource_id: Name of the Compute Engine instance resource.
:param resource_id: Name of the Compute Engine instance resource
:type resource_id: str
:return: True if the operation succeeded, raises an error otherwise
:return: True if the operation succeeded, raises an error otherwise.
:rtype: bool
"""
response = self.get_conn().instances().start(
@ -83,21 +83,26 @@ class GceHook(GoogleCloudBaseHook):
zone=zone,
instance=resource_id
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, zone, operation_name)
try:
operation_name = response["name"]
except KeyError:
raise AirflowException(
"Wrong response '{}' returned - it should contain "
"'name' field".format(response))
return self._wait_for_operation_to_complete(project_id, operation_name, zone)
def stop_instance(self, project_id, zone, resource_id):
"""
Stops an instance defined by project_id, zone and resource_id.
Stops an instance defined by project_id, zone and resource_id
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform project ID where the Compute Engine
Instance exists
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:param zone: Google Cloud Platform zone where the instance exists
:type zone: str
:param resource_id: Name of the Compute Engine instance resource.
:param resource_id: Name of the Compute Engine instance resource
:type resource_id: str
:return: True if the operation succeeded, raises an error otherwise
:return: True if the operation succeeded, raises an error otherwise.
:rtype: bool
"""
response = self.get_conn().instances().stop(
@ -105,50 +110,178 @@ class GceHook(GoogleCloudBaseHook):
zone=zone,
instance=resource_id
).execute(num_retries=NUM_RETRIES)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, zone, operation_name)
try:
operation_name = response["name"]
except KeyError:
raise AirflowException(
"Wrong response '{}' returned - it should contain "
"'name' field".format(response))
return self._wait_for_operation_to_complete(project_id, operation_name, zone)
def set_machine_type(self, project_id, zone, resource_id, body):
"""
Sets machine type of an instance defined by project_id, zone and resource_id.
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform project ID where the Compute Engine
Instance exists
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:type zone: str
:param resource_id: Name of the Compute Engine instance resource.
:param resource_id: Name of the Compute Engine instance resource
:type resource_id: str
:param body: Body required by the Compute Engine setMachineType API,
as described in
https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
as described in
https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType
:type body: dict
:return: True if the operation succeeded, raises an error otherwise
:return: True if the operation succeeded, raises an error otherwise.
:rtype: bool
"""
response = self._execute_set_machine_type(project_id, zone, resource_id, body)
operation_name = response["name"]
return self._wait_for_operation_to_complete(project_id, zone, operation_name)
try:
operation_name = response["name"]
except KeyError:
raise AirflowException(
"Wrong response '{}' returned - it should contain "
"'name' field".format(response))
return self._wait_for_operation_to_complete(project_id, operation_name, zone)
def _execute_set_machine_type(self, project_id, zone, resource_id, body):
    # Issues the raw Compute Engine instances().setMachineType request.
    # Kept as a separate private method — presumably to ease stubbing in
    # tests; TODO(review) confirm against the test suite.
    request = self.get_conn().instances().setMachineType(
        project=project_id,
        zone=zone,
        instance=resource_id,
        body=body)
    return request.execute(num_retries=NUM_RETRIES)
def _wait_for_operation_to_complete(self, project_id, zone, operation_name):
def get_instance_template(self, project_id, resource_id):
    """
    Retrieves instance template by project_id and resource_id.

    :param project_id: Google Cloud Platform project ID where the Compute Engine
        Instance template exists
    :type project_id: str
    :param resource_id: Name of the instance template
    :type resource_id: str
    :return: Instance template representation as object according to
        https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
    :rtype: dict
    """
    # Instance templates are global resources, so no zone parameter here.
    get_request = self.get_conn().instanceTemplates().get(
        project=project_id,
        instanceTemplate=resource_id
    )
    return get_request.execute(num_retries=NUM_RETRIES)
def insert_instance_template(self, project_id, body, request_id=None):
    """
    Inserts instance template using body specified

    :param project_id: Google Cloud Platform project ID where the Compute Engine
        Instance exists
    :type project_id: str
    :param body: Instance template representation as object according to
        https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
    :type body: dict
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again)
        It should be in UUID format as defined in RFC 4122
    :type request_id: str
    :return: True if the operation succeeded
    :rtype: bool
    """
    insert_request = self.get_conn().instanceTemplates().insert(
        project=project_id,
        body=body,
        requestId=request_id
    )
    response = insert_request.execute(num_retries=NUM_RETRIES)
    # The API returns an Operation resource; its name is required to poll
    # for completion below.
    if "name" not in response:
        raise AirflowException(
            "Wrong response '{}' returned - it should contain "
            "'name' field".format(response))
    # Global resource: the waiter is called without a zone.
    return self._wait_for_operation_to_complete(project_id, response["name"])
def get_instance_group_manager(self, project_id, zone, resource_id):
    """
    Retrieves Instance Group Manager by project_id, zone and resource_id.

    :param project_id: Google Cloud Platform project ID where the Compute Engine
        Instance Group Manager exists
    :type project_id: str
    :param zone: Google Cloud Platform zone where the Instance Group Manager exists
    :type zone: str
    :param resource_id: Name of the Instance Group Manager
    :type resource_id: str
    :return: Instance group manager representation as object according to
        https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers
    :rtype: dict
    """
    # Simple read-only GET — no operation polling needed.
    get_request = self.get_conn().instanceGroupManagers().get(
        project=project_id,
        zone=zone,
        instanceGroupManager=resource_id
    )
    return get_request.execute(num_retries=NUM_RETRIES)
def patch_instance_group_manager(self, project_id, zone, resource_id,
                                 body, request_id=None):
    """
    Patches Instance Group Manager with the specified body.

    :param project_id: Google Cloud Platform project ID where the Compute Engine
        Instance Group Manager exists
    :type project_id: str
    :param zone: Google Cloud Platform zone where the Instance Group Manager exists
    :type zone: str
    :param resource_id: Name of the Instance Group Manager
    :type resource_id: str
    :param body: Instance Group Manager representation as json-merge-patch object
        according to
        https://cloud.google.com/compute/docs/reference/rest/beta/instanceTemplates/patch
    :type body: dict
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again).
        It should be in UUID format as defined in RFC 4122
    :type request_id: str
    :return: True if the operation succeeded
    :rtype: bool
    """
    patch_request = self.get_conn().instanceGroupManagers().patch(
        project=project_id,
        zone=zone,
        instanceGroupManager=resource_id,
        body=body,
        requestId=request_id
    )
    response = patch_request.execute(num_retries=NUM_RETRIES)
    # The API returns an Operation resource whose name is polled below.
    if "name" not in response:
        raise AirflowException(
            "Wrong response '{}' returned - it should contain "
            "'name' field".format(response))
    # Zonal resource: pass the zone so the zonal-operations endpoint is used.
    return self._wait_for_operation_to_complete(project_id, response["name"], zone)
def _wait_for_operation_to_complete(self, project_id, operation_name, zone=None):
"""
Waits for the named operation to complete - checks status of the async call.
:param operation_name: name of the operation
:type operation_name: str
:param zone: optional region of the request (might be None for global operations)
:type zone: str
:return: True if the operation succeeded, raises an error otherwise
:rtype: bool
"""
service = self.get_conn()
while True:
operation_response = self._check_operation_status(
service, operation_name, project_id, zone)
if zone is None:
# noinspection PyTypeChecker
operation_response = self._check_global_operation_status(
service, operation_name, project_id)
else:
# noinspection PyTypeChecker
operation_response = self._check_zone_operation_status(
service, operation_name, project_id, zone)
if operation_response.get("status") == GceOperationStatus.DONE:
error = operation_response.get("error")
if error:
@ -161,7 +294,14 @@ class GceHook(GoogleCloudBaseHook):
return True
time.sleep(TIME_TO_SLEEP_IN_SECONDS)
def _check_operation_status(self, service, operation_name, project_id, zone):
@staticmethod
def _check_zone_operation_status(service, operation_name, project_id, zone):
    # Single poll of a zonal operation's status.
    request = service.zoneOperations().get(
        project=project_id,
        zone=zone,
        operation=operation_name)
    return request.execute(num_retries=NUM_RETRIES)
@staticmethod
def _check_global_operation_status(service, operation_name, project_id):
    # Single poll of a global (zone-less) operation's status.
    request = service.globalOperations().get(
        project=project_id,
        operation=operation_name)
    return request.execute(num_retries=NUM_RETRIES)

Просмотреть файл

@ -65,7 +65,7 @@ class GcfHook(GoogleCloudBaseHook):
:param name: name of the function
:type name: str
:return: a CloudFunction object representing the function
:return: a Cloud Functions object representing the function
:rtype: dict
"""
return self.get_conn().projects().locations().functions().get(
@ -78,7 +78,7 @@ class GcfHook(GoogleCloudBaseHook):
:param full_location: full location including the project in the form of
of /projects/<PROJECT>/location/<LOCATION>
:type full_location: str
:return: array of CloudFunction objects - representing functions in the location
:return: array of Cloud Functions objects - representing functions in the location
:rtype: [dict]
"""
list_response = self.get_conn().projects().locations().functions().list(

Просмотреть файл

@ -16,18 +16,24 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from copy import deepcopy
from googleapiclient.errors import HttpError
from airflow import AirflowException
from airflow.contrib.hooks.gcp_compute_hook import GceHook
from airflow.contrib.utils.gcp_field_sanitizer import GcpBodyFieldSanitizer
from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from json_merge_patch import merge
class GceBaseOperator(BaseOperator):
"""
Abstract base operator for Google Compute Engine operators to inherit from.
"""
@apply_defaults
def __init__(self,
project_id,
@ -61,10 +67,10 @@ class GceBaseOperator(BaseOperator):
class GceInstanceStartOperator(GceBaseOperator):
"""
Start an instance in Google Compute Engine.
Starts an instance in Google Compute Engine.
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform Project ID where the Compute Engine
Instance exists.
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:type zone: str
@ -72,10 +78,12 @@ class GceInstanceStartOperator(GceBaseOperator):
:type resource_id: str
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (e.g. v1).
:param api_version: API version used (for example v1 or beta).
:type api_version: str
"""
# [START gce_instance_start_template_fields]
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
# [END gce_instance_start_template_fields]
@apply_defaults
def __init__(self,
@ -95,10 +103,10 @@ class GceInstanceStartOperator(GceBaseOperator):
class GceInstanceStopOperator(GceBaseOperator):
"""
Stop an instance in Google Compute Engine.
Stops an instance in Google Compute Engine.
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform Project ID where the Compute Engine
Instance exists.
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:type zone: str
@ -106,10 +114,12 @@ class GceInstanceStopOperator(GceBaseOperator):
:type resource_id: str
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (e.g. v1).
:param api_version: API version used (for example v1 or beta).
:type api_version: str
"""
# [START gce_instance_stop_template_fields]
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
# [END gce_instance_stop_template_fields]
@apply_defaults
def __init__(self,
@ -135,10 +145,10 @@ SET_MACHINE_TYPE_VALIDATION_SPECIFICATION = [
class GceSetMachineTypeOperator(GceBaseOperator):
"""
Changes the machine type for a stopped instance to the machine type specified in
the request.
the request.
:param project_id: Google Cloud Platform project where the Compute Engine
instance exists.
:param project_id: Google Cloud Platform Project ID where the Compute Engine
Instance exists.
:type project_id: str
:param zone: Google Cloud Platform zone where the instance exists.
:type zone: str
@ -149,10 +159,14 @@ class GceSetMachineTypeOperator(GceBaseOperator):
:type body: dict
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (e.g. v1).
:param api_version: API version used (for example v1 or beta).
:type api_version: str
:param validate_body: If set to False, body validation is not performed.
:type validate_body: bool
"""
# [START gce_instance_set_machine_type_template_fields]
template_fields = ('project_id', 'zone', 'resource_id', 'gcp_conn_id', 'api_version')
# [END gce_instance_set_machine_type_template_fields]
@apply_defaults
def __init__(self,
@ -181,3 +195,241 @@ class GceSetMachineTypeOperator(GceBaseOperator):
self._validate_all_body_fields()
return self._hook.set_machine_type(self.project_id, self.zone,
self.resource_id, self.body)
# Validation specification for the body_patch of GceInstanceTemplateCopyOperator,
# consumed by GcpBodyFieldValidator. Only 'name' is mandatory; everything else
# is optional, and several nested structures are deliberately not validated
# deeper (see inline notes).
GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [
    dict(name="name", regexp="^.+$"),
    dict(name="description", optional=True),
    dict(name="properties", type='dict', optional=True, fields=[
        dict(name="description", optional=True),
        dict(name="tags", optional=True, fields=[
            dict(name="items", optional=True)
        ]),
        dict(name="machineType", optional=True),
        dict(name="canIpForward", optional=True),
        dict(name="networkInterfaces", optional=True),  # not validating deeper
        dict(name="disks", optional=True),  # not validating the array deeper
        dict(name="metadata", optional=True, fields=[
            dict(name="fingerprint", optional=True),
            dict(name="items", optional=True),
            dict(name="kind", optional=True),
        ]),
        dict(name="serviceAccounts", optional=True),  # not validating deeper
        dict(name="scheduling", optional=True, fields=[
            dict(name="onHostMaintenance", optional=True),
            dict(name="automaticRestart", optional=True),
            dict(name="preemptible", optional=True),
            # NOTE(review): "nodeAffinitites" looks misspelled — the API field is
            # "nodeAffinities"; confirm before changing, as this string is matched
            # against user-supplied bodies at runtime.
            dict(name="nodeAffinitites", optional=True),  # not validating deeper
        ]),
        dict(name="labels", optional=True),
        dict(name="guestAccelerators", optional=True),  # not validating deeper
        dict(name="minCpuPlatform", optional=True),
    ]),
]
# Server-generated / read-only fields stripped from a fetched template before it
# is re-inserted as a copy (dotted paths descend into nested dicts/arrays).
GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [
    "kind",
    "id",
    "name",
    "creationTimestamp",
    "properties.disks.sha256",
    "properties.disks.kind",
    "properties.disks.sourceImageEncryptionKey.sha256",
    "properties.disks.index",
    "properties.disks.licenses",
    "properties.networkInterfaces.kind",
    "properties.networkInterfaces.accessConfigs.kind",
    "properties.networkInterfaces.name",
    "properties.metadata.kind",
    "selfLink"
]
class GceInstanceTemplateCopyOperator(GceBaseOperator):
    """
    Copies the instance template, applying specified changes.

    :param project_id: Google Cloud Platform Project ID where the Compute Engine
        instance exists.
    :type project_id: str
    :param resource_id: Name of the Instance Template
    :type resource_id: str
    :param body_patch: Patch to the body of instanceTemplates object following rfc7386
        PATCH semantics. The body_patch content follows
        https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
        Name field is required as we need to rename the template,
        all the other fields are optional. It is important to follow PATCH semantics
        - arrays are replaced fully, so if you need to update an array you should
        provide the whole target array as patch element.
    :type body_patch: dict
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again).
        It should be in UUID format as defined in RFC 4122.
    :type request_id: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (for example v1 or beta).
    :type api_version: str
    :param validate_body: If set to False, body validation is not performed.
    :type validate_body: bool
    """
    # [START gce_instance_template_copy_operator_template_fields]
    template_fields = ('project_id', 'resource_id', 'request_id',
                       'gcp_conn_id', 'api_version')
    # [END gce_instance_template_copy_operator_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 resource_id,
                 body_patch,
                 request_id=None,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1',
                 validate_body=True,
                 *args, **kwargs):
        self.body_patch = body_patch
        self.request_id = request_id
        self._field_validator = None
        # 'name' is mandatory: the copy must be inserted under a new name.
        if 'name' not in self.body_patch:
            raise AirflowException("The body '{}' should contain at least "
                                   "name for the new operator in the 'name' field".
                                   format(body_patch))
        if validate_body:
            self._field_validator = GcpBodyFieldValidator(
                GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION, api_version=api_version)
        self._field_sanitizer = GcpBodyFieldSanitizer(
            GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE)
        super(GceInstanceTemplateCopyOperator, self).__init__(
            project_id=project_id, zone='global', resource_id=resource_id,
            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)

    def _validate_all_body_fields(self):
        # No-op when validate_body=False (validator is then None).
        if self._field_validator:
            self._field_validator.validate(self.body_patch)

    def execute(self, context):
        self._validate_all_body_fields()
        try:
            # Idempotence check (sort of) - we want to check if the new template
            # is already created and if is, then we assume it was created by previous run
            # of CopyTemplate operator - we do not check if content of the template
            # is as expected. Templates are immutable so we cannot update it anyway
            # and deleting/recreating is not worth the hassle especially
            # that we cannot delete template if it is already used in some Instance
            # Group Manager. We assume success if the template is simply present
            existing_template = self._hook.get_instance_template(
                project_id=self.project_id, resource_id=self.body_patch['name'])
            # BUGFIX: the original message contained an unfilled '{}' placeholder
            # (no .format() call); pass the template name as a lazy logging arg.
            self.log.info(
                "The %s template already existed. It was likely "
                "created by previous run of the operator. Assuming success.",
                self.body_patch['name'])
            return existing_template
        except HttpError as e:
            # We actually expect to get 404 / Not Found here as the template should
            # not yet exist
            if e.resp.status != 404:
                # Bare raise preserves the original traceback.
                raise
        # Fetch the source template, strip server-generated fields, then apply
        # the user patch with json-merge-patch semantics (RFC 7386).
        old_body = self._hook.get_instance_template(project_id=self.project_id,
                                                    resource_id=self.resource_id)
        new_body = deepcopy(old_body)
        self._field_sanitizer.sanitize(new_body)
        new_body = merge(new_body, self.body_patch)
        self.log.info("Calling insert instance template with updated body: %s",
                      new_body)
        self._hook.insert_instance_template(project_id=self.project_id,
                                            body=new_body,
                                            request_id=self.request_id)
        # Return the freshly created template so downstream tasks can XCom it.
        return self._hook.get_instance_template(project_id=self.project_id,
                                                resource_id=self.body_patch['name'])
class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator):
    """
    Patches the Instance Group Manager, replacing source template URL with the
    destination one. API V1 does not have update/patch operations for Instance
    Group Manager, so you must use beta or newer API version. Beta is the default.

    :param project_id: Google Cloud Platform Project ID where the Compute Engine
        Instance exists.
    :type project_id: str
    :param resource_id: Name of the Instance Group Manager
    :type resource_id: str
    :param zone: Google Cloud Platform zone where the Instance Group Manager exists.
    :type zone: str
    :param source_template: URL of the instance template to be replaced.
    :type source_template: str
    :param destination_template: URL of the instance template to replace the
        source template with.
    :type destination_template: str
    :param request_id: Optional, unique request_id that you might add to achieve
        full idempotence (for example when client call times out repeating the request
        with the same request id will not create a new instance template again).
        It should be in UUID format as defined in RFC 4122
    :type request_id: str
    :param update_policy: The update policy for this managed instance group. See
        https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch
        for details of the updatePolicy fields. It's an optional field.
    :type update_policy: dict
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (for example beta).
    :type api_version: str
    """
    # [START gce_igm_update_template_operator_template_fields]
    template_fields = ('project_id', 'resource_id', 'zone', 'request_id',
                       'source_template', 'destination_template',
                       'gcp_conn_id', 'api_version')
    # [END gce_igm_update_template_operator_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 resource_id,
                 zone,
                 source_template,
                 destination_template,
                 update_policy=None,
                 request_id=None,
                 gcp_conn_id='google_cloud_default',
                 api_version='beta',
                 *args, **kwargs):
        self.zone = zone
        self.source_template = source_template
        self.destination_template = destination_template
        self.request_id = request_id
        self.update_policy = update_policy
        # Tracks whether any instanceTemplate URL was actually replaced, so
        # execute() can skip the patch call when nothing changed (idempotence).
        self._change_performed = False
        if api_version == 'v1':
            raise AirflowException("Api version v1 does not have update/patch "
                                   "operations for Instance Group Managers. Use beta"
                                   " api version or above")
        super(GceInstanceGroupManagerUpdateTemplateOperator, self).__init__(
            project_id=project_id, zone=self.zone, resource_id=resource_id,
            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)

    def _possibly_replace_template(self, dictionary):
        # type: (dict) -> None
        # Replace the template URL in-place when it matches the source template
        # and remember that a change was made.
        if dictionary.get('instanceTemplate') == self.source_template:
            dictionary['instanceTemplate'] = self.destination_template
            self._change_performed = True

    def execute(self, context):
        # Fetch the current state of the Instance Group Manager and build a
        # minimal patch body containing only the fields we might modify.
        old_instance_group_manager = self._hook.get_instance_group_manager(
            project_id=self.project_id,
            zone=self.zone,
            resource_id=self.resource_id)
        patch_body = {}
        if 'versions' in old_instance_group_manager:
            patch_body['versions'] = old_instance_group_manager['versions']
        if 'instanceTemplate' in old_instance_group_manager:
            patch_body['instanceTemplate'] = old_instance_group_manager['instanceTemplate']
        if self.update_policy:
            patch_body['updatePolicy'] = self.update_policy
        # The template URL may appear both at the top level and inside each
        # entry of the 'versions' list - replace it wherever it matches.
        self._possibly_replace_template(patch_body)
        if 'versions' in patch_body:
            for version in patch_body['versions']:
                self._possibly_replace_template(version)
        if self._change_performed or self.update_policy:
            self.log.info("Calling patch instance template with updated body: {}".
                          format(patch_body))
            return self._hook.patch_instance_group_manager(
                project_id=self.project_id, zone=self.zone, resource_id=self.resource_id,
                body=patch_body, request_id=self.request_id)
        else:
            # Idempotence achieved
            return True

Просмотреть файл

@ -93,7 +93,7 @@ class GcfFunctionDeployOperator(BaseOperator):
. Different API versions require different variants of the Cloud Functions
dictionary.
:type body: dict or google.cloud.functions.v1.CloudFunction
:param gcp_conn_id: The connection ID to use to connect to Google Cloud Platform.
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
@ -105,6 +105,9 @@ class GcfFunctionDeployOperator(BaseOperator):
:param validate_body: If set to False, body validation is not performed.
:type validate_body: bool
"""
# [START gce_function_deploy_template_operator_template_fields]
template_fields = ('project_id', 'location', 'gcp_conn_id', 'api_version')
# [END gce_function_deploy_template_operator_template_fields]
@apply_defaults
def __init__(self,
@ -276,6 +279,9 @@ class GcfFunctionDeleteOperator(BaseOperator):
:param api_version: API version used (for example v1 or v1beta1).
:type api_version: str
"""
# [START gce_function_delete_template_operator_template_fields]
template_fields = ('name', 'gcp_conn_id', 'api_version')
# [END gce_function_delete_template_operator_template_fields]
@apply_defaults
def __init__(self,

Просмотреть файл

@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Sanitizer for body fields sent via GCP API.
The sanitizer removes fields specified from the body.
Context
-------
In some cases where GCP operation requires modification of existing resources (such
as instances or instance templates) we need to sanitize body of the resources returned
via GCP APIs. This is in the case when we retrieve information from GCP first,
modify the body and either update the existing resource or create a new one with the
modified body. Usually when you retrieve resource from GCP you get some extra fields which
are Output-only, and we need to delete those fields if we want to use
the body as input for subsequent create/insert type operation.
Field specification
-------------------
Specification of fields is an array of strings which denote names of fields to be removed.
The field can be either direct field name to remove from the body or the full
specification of the path you should delete - separated with '.'
>>> FIELDS_TO_SANITIZE = [
>>> "kind",
>>> "properties.disks.kind",
>>> "properties.metadata.kind",
>>>]
>>> body = {
>>> "kind": "compute#instanceTemplate",
>>> "name": "instance",
>>> "properties": {
>>> "disks": [
>>> {
>>> "name": "a",
>>> "kind": "compute#attachedDisk",
>>> "type": "PERSISTENT",
>>> "mode": "READ_WRITE",
>>> },
>>> {
>>> "name": "b",
>>> "kind": "compute#attachedDisk",
>>> "type": "PERSISTENT",
>>> "mode": "READ_WRITE",
>>> }
>>> ],
>>> "metadata": {
>>> "kind": "compute#metadata",
>>> "fingerprint": "GDPUYxlwHe4="
>>> },
>>> }
>>> }
>>> sanitizer=GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE)
>>> SANITIZED_BODY = sanitizer.sanitize(body)
>>> json.dumps(SANITIZED_BODY, indent=2)
{
"name": "instance",
"properties": {
"disks": [
{
"name": "a",
"type": "PERSISTENT",
"mode": "READ_WRITE",
},
{
"name": "b",
"type": "PERSISTENT",
"mode": "READ_WRITE",
}
],
"metadata": {
"fingerprint": "GDPUYxlwHe4="
},
}
}
Note that the components of the path can be either dictionaries or arrays of dictionaries.
In case they are dictionaries, subsequent component names key of the field, in case of
arrays - the sanitizer iterates through all dictionaries in the array and searches
components in all elements of the array.
"""
from airflow import LoggingMixin, AirflowException
class GcpFieldSanitizerException(AirflowException):
    """Thrown when sanitizer finds unexpected field type in the path
    (other than dict or array).
    """
    # The explicit __init__ that only delegated to super() was removed: it is
    # a useless override (pylint W0235) - inheriting AirflowException.__init__
    # gives identical behavior.
class GcpBodyFieldSanitizer(LoggingMixin):
    """Sanitizes the body according to specification.

    Each spec is a '.'-separated path; the terminal component is deleted from
    every dict (or every dict inside a list) reached by following the path.

    :param sanitize_specs: array of strings that specifies which fields to remove
    :type sanitize_specs: [string]
    """
    def __init__(self, sanitize_specs):
        # type: ([str]) -> None
        super(GcpBodyFieldSanitizer, self).__init__()
        self._sanitize_specs = sanitize_specs

    def _sanitize(self, dictionary, remaining_field_spec, current_path):
        # Recursively walk 'dictionary' following one dotted field spec and
        # delete the terminal field wherever it is found.
        field_split = remaining_field_spec.split(".", 1)
        if len(field_split) == 1:
            # Terminal path component - delete the field if present.
            field_name = field_split[0]
            if field_name in dictionary:
                self.log.info("Deleted {} [{}]".format(field_name, current_path))
                del dictionary[field_name]
            else:
                self.log.debug("The field {} is missing in {} at the path {}.".
                               format(field_name, dictionary, current_path))
        else:
            field_name = field_split[0]
            remaining_path = field_split[1]
            child = dictionary.get(field_name)
            if child is None:
                self.log.debug("The field {} is missing in {} at the path {}. ".
                               format(field_name, dictionary, current_path))
            elif isinstance(child, dict):
                self._sanitize(child, remaining_path, "{}.{}".format(
                    current_path, field_name))
            elif isinstance(child, list):
                for index, elem in enumerate(child):
                    if not isinstance(elem, dict):
                        # Bug fix: previously the code logged "Skipping it."
                        # but recursed into the element anyway, which could
                        # raise TypeError (e.g. 'del' on a string). Now the
                        # element is actually skipped.
                        self.log.warning(
                            "The field {} element at index {} is of wrong type. "
                            "It should be dict and is {}. Skipping it.".
                            format(current_path, index, elem))
                        continue
                    self._sanitize(elem, remaining_path, "{}.{}[{}]".format(
                        current_path, field_name, index))
            else:
                # 'log.warn' is a deprecated alias of 'log.warning'.
                self.log.warning(
                    "The field {} is of wrong type. "
                    "It should be dict or list and it is {}. Skipping it.".
                    format(current_path, child))

    def sanitize(self, body):
        """Remove all fields matched by the sanitize specs from ``body`` (in place)."""
        for elem in self._sanitize_specs:
            self._sanitize(body, elem, "")

Просмотреть файл

@ -86,8 +86,8 @@ template variables <macros>` and a ``templates_dict`` argument.
The ``templates_dict`` argument is templated, so each value in the dictionary
is evaluated as a :ref:`Jinja template <jinja-templating>`.
Google Cloud Platform Operators
-------------------------------
Google Cloud Storage Operators
------------------------------
GoogleCloudStorageToBigQueryOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@ -102,22 +102,31 @@ to execute a BigQuery load job.
:start-after: [START howto_operator_gcs_to_bq]
:end-before: [END howto_operator_gcs_to_bq]
Google Compute Engine Operators
-------------------------------
GceInstanceStartOperator
^^^^^^^^^^^^^^^^^^^^^^^^
Allows to start an existing Google Compute Engine instance.
Use the
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator`
to start an existing Google Compute Engine instance.
In this example parameter values are extracted from Airflow variables.
Moreover, the ``default_args`` dict is used to pass common arguments to all operators in a single DAG.
Arguments
"""""""""
The following examples of OS environment variables show how you can build parameters
passed to the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
:start-after: [START howto_operator_gce_args]
:end-before: [END howto_operator_gce_args]
:start-after: [START howto_operator_gce_args_common]
:end-before: [END howto_operator_gce_args_common]
Define the :class:`~airflow.contrib.operators.gcp_compute_operator
.GceInstanceStartOperator` by passing the required arguments to the constructor.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
@ -125,15 +134,42 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator
:start-after: [START howto_operator_gce_start]
:end-before: [END howto_operator_gce_start]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
:language: python
:dedent: 4
:start-after: [START gce_instance_start_template_fields]
:end-before: [END gce_instance_start_template_fields]
More information
""""""""""""""""
See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/start>`_
GceInstanceStopOperator
^^^^^^^^^^^^^^^^^^^^^^^
Allows to stop an existing Google Compute Engine instance.
Use the operator to stop Google Compute Engine instance.
For parameter definition take a look at :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator` above.
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator`
Define the :class:`~airflow.contrib.operators.gcp_compute_operator
.GceInstanceStopOperator` by passing the required arguments to the constructor.
Arguments
"""""""""
The following examples of OS environment variables show how you can build parameters
passed to the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
:start-after: [START howto_operator_gce_args_common]
:end-before: [END howto_operator_gce_args_common]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
@ -141,15 +177,48 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator
:start-after: [START howto_operator_gce_stop]
:end-before: [END howto_operator_gce_stop]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
:language: python
:dedent: 4
:start-after: [START gce_instance_stop_template_fields]
:end-before: [END gce_instance_stop_template_fields]
More information
""""""""""""""""
See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/stop>`_
GceSetMachineTypeOperator
^^^^^^^^^^^^^^^^^^^^^^^^^
Allows to change the machine type for a stopped instance to the specified machine type.
Use the operator to change machine type of a Google Compute Engine instance.
For parameter definition take a look at :class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator` above.
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator`
Define the :class:`~airflow.contrib.operators.gcp_compute_operator
.GceSetMachineTypeOperator` by passing the required arguments to the constructor.
Arguments
"""""""""
The following examples of OS environment variables show how you can build parameters
passed to the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
:start-after: [START howto_operator_gce_args_common]
:end-before: [END howto_operator_gce_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
:start-after: [START howto_operator_gce_args_set_machine_type]
:end-before: [END howto_operator_gce_args_set_machine_type]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
:language: python
@ -157,26 +226,163 @@ Define the :class:`~airflow.contrib.operators.gcp_compute_operator
:start-after: [START howto_operator_gce_set_machine_type]
:end-before: [END howto_operator_gce_set_machine_type]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
:language: python
:dedent: 4
:start-after: [START gce_instance_set_machine_type_template_fields]
:end-before: [END gce_instance_set_machine_type_template_fields]
More information
""""""""""""""""
See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instances/setMachineType>`_
GceInstanceTemplateCopyOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the operator to copy an existing Google Compute Engine instance template
applying a patch to it.
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator`.
Arguments
"""""""""
The following examples of OS environment variables show how you can build parameters
passed to the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:start-after: [START howto_operator_compute_igm_common_args]
:end-before: [END howto_operator_compute_igm_common_args]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:start-after: [START howto_operator_compute_template_copy_args]
:end-before: [END howto_operator_compute_template_copy_args]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gce_igm_copy_template]
:end-before: [END howto_operator_gce_igm_copy_template]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
:language: python
:dedent: 4
:start-after: [START gce_instance_template_copy_operator_template_fields]
:end-before: [END gce_instance_template_copy_operator_template_fields]
More information
""""""""""""""""
See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates>`_
GceInstanceGroupManagerUpdateTemplateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the operator to update template in Google Compute Engine Instance Group Manager.
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator`.
Arguments
"""""""""
The following examples of OS environment variables show how you can build parameters
passed to the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:start-after: [START howto_operator_compute_igm_common_args]
:end-before: [END howto_operator_compute_igm_common_args]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:start-after: [START howto_operator_compute_igm_update_template_args]
:end-before: [END howto_operator_compute_igm_update_template_args]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gce_igm_update_template]
:end-before: [END howto_operator_gce_igm_update_template]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_compute_operator.py
:language: python
:dedent: 4
:start-after: [START gce_igm_update_template_operator_template_fields]
:end-before: [END gce_igm_update_template_operator_template_fields]
Troubleshooting
"""""""""""""""
You might find that your GceInstanceGroupManagerUpdateTemplateOperator fails with
missing permissions. The service account has to have Service Account User role assigned
via IAM permissions in order to execute the operation.
More information
""""""""""""""""
See `Google Compute Engine API documentation <https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_
Google Cloud Functions Operators
--------------------------------
GcfFunctionDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^
Use the ``default_args`` dict to pass arguments to the operator.
Use the operator to delete a function from Google Cloud Functions.
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`.
Arguments
"""""""""
The following examples of OS environment variables show how you can build function name
to use in the operator and build default args to pass them to multiple tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
:language: python
:start-after: [START howto_operator_gcf_delete_args]
:end-before: [END howto_operator_gcf_delete_args]
Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator`
to delete a function from Google Cloud Functions.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_delete]
:end-before: [END howto_operator_gcf_delete]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
:language: python
:dedent: 4
:start-after: [START gce_function_delete_template_operator_template_fields]
:end-before: [END gce_function_delete_template_operator_template_fields]
Troubleshooting
"""""""""""""""
If you want to run or deploy an operator using a service account and get “forbidden 403”
@ -191,7 +397,6 @@ The typical way of assigning Cloud IAM permissions with `gcloud` is
shown below. Just replace PROJECT_ID with ID of your Google Cloud Platform project
and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
.. code-block:: bash
gcloud iam service-accounts add-iam-policy-binding \
@ -202,13 +407,24 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
See `Adding the IAM service agent user role to the runtime service <https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_ for details
More information
""""""""""""""""
See `Google Cloud Functions API documentation <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/delete>`_
GcfFunctionDeployOperator
^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`
to deploy a function from Google Cloud Functions.
Use the operator to deploy a function to Google Cloud Functions.
The following examples of Airflow variables show various variants and combinations
For parameter definition take a look at
:class:`~airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator`.
Arguments
"""""""""
The following examples of OS environment variables show various variants and combinations
of default_args that you can use. The variables are defined as follows:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@ -223,11 +439,12 @@ With those variables you can define the body of the request:
:start-after: [START howto_operator_gcf_deploy_body]
:end-before: [END howto_operator_gcf_deploy_body]
When you create a DAG, the default_args dictionary can be used to pass the body and
other arguments:
When you create a DAG, the default_args dictionary can be used to pass
arguments common with other tasks:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_deploy_args]
:end-before: [END howto_operator_gcf_deploy_args]
@ -235,11 +452,14 @@ Note that the neither the body nor the default args are complete in the above ex
Depending on the set variables, there might be different variants on how to pass source
code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository
or sourceUploadUrl as described in the
`CloudFunction API specification <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
`Cloud Functions API specification <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
Additionally, default_args might contain zip_path parameter to run the extra step of
uploading the source code before deploying it. In the last case, you also need to
provide an empty `sourceUploadUrl` parameter in the body.
Using the operator
""""""""""""""""""
Based on the variables defined above, example logic of setting the source code
related fields is shown here:
@ -252,9 +472,20 @@ The code to create the operator:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcf_deploy]
:end-before: [END howto_operator_gcf_deploy]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_function_operator.py
:language: python
:dedent: 4
:start-after: [START gce_function_deploy_template_operator_template_fields]
:end-before: [END gce_function_deploy_template_operator_template_fields]
Troubleshooting
"""""""""""""""
@ -277,13 +508,20 @@ and SERVICE_ACCOUNT_EMAIL with the email ID of your service account.
--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
--role="roles/iam.serviceAccountUser"
See `Adding the IAM service agent user role to the runtime service <https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account>`_ for details
If the source code for your function is in Google Source Repository, make sure that
your service account has the Source Repository Viewer role so that the source code
can be downloaded if necessary.
More information
""""""""""""""""
See `Google Cloud Functions API documentation <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions/create>`_
Google Cloud Sql Operators
--------------------------
CloudSqlInstanceDatabaseCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Просмотреть файл

@ -100,28 +100,28 @@ field (see connection `wasb_default` for an example).
.. _WasbBlobSensor:
WasbBlobSensor
"""""""""""""""
""""""""""""""
.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbBlobSensor
.. _WasbPrefixSensor:
WasbPrefixSensor
"""""""""""""""""
""""""""""""""""
.. autoclass:: airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor
.. _FileToWasbOperator:
FileToWasbOperator
"""""""""""""""""""
""""""""""""""""""
.. autoclass:: airflow.contrib.operators.file_to_wasb.FileToWasbOperator
.. _WasbHook:
WasbHook
"""""""""
""""""""
.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
@ -582,6 +582,16 @@ Compute Engine Operators
- :ref:`GceInstanceStartOperator` : start an existing Google Compute Engine instance.
- :ref:`GceInstanceStopOperator` : stop an existing Google Compute Engine instance.
- :ref:`GceSetMachineTypeOperator` : change the machine type for a stopped instance.
- :ref:`GceInstanceTemplateCopyOperator` : copy the Instance Template, applying
specified changes.
- :ref:`GceInstanceGroupManagerUpdateTemplateOperator` : patch the Instance Group Manager,
replacing source Instance Template URL with the destination one.
The operators have a common base operator:
.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceBaseOperator
They also use :ref:`GceHook` hook to communicate with Google Cloud Platform.
.. _GceInstanceStartOperator:
@ -604,6 +614,28 @@ GceSetMachineTypeOperator
.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator
.. _GceInstanceTemplateCopyOperator:
GceInstanceTemplateCopyOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator
.. _GceInstanceGroupManagerUpdateTemplateOperator:
GceInstanceGroupManagerUpdateTemplateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator
.. _GceHook:
Compute Engine Hook
"""""""""""""""""""
.. autoclass:: airflow.contrib.hooks.gcp_compute_hook.GceHook
:members:
Cloud Functions
'''''''''''''''
@ -616,6 +648,8 @@ Cloud Functions Operators
.. autoclass:: airflow.contrib.operators.gcp_operator.GCP
They also use :ref:`GcfHook` hook to communicate with Google Cloud Platform.
.. _GcfFunctionDeployOperator:
GcfFunctionDeployOperator
@ -632,6 +666,8 @@ GcfFunctionDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
.. _GcfHook:
Cloud Functions Hook
""""""""""""""""""""
@ -741,7 +777,7 @@ DataprocClusterCreateOperator
.. _DataprocClusterScaleOperator:
DataprocClusterScaleOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator

Просмотреть файл

@ -309,6 +309,7 @@ def do_setup():
'gitpython>=2.0.2',
'gunicorn>=19.4.0, <20.0',
'iso8601>=0.1.12',
'json-merge-patch==0.2',
'jinja2>=2.7.3, <2.9.0',
'lxml>=4.0.0',
'markdown>=2.5.2, <3.0',

Просмотреть файл

@ -18,10 +18,15 @@
# under the License.
import ast
import unittest
from copy import deepcopy
import httplib2
from googleapiclient.errors import HttpError
from airflow import AirflowException, configuration
from airflow.contrib.operators.gcp_compute_operator import GceInstanceStartOperator, \
GceInstanceStopOperator, GceSetMachineTypeOperator
GceInstanceStopOperator, GceSetMachineTypeOperator, GceInstanceTemplateCopyOperator, \
GceInstanceGroupManagerUpdateTemplateOperator
from airflow.models import TaskInstance, DAG
from airflow.utils import timezone
@ -34,12 +39,14 @@ except ImportError:
except ImportError:
mock = None
EMPTY_CONTENT = ''.encode('utf8')
PROJECT_ID = 'project-id'
LOCATION = 'zone'
ZONE = 'zone'
RESOURCE_ID = 'resource-id'
SHORT_MACHINE_TYPE_NAME = 'n1-machine-type'
SET_MACHINE_TYPE_BODY = {
'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
}
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
@ -51,7 +58,7 @@ class GceInstanceStartTest(unittest.TestCase):
mock_hook.return_value.start_instance.return_value = True
op = GceInstanceStartOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
task_id='id'
)
@ -59,11 +66,11 @@ class GceInstanceStartTest(unittest.TestCase):
mock_hook.assert_called_once_with(api_version='v1',
gcp_conn_id='google_cloud_default')
mock_hook.return_value.start_instance.assert_called_once_with(
PROJECT_ID, LOCATION, RESOURCE_ID
PROJECT_ID, ZONE, RESOURCE_ID
)
self.assertTrue(result)
# Setting all of the operator's input parameters as templated dag_ids
# Setting all of the operator's input parameters as template dag_ids
# (could be anything else) just to test if the templating works for all fields
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
def test_instance_start_with_templates(self, mock_hook):
@ -95,7 +102,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceInstanceStartOperator(
project_id="",
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
task_id='id'
)
@ -123,7 +130,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceInstanceStartOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id="",
task_id='id'
)
@ -132,12 +139,14 @@ class GceInstanceStartTest(unittest.TestCase):
self.assertIn("The required parameter 'resource_id' is missing", str(err))
mock_hook.assert_not_called()
class GceInstanceStopTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
def test_instance_stop(self, mock_hook):
mock_hook.return_value.stop_instance.return_value = True
op = GceInstanceStopOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
task_id='id'
)
@ -145,7 +154,7 @@ class GceInstanceStartTest(unittest.TestCase):
mock_hook.assert_called_once_with(api_version='v1',
gcp_conn_id='google_cloud_default')
mock_hook.return_value.stop_instance.assert_called_once_with(
PROJECT_ID, LOCATION, RESOURCE_ID
PROJECT_ID, ZONE, RESOURCE_ID
)
self.assertTrue(result)
@ -181,7 +190,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceInstanceStopOperator(
project_id="",
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
task_id='id'
)
@ -209,7 +218,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceInstanceStopOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id="",
task_id='id'
)
@ -218,12 +227,14 @@ class GceInstanceStartTest(unittest.TestCase):
self.assertIn("The required parameter 'resource_id' is missing", str(err))
mock_hook.assert_not_called()
class GceInstanceSetMachineTypeTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
def test_set_machine_type(self, mock_hook):
mock_hook.return_value.set_machine_type.return_value = True
op = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
body=SET_MACHINE_TYPE_BODY,
task_id='id'
@ -232,7 +243,7 @@ class GceInstanceStartTest(unittest.TestCase):
mock_hook.assert_called_once_with(api_version='v1',
gcp_conn_id='google_cloud_default')
mock_hook.return_value.set_machine_type.assert_called_once_with(
PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY
PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY
)
self.assertTrue(result)
@ -269,7 +280,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceSetMachineTypeOperator(
project_id="",
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
body=SET_MACHINE_TYPE_BODY,
task_id='id'
@ -299,7 +310,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id="",
body=SET_MACHINE_TYPE_BODY,
task_id='id'
@ -314,7 +325,7 @@ class GceInstanceStartTest(unittest.TestCase):
with self.assertRaises(AirflowException) as cm:
op = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
body={},
task_id='id'
@ -332,10 +343,10 @@ class GceInstanceStartTest(unittest.TestCase):
"'zone': 'https://www.googleapis.com/compute/v1/projects/polidea" \
"-airflow/zones/europe-west3-b', 'operationType': " \
"'setMachineType', 'targetLink': " \
"'https://www.googleapis.com/compute/v1/projects/polidea-airflow" \
"'https://www.googleapis.com/compute/v1/projects/example-airflow" \
"/zones/europe-west3-b/instances/pa-1', 'targetId': " \
"'2480086944131075860', 'status': 'DONE', 'user': " \
"'uberdarek@polidea-airflow.iam.gserviceaccount.com', " \
"'uberdarek@example-airflow.iam.gserviceaccount.com', " \
"'progress': 100, 'insertTime': '2018-10-03T07:50:07.951-07:00', "\
"'startTime': '2018-10-03T07:50:08.324-07:00', 'endTime': " \
"'2018-10-03T07:50:08.484-07:00', 'error': {'errors': [{'code': " \
@ -343,35 +354,688 @@ class GceInstanceStartTest(unittest.TestCase):
"'machine-type-1' does not exist in zone 'europe-west3-b'.\"}]}, "\
"'httpErrorStatusCode': 400, 'httpErrorMessage': 'BAD REQUEST', " \
"'selfLink': " \
"'https://www.googleapis.com/compute/v1/projects/polidea-airflow" \
"'https://www.googleapis.com/compute/v1/projects/example-airflow" \
"/zones/europe-west3-b/operations/operation-1538578207537" \
"-577542784f769-7999ab71-94f9ec1d'} "
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook'
'._check_operation_status')
'._check_zone_operation_status')
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook'
'._execute_set_machine_type')
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook.get_conn')
def test_set_machine_type_should_handle_and_trim_gce_error(
self, get_conn, _execute_set_machine_type, _check_operation_status):
self, get_conn, _execute_set_machine_type, _check_zone_operation_status):
get_conn.return_value = {}
_execute_set_machine_type.return_value = {"name": "test-operation"}
_check_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE)
_check_zone_operation_status.return_value = ast.literal_eval(self.MOCK_OP_RESPONSE)
with self.assertRaises(AirflowException) as cm:
op = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
zone=ZONE,
resource_id=RESOURCE_ID,
body=SET_MACHINE_TYPE_BODY,
task_id='id'
)
op.execute(None)
err = cm.exception
_check_operation_status.assert_called_once_with(
{}, "test-operation", PROJECT_ID, LOCATION)
_check_zone_operation_status.assert_called_once_with(
{}, "test-operation", PROJECT_ID, ZONE)
_execute_set_machine_type.assert_called_once_with(
PROJECT_ID, LOCATION, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
PROJECT_ID, ZONE, RESOURCE_ID, SET_MACHINE_TYPE_BODY)
# Checking the full message was sometimes failing due to different order
# of keys in the serialized JSON
self.assertIn("400 BAD REQUEST: {", str(err)) # checking the square bracket trim
self.assertIn("UNSUPPORTED_OPERATION", str(err))
# ---------------------------------------------------------------------------
# Fixtures for the GceInstanceTemplateCopyOperator tests below.
# ---------------------------------------------------------------------------

GCE_INSTANCE_TEMPLATE_NAME = "instance-template-test"
GCE_INSTANCE_TEMPLATE_NEW_NAME = "instance-template-test-new"
# Idempotency token forwarded to insert_instance_template in some tests.
GCE_INSTANCE_TEMPLATE_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6"

# Instance template as returned by a GET on the GCE API.  It deliberately
# contains output-only fields ("kind", "id", "creationTimestamp", "selfLink"
# and the nested "kind"/"licenses" entries) which, per the tests below, the
# copy operator is expected to strip before inserting the new template.
GCE_INSTANCE_TEMPLATE_BODY_GET = {
    "kind": "compute#instanceTemplate",
    "id": "6950321349997439715",
    "creationTimestamp": "2018-10-15T06:20:12.777-07:00",
    "name": GCE_INSTANCE_TEMPLATE_NAME,
    "description": "",
    "properties": {
        "machineType": "n1-standard-1",
        "networkInterfaces": [
            {
                "kind": "compute#networkInterface",
                "network": "https://www.googleapis.com/compute/v1/"
                           "projects/project/global/networks/default",
                "accessConfigs": [
                    {
                        "kind": "compute#accessConfig",
                        "type": "ONE_TO_ONE_NAT",
                    }
                ]
            },
            {
                "network": "https://www.googleapis.com/compute/v1/"
                           "projects/project/global/networks/default",
                "accessConfigs": [
                    {
                        "kind": "compute#accessConfig",
                        "networkTier": "PREMIUM"
                    }
                ]
            }
        ],
        "disks": [
            {
                "kind": "compute#attachedDisk",
                "type": "PERSISTENT",
                "licenses": [
                    "A String",
                ]
            }
        ],
        "metadata": {
            "kind": "compute#metadata",
            "fingerprint": "GDPUYxlwHe4="
        },
    },
    "selfLink": "https://www.googleapis.com/compute/v1/projects/project"
                "/global/instanceTemplates/instance-template-test"
}

# Body the operator is expected to pass to insert_instance_template when
# copying: the GET body renamed to the new name with output-only fields
# removed.  Tests build their expected bodies by deepcopy-ing and patching
# this baseline.
GCE_INSTANCE_TEMPLATE_BODY_INSERT = {
    "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
    "description": "",
    "properties": {
        "machineType": "n1-standard-1",
        "networkInterfaces": [
            {
                "network": "https://www.googleapis.com/compute/v1/"
                           "projects/project/global/networks/default",
                "accessConfigs": [
                    {
                        "type": "ONE_TO_ONE_NAT",
                    }
                ]
            },
            {
                "network": "https://www.googleapis.com/compute/v1/"
                           "projects/project/global/networks/default",
                "accessConfigs": [
                    {
                        "networkTier": "PREMIUM"
                    }
                ]
            }
        ],
        "disks": [
            {
                "type": "PERSISTENT",
            }
        ],
        "metadata": {
            "fingerprint": "GDPUYxlwHe4="
        },
    },
}

# GET response for the already-copied template: identical to the source GET
# body apart from the name.
GCE_INSTANCE_TEMPLATE_BODY_GET_NEW = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_GET)
GCE_INSTANCE_TEMPLATE_BODY_GET_NEW['name'] = GCE_INSTANCE_TEMPLATE_NEW_NAME
class GceInstanceTemplateCopyTest(unittest.TestCase):
    """Tests for GceInstanceTemplateCopyOperator.

    GceHook is always patched out, so each test only verifies which hook
    methods the operator calls and with which arguments.  The convention in
    the hook mock's ``get_instance_template.side_effect`` list is: first the
    lookup of the destination template (404 == not copied yet), then the
    source template, then the freshly created destination template.
    """
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template(self, mock_hook):
        """Destination missing (404): the template is copied via insert."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        # The inserted body must have output-only fields stripped.
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
            request_id=None
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_idempotent_copy_template_when_already_copied(self, mock_hook):
        """Destination already exists: no insert, existing body returned."""
        mock_hook.return_value.get_instance_template.side_effect = [
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        mock_hook.return_value.insert_instance_template.assert_not_called()
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template_with_request_id(self, mock_hook):
        """A request_id given to the operator is forwarded to the insert."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
            task_id='id',
            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME}
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=GCE_INSTANCE_TEMPLATE_BODY_INSERT,
            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template_with_description_fields(self, mock_hook):
        """Top-level scalar fields in body_patch override the source copy."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
            task_id='id',
            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
                        "description": "New description"}
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
        body_insert["description"] = "New description"
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=body_insert,
            request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_copy_with_some_validation_warnings(self, mock_hook):
        """Unknown fields in body_patch are passed through, not rejected."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={"name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
                        "some_wrong_field": "test",
                        "properties": {
                            "some_other_wrong_field": "test"
                        }}
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
        body_insert["some_wrong_field"] = "test"
        body_insert["properties"]["some_other_wrong_field"] = "test"
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=body_insert,
            request_id=None,
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template_with_updated_nested_fields(self, mock_hook):
        """Nested scalar fields in body_patch are merged into the copy."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={
                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
                "properties": {
                    "machineType": "n1-standard-2",
                }
            }
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
        body_insert["properties"]["machineType"] = "n1-standard-2"
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=body_insert,
            request_id=None
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template_with_smaller_array_fields(self, mock_hook):
        """An array in body_patch replaces the (longer) source array wholesale."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={
                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
                "properties": {
                    "machineType": "n1-standard-1",
                    "networkInterfaces": [
                        {
                            "network": "https://www.googleapis.com/compute/v1/"
                                       "projects/project/global/networks/default",
                            "accessConfigs": [
                                {
                                    "type": "ONE_TO_ONE_NAT",
                                    "natIP": "8.8.8.8"
                                }
                            ]
                        }
                    ]
                }
            }
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
        body_insert["properties"]["networkInterfaces"] = [
            {
                "network": "https://www.googleapis.com/compute/v1/"
                           "projects/project/global/networks/default",
                "accessConfigs": [
                    {
                        "type": "ONE_TO_ONE_NAT",
                        "natIP": "8.8.8.8"
                    }
                ]
            }
        ]
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=body_insert,
            request_id=None
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_copy_template_with_bigger_array_fields(self, mock_hook):
        """An array in body_patch replaces the (shorter) source array wholesale."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        op = GceInstanceTemplateCopyOperator(
            project_id=PROJECT_ID,
            resource_id=GCE_INSTANCE_TEMPLATE_NAME,
            task_id='id',
            body_patch={
                "name": GCE_INSTANCE_TEMPLATE_NEW_NAME,
                "properties": {
                    "disks": [
                        {
                            "kind": "compute#attachedDisk",
                            "type": "SCRATCH",
                            "licenses": [
                                "Updated String",
                            ]
                        },
                        {
                            "kind": "compute#attachedDisk",
                            "type": "PERSISTENT",
                            "licenses": [
                                "Another String",
                            ]
                        }
                    ],
                }
            }
        )
        result = op.execute(None)
        mock_hook.assert_called_once_with(api_version='v1',
                                          gcp_conn_id='google_cloud_default')
        body_insert = deepcopy(GCE_INSTANCE_TEMPLATE_BODY_INSERT)
        body_insert["properties"]["disks"] = [
            {
                "kind": "compute#attachedDisk",
                "type": "SCRATCH",
                "licenses": [
                    "Updated String",
                ]
            },
            {
                "kind": "compute#attachedDisk",
                "type": "PERSISTENT",
                "licenses": [
                    "Another String",
                ]
            }
        ]
        mock_hook.return_value.insert_instance_template.assert_called_once_with(
            project_id=PROJECT_ID,
            body=body_insert,
            request_id=None,
        )
        self.assertEqual(GCE_INSTANCE_TEMPLATE_BODY_GET_NEW, result)
    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_missing_name(self, mock_hook):
        """body_patch without a 'name' fails fast, before any hook call."""
        mock_hook.return_value.get_instance_template.side_effect = [
            HttpError(resp=httplib2.Response({'status': 404}), content=EMPTY_CONTENT),
            GCE_INSTANCE_TEMPLATE_BODY_GET,
            GCE_INSTANCE_TEMPLATE_BODY_GET_NEW
        ]
        with self.assertRaises(AirflowException) as cm:
            op = GceInstanceTemplateCopyOperator(
                project_id=PROJECT_ID,
                resource_id=GCE_INSTANCE_TEMPLATE_NAME,
                request_id=GCE_INSTANCE_TEMPLATE_REQUEST_ID,
                task_id='id',
                body_patch={"description": "New description"}
            )
            op.execute(None)
        err = cm.exception
        self.assertIn("should contain at least name for the new operator "
                      "in the 'name' field", str(err))
        mock_hook.assert_not_called()
# ---------------------------------------------------------------------------
# Fixtures for the GceInstanceGroupManagerUpdateTemplateOperator tests below.
# ---------------------------------------------------------------------------

GCE_INSTANCE_GROUP_MANAGER_NAME = "instance-group-test"
# Template URL the operator is asked to replace ...
GCE_INSTANCE_TEMPLATE_SOURCE_URL = \
    "https://www.googleapis.com/compute/beta/projects/project" \
    "/global/instanceTemplates/instance-template-test"
# ... a template URL that should be left untouched ...
GCE_INSTANCE_TEMPLATE_OTHER_URL = \
    "https://www.googleapis.com/compute/beta/projects/project" \
    "/global/instanceTemplates/instance-template-other"
# ... a URL that does not occur in the group manager at all ...
GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL = \
    "https://www.googleapis.com/compute/beta/projects/project" \
    "/global/instanceTemplates/instance-template-non-existing"
# ... and the URL the source should be replaced with.
GCE_INSTANCE_TEMPLATE_DESTINATION_URL = \
    "https://www.googleapis.com/compute/beta/projects/project" \
    "/global/instanceTemplates/instance-template-new"

# Instance Group Manager as returned by a GET on the GCE beta API.  The
# source template URL appears both at top level ("instanceTemplate") and in
# the "versions" list; version "v2" references the other template.
GCE_INSTANCE_GROUP_MANAGER_GET = {
    "kind": "compute#instanceGroupManager",
    "id": "2822359583810032488",
    "creationTimestamp": "2018-10-17T05:39:35.793-07:00",
    "name": GCE_INSTANCE_GROUP_MANAGER_NAME,
    "zone": "https://www.googleapis.com/compute/beta/projects/project/zones/zone",
    "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
    "versions": [
        {
            "name": "v1",
            "instanceTemplate": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            "targetSize": {
                "calculated": 1
            }
        },
        {
            "name": "v2",
            "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL,
        }
    ],
    "instanceGroup": GCE_INSTANCE_TEMPLATE_SOURCE_URL,
    "baseInstanceName": GCE_INSTANCE_GROUP_MANAGER_NAME,
    "fingerprint": "BKWB_igCNbQ=",
    "currentActions": {
        "none": 1,
        "creating": 0,
        "creatingWithoutRetries": 0,
        "verifying": 0,
        "recreating": 0,
        "deleting": 0,
        "abandoning": 0,
        "restarting": 0,
        "refreshing": 0
    },
    "pendingActions": {
        "creating": 0,
        "deleting": 0,
        "recreating": 0,
        "restarting": 0
    },
    "targetSize": 1,
    "selfLink": "https://www.googleapis.com/compute/beta/projects/project/zones/"
                "zone/instanceGroupManagers/" + GCE_INSTANCE_GROUP_MANAGER_NAME,
    "autoHealingPolicies": [
        {
            "initialDelaySec": 300
        }
    ],
    "serviceAccount": "198907790164@cloudservices.gserviceaccount.com"
}

# Patch body the operator is expected to send: only the template references,
# with every occurrence of the source URL rewritten to the destination URL
# and the unrelated "v2" template left alone.
GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH = {
    "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL,
    "versions": [
        {
            "name": "v1",
            "instanceTemplate": GCE_INSTANCE_TEMPLATE_DESTINATION_URL,
            "targetSize": {
                "calculated": 1
            }
        },
        {
            "name": "v2",
            "instanceTemplate": GCE_INSTANCE_TEMPLATE_OTHER_URL,
        }
    ],
}

# Idempotency token forwarded to patch_instance_group_manager in one test.
GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID = "e12d5b48-4826-4ba9-ada6-0cff1e0b36a6"

# Optional updatePolicy that should be passed through verbatim in the patch.
GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY = {
    "type": "OPPORTUNISTIC",
    "minimalAction": "RESTART",
    "maxSurge": {
        "fixed": 1
    },
    "maxUnavailable": {
        "percent": 10
    },
    "minReadySec": 1800
}
class GceInstanceGroupManagerUpdateTest(unittest.TestCase):
    """Tests for GceInstanceGroupManagerUpdateTemplateOperator.

    GceHook is always patched out, so each test only verifies which hook
    methods the operator invokes and with which arguments.
    """

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_instance_group_update(self, hook_mock):
        """A plain source->destination swap patches the group manager."""
        hook_mock.return_value.get_instance_group_manager.return_value = (
            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        hook_mock.return_value.patch_instance_group_manager.assert_called_once_with(
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
            request_id=None
        )
        self.assertTrue(outcome)

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_instance_group_update_no_instance_template_field(self, hook_mock):
        """When the IGM lacks 'instanceTemplate', the patch omits it too."""
        igm_without_template = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        igm_without_template.pop('instanceTemplate')
        hook_mock.return_value.get_instance_group_manager.return_value = (
            igm_without_template
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        expected_body = deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
        expected_body.pop('instanceTemplate')
        hook_mock.return_value.patch_instance_group_manager.assert_called_once_with(
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            body=expected_body,
            request_id=None
        )
        self.assertTrue(outcome)

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_instance_group_update_no_versions_field(self, hook_mock):
        """When the IGM lacks 'versions', the patch omits it too."""
        igm_without_versions = deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        igm_without_versions.pop('versions')
        hook_mock.return_value.get_instance_group_manager.return_value = (
            igm_without_versions
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        expected_body = deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
        expected_body.pop('versions')
        hook_mock.return_value.patch_instance_group_manager.assert_called_once_with(
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            body=expected_body,
            request_id=None
        )
        self.assertTrue(outcome)

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_instance_group_update_with_update_policy(self, hook_mock):
        """An explicit update_policy is passed through in the patch body."""
        hook_mock.return_value.get_instance_group_manager.return_value = (
            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            update_policy=GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY,
            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        expected_body = deepcopy(GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH)
        expected_body['updatePolicy'] = GCE_INSTANCE_GROUP_MANAGER_UPDATE_POLICY
        hook_mock.return_value.patch_instance_group_manager.assert_called_once_with(
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            body=expected_body,
            request_id=None
        )
        self.assertTrue(outcome)

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_successful_instance_group_update_with_request_id(self, hook_mock):
        """A request_id given to the operator is forwarded to the patch."""
        hook_mock.return_value.get_instance_group_manager.return_value = (
            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
            request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        hook_mock.return_value.patch_instance_group_manager.assert_called_once_with(
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            body=GCE_INSTANCE_GROUP_MANAGER_EXPECTED_PATCH,
            request_id=GCE_INSTANCE_GROUP_MANAGER_REQUEST_ID
        )
        self.assertTrue(outcome)

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_try_to_use_api_v1(self, hook_mock):
        """Constructing the operator with api_version='v1' is rejected."""
        with self.assertRaises(AirflowException) as ctx:
            GceInstanceGroupManagerUpdateTemplateOperator(
                task_id='id',
                project_id=PROJECT_ID,
                zone=ZONE,
                resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
                api_version='v1',
                source_template=GCE_INSTANCE_TEMPLATE_SOURCE_URL,
                destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
            )
        self.assertIn("Use beta api version or above", str(ctx.exception))

    @mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
    def test_try_to_use_non_existing_template(self, hook_mock):
        """A source template absent from the IGM results in no patch call."""
        hook_mock.return_value.get_instance_group_manager.return_value = (
            deepcopy(GCE_INSTANCE_GROUP_MANAGER_GET)
        )
        operator = GceInstanceGroupManagerUpdateTemplateOperator(
            task_id='id',
            project_id=PROJECT_ID,
            zone=ZONE,
            resource_id=GCE_INSTANCE_GROUP_MANAGER_NAME,
            source_template=GCE_INSTANCE_TEMPLATE_NON_EXISTING_URL,
            destination_template=GCE_INSTANCE_TEMPLATE_DESTINATION_URL
        )
        outcome = operator.execute(None)
        hook_mock.assert_called_once_with(api_version='beta',
                                          gcp_conn_id='google_cloud_default')
        hook_mock.return_value.patch_instance_group_manager.assert_not_called()
        self.assertTrue(outcome)