[AIRFLOW-3446] Add Google Cloud BigTable operators (#4354)

Dariusz Aniszewski 2019-01-04 14:50:15 +01:00, committed by Kaxil Naik
Parent 6f616f0dd9
Commit d0233ba643
7 changed files: 1539 additions, 0 deletions

View file: airflow/contrib/example_dags/example_gcp_bigtable_operators.py

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that performs the following operations on Cloud Bigtable:
- creates an Instance
- creates a Table
- updates Cluster
- waits for Table replication completeness
- deletes the Table
- deletes the Instance
This DAG relies on the following environment variables:
* GCP_PROJECT_ID - Google Cloud Platform project
* CBT_INSTANCE_ID - desired ID of a Cloud Bigtable instance
* CBT_INSTANCE_DISPLAY_NAME - desired human-readable display name of the Instance
* CBT_INSTANCE_TYPE - type of the Instance, e.g. 2 for DEVELOPMENT
See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance
* CBT_INSTANCE_LABELS - labels to add for the Instance
* CBT_CLUSTER_ID - desired ID of the main Cluster created for the Instance
* CBT_CLUSTER_ZONE - zone in which the main Cluster will be created, e.g. europe-west1-b
See available zones: https://cloud.google.com/bigtable/docs/locations
* CBT_CLUSTER_NODES - initial number of nodes in the Cluster
* CBT_CLUSTER_NODES_UPDATED - number of nodes to set with BigtableClusterUpdateOperator
* CBT_CLUSTER_STORAGE_TYPE - storage for the Cluster, e.g. 1 for SSD
See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster # noqa: E501
* CBT_TABLE_ID - desired ID of the Table
* CBT_POKE_INTERVAL - number of seconds between each sensor check attempt
"""
import datetime
import json
from os import getenv
import airflow
from airflow import models
from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceCreateOperator, \
BigtableInstanceDeleteOperator, BigtableClusterUpdateOperator, BigtableTableCreateOperator, \
BigtableTableWaitForReplicationSensor, BigtableTableDeleteOperator
# [START howto_operator_gcp_bigtable_args]
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id')
CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name')
CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2')
CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}')
CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id')
CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b')
CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3')
CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5')
CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2')
CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id')
CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60')
# [END howto_operator_gcp_bigtable_args]
default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}
with models.DAG(
'example_gcp_bigtable_operators',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)
) as dag:
# [START howto_operator_gcp_bigtable_instance_create]
create_instance_task = BigtableInstanceCreateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=int(CBT_INSTANCE_TYPE),
instance_labels=json.loads(CBT_INSTANCE_LABELS),
cluster_nodes=int(CBT_CLUSTER_NODES),
cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
task_id='create_instance',
)
# [END howto_operator_gcp_bigtable_instance_create]
# [START howto_operator_gcp_bigtable_cluster_update]
cluster_update_task = BigtableClusterUpdateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
cluster_id=CBT_CLUSTER_ID,
nodes=int(CBT_CLUSTER_NODES_UPDATED),
task_id='update_cluster',
)
# [END howto_operator_gcp_bigtable_cluster_update]
# [START howto_operator_gcp_bigtable_instance_delete]
delete_instance_task = BigtableInstanceDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
task_id='delete_instance',
)
# [END howto_operator_gcp_bigtable_instance_delete]
# [START howto_operator_gcp_bigtable_table_create]
create_table_task = BigtableTableCreateOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='create_table',
)
# [END howto_operator_gcp_bigtable_table_create]
# [START howto_operator_gcp_bigtable_table_wait_for_replication]
wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
poke_interval=int(CBT_POKE_INTERVAL),
task_id='wait_for_table_replication',
)
# [END howto_operator_gcp_bigtable_table_wait_for_replication]
# [START howto_operator_gcp_bigtable_table_delete]
delete_table_task = BigtableTableDeleteOperator(
project_id=GCP_PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id='delete_table',
)
# [END howto_operator_gcp_bigtable_table_delete]
wait_for_table_replication_task >> delete_table_task
create_instance_task \
>> create_table_task \
>> cluster_update_task \
>> delete_table_task \
>> delete_instance_task

View file: airflow/contrib/hooks/gcp_bigtable_hook.py

@ -0,0 +1,231 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from google.cloud.bigtable import Client
from google.cloud.bigtable.cluster import Cluster
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import Table
from google.cloud.bigtable_admin_v2 import enums
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
class BigtableHook(GoogleCloudBaseHook):
"""
Hook for Google Cloud Bigtable APIs.
"""
_client = None
def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None):
super(BigtableHook, self).__init__(gcp_conn_id, delegate_to)
def get_client(self, project_id):
if not self._client:
self._client = Client(project=project_id, credentials=self._get_credentials(), admin=True)
return self._client
def get_instance(self, project_id, instance_id):
"""
Retrieves and returns the specified Cloud Bigtable instance if it exists.
Otherwise, returns None.
:param project_id: The ID of the GCP project.
:type project_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type instance_id: str
"""
client = self.get_client(project_id)
instance = Instance(instance_id, client)
if not instance.exists():
return None
return instance
def delete_instance(self, project_id, instance_id):
"""
Deletes the specified Cloud Bigtable instance.
Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does not exist.
:param project_id: The ID of the GCP project.
:type project_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type instance_id: str
"""
instance = Instance(instance_id, self.get_client(project_id))
instance.delete()
def create_instance(self,
project_id,
instance_id,
main_cluster_id,
main_cluster_zone,
replica_cluster_id=None,
replica_cluster_zone=None,
instance_display_name=None,
instance_type=enums.Instance.Type.TYPE_UNSPECIFIED,
instance_labels=None,
cluster_nodes=None,
cluster_storage_type=enums.StorageType.STORAGE_TYPE_UNSPECIFIED,
timeout=None):
"""
Creates a new Cloud Bigtable instance.
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID for the new instance.
:type main_cluster_id: str
:param main_cluster_id: The ID of the main cluster for the new instance.
:type main_cluster_zone: str
:param main_cluster_zone: The zone for the main cluster.
See https://cloud.google.com/bigtable/docs/locations for more details.
:type replica_cluster_id: str
:param replica_cluster_id: (optional) The ID of the replica cluster for the new instance.
:type replica_cluster_zone: str
:param replica_cluster_zone: (optional) The zone for the replica cluster.
:type instance_type: enums.Instance.Type
:param instance_type: (optional) The type of the instance.
:type instance_display_name: str
:param instance_display_name: (optional) Human-readable name of the instance.
Defaults to ``instance_id``.
:type instance_labels: dict
:param instance_labels: (optional) Dictionary of labels to associate with the instance.
:type cluster_nodes: int
:param cluster_nodes: (optional) Number of nodes for the cluster.
:type cluster_storage_type: enums.StorageType
:param cluster_storage_type: (optional) The type of storage.
:type timeout: int
:param timeout: (optional) timeout (in seconds) for instance creation.
If not specified, the hook waits indefinitely.
"""
cluster_storage_type = enums.StorageType(cluster_storage_type)
instance_type = enums.Instance.Type(instance_type)
instance = Instance(
instance_id,
self.get_client(project_id),
instance_display_name,
instance_type,
instance_labels,
)
clusters = [
instance.cluster(
main_cluster_id,
main_cluster_zone,
cluster_nodes,
cluster_storage_type
)
]
if replica_cluster_id and replica_cluster_zone:
clusters.append(instance.cluster(
replica_cluster_id,
replica_cluster_zone,
cluster_nodes,
cluster_storage_type
))
operation = instance.create(
clusters=clusters
)
operation.result(timeout)
return instance
# noinspection PyMethodMayBeStatic
def create_table(self, instance, table_id, initial_split_keys, column_families):
"""
Creates the specified Cloud Bigtable table.
Raises google.api_core.exceptions.AlreadyExists if the table exists.
:type instance: Instance
:param instance: The Cloud Bigtable instance that owns the table.
:type table_id: str
:param table_id: The ID of the table to create in Cloud Bigtable.
:type initial_split_keys: list
:param initial_split_keys: (Optional) A list of row keys in bytes to use to initially split the table.
:type column_families: dict
:param column_families: (Optional) A map of columns to create. The key is the column_id str, and the
value is a GarbageCollectionRule.
"""
table = Table(table_id, instance)
table.create(initial_split_keys, column_families)
def delete_table(self, project_id, instance_id, table_id):
"""
Deletes the specified table in Cloud Bigtable.
Raises google.api_core.exceptions.NotFound if the table does not exist.
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type table_id: str
:param table_id: The ID of the table in Cloud Bigtable.
"""
instance = Instance(instance_id, self.get_client(project_id))
table = Table(table_id, instance)
table.delete()
@staticmethod
def update_cluster(instance, cluster_id, nodes):
"""
Updates number of nodes in the specified Cloud Bigtable cluster.
Raises google.api_core.exceptions.NotFound if the cluster does not exist.
:type instance: Instance
:param instance: The Cloud Bigtable instance that owns the cluster.
:type cluster_id: str
:param cluster_id: The ID of the cluster.
:type nodes: int
:param nodes: The desired number of nodes.
"""
cluster = Cluster(cluster_id, instance)
cluster.serve_nodes = nodes
cluster.update()
@staticmethod
def get_column_families_for_table(instance, table_id):
"""
Fetches Column Families for the specified table in Cloud Bigtable.
:type instance: Instance
:param instance: The Cloud Bigtable instance that owns the table.
:type table_id: str
:param table_id: The ID of the table in Cloud Bigtable to fetch Column Families from.
"""
table = Table(table_id, instance)
return table.list_column_families()
@staticmethod
def get_cluster_states_for_table(instance, table_id):
"""
Fetches Cluster States for the specified table in Cloud Bigtable.
Raises google.api_core.exceptions.NotFound if the table does not exist.
:type instance: Instance
:param instance: The Cloud Bigtable instance that owns the table.
:type table_id: str
:param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States from.
"""
table = Table(table_id, instance)
return table.get_cluster_states()
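
As a usage note (not part of this commit): the hook can also be exercised directly, outside an operator. A minimal sketch, assuming a configured ``google_cloud_default`` connection; the project, instance, and cluster IDs below are hypothetical:

from airflow.contrib.hooks.gcp_bigtable_hook import BigtableHook

hook = BigtableHook()
# get_instance() returns None when the instance does not exist.
instance = hook.get_instance('my-project', 'my-instance')
if not instance:
    # Blocks until the create operation finishes (no timeout given).
    instance = hook.create_instance(
        'my-project',
        'my-instance',
        main_cluster_id='my-cluster',
        main_cluster_zone='europe-west1-b',
        cluster_nodes=3,
    )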

View file: airflow/contrib/operators/gcp_bigtable_operator.py

@ -0,0 +1,424 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import google.api_core.exceptions
from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.contrib.hooks.gcp_bigtable_hook import BigtableHook
from airflow.utils.decorators import apply_defaults
from google.cloud.bigtable_admin_v2 import enums
from google.cloud.bigtable.table import ClusterState
class BigtableValidationMixin(object):
"""
Common class for Cloud Bigtable operators for validating required fields.
"""
REQUIRED_ATTRIBUTES = []
def _validate_inputs(self):
for attr_name in self.REQUIRED_ATTRIBUTES:
if not getattr(self, attr_name):
raise AirflowException('Empty parameter: {}'.format(attr_name))
class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
"""
Creates a new Cloud Bigtable instance.
If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration
and immediately succeeds. No changes are made to the existing instance.
For more details about instance creation have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.create
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance to create.
:type main_cluster_id: str
:param main_cluster_id: The ID of the main cluster for the new instance.
:type main_cluster_zone: str
:param main_cluster_zone: The zone for the main cluster.
See https://cloud.google.com/bigtable/docs/locations for more details.
:type replica_cluster_id: str
:param replica_cluster_id: (optional) The ID of the replica cluster for the new instance.
:type replica_cluster_zone: str
:param replica_cluster_zone: (optional) The zone for the replica cluster.
:type instance_type: IntEnum
:param instance_type: (optional) The type of the instance.
:type instance_display_name: str
:param instance_display_name: (optional) Human-readable name of the instance. Defaults to ``instance_id``.
:type instance_labels: dict
:param instance_labels: (optional) Dictionary of labels to associate with the instance.
:type cluster_nodes: int
:param cluster_nodes: (optional) Number of nodes for cluster.
:type cluster_storage_type: IntEnum
:param cluster_storage_type: (optional) The type of storage.
:type timeout: int
:param timeout: (optional) timeout (in seconds) for instance creation.
If not specified, the operator waits indefinitely.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone')
template_fields = ['project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone']
@apply_defaults
def __init__(self,
project_id,
instance_id,
main_cluster_id,
main_cluster_zone,
replica_cluster_id=None,
replica_cluster_zone=None,
instance_display_name=None,
instance_type=None,
instance_labels=None,
cluster_nodes=None,
cluster_storage_type=None,
timeout=None,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self.main_cluster_id = main_cluster_id
self.main_cluster_zone = main_cluster_zone
self.replica_cluster_id = replica_cluster_id
self.replica_cluster_zone = replica_cluster_zone
self.instance_display_name = instance_display_name
self.instance_type = instance_type
self.instance_labels = instance_labels
self.cluster_nodes = cluster_nodes
self.cluster_storage_type = cluster_storage_type
self.timeout = timeout
self._validate_inputs()
self.hook = BigtableHook()
super(BigtableInstanceCreateOperator, self).__init__(*args, **kwargs)
def execute(self, context):
instance = self.hook.get_instance(self.project_id, self.instance_id)
if instance:
# Based on Instance.__eq__, an instance with the same ID and client is considered equal.
self.log.info(
"The instance '%s' already exists in this project. Consider it as created",
self.instance_id
)
return
try:
self.hook.create_instance(
self.project_id,
self.instance_id,
self.main_cluster_id,
self.main_cluster_zone,
self.replica_cluster_id,
self.replica_cluster_zone,
self.instance_display_name,
self.instance_type,
self.instance_labels,
self.cluster_nodes,
self.cluster_storage_type,
self.timeout,
)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable instance, including its clusters and all related tables.
For more details about deleting instance have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.delete
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance to delete.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id')
template_fields = ['project_id', 'instance_id']
@apply_defaults
def __init__(self,
project_id,
instance_id,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self._validate_inputs()
self.hook = BigtableHook()
super(BigtableInstanceDeleteOperator, self).__init__(*args, **kwargs)
def execute(self, context):
try:
self.hook.delete_instance(self.project_id, self.instance_id)
except google.api_core.exceptions.NotFound:
self.log.info(
"The instance '%s' does not exist in project '%s'. Consider it as deleted",
self.instance_id, self.project_id
)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
"""
Creates the table in the Cloud Bigtable instance.
For more details about creating table have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.create
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance that will hold the new table.
:type table_id: str
:param table_id: The ID of the table to be created.
:type initial_split_keys: list
:param initial_split_keys: (Optional) list of row keys in bytes that will be used to initially split
the table into several tablets.
:type column_families: dict
:param column_families: (Optional) A map of columns to create.
The key is the column_id str and the value is a GarbageCollectionRule.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
template_fields = ['project_id', 'instance_id', 'table_id']
@apply_defaults
def __init__(self,
project_id,
instance_id,
table_id,
initial_split_keys=None,
column_families=None,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self.table_id = table_id
self.initial_split_keys = initial_split_keys or list()
self.column_families = column_families or dict()
self._validate_inputs()
self.hook = BigtableHook()
self.instance = None
super(BigtableTableCreateOperator, self).__init__(*args, **kwargs)
def _compare_column_families(self):
table_column_families = self.hook.get_column_families_for_table(self.instance, self.table_id)
if set(table_column_families.keys()) != set(self.column_families.keys()):
self.log.error("Table '%s' has different set of Column Families", self.table_id)
self.log.error("Expected: %s", self.column_families.keys())
self.log.error("Actual: %s", table_column_families.keys())
return False
for key in table_column_families.keys():
# There is a difference in structure between the local Column Families and the remote ones.
# The local `self.column_families` is a dict with column_id as key and GarbageCollectionRule as value.
# The remote `table_column_families` is a dict mapping column_id to ColumnFamily objects.
# For more information about ColumnFamily please refer to the documentation:
# https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html#google.cloud.bigtable.column_family.ColumnFamily
if table_column_families[key].gc_rule != self.column_families[key]:
self.log.error("Column Family '%s' differs for table '%s'.", key, self.table_id)
return False
return True
def execute(self, context):
self.instance = self.hook.get_instance(self.project_id, self.instance_id)
if not self.instance:
raise AirflowException("Dependency: instance '{}' does not exist in project '{}'.".format(
self.instance_id, self.project_id))
try:
self.hook.create_table(
self.instance,
self.table_id,
self.initial_split_keys,
self.column_families
)
except google.api_core.exceptions.AlreadyExists:
if not self._compare_column_families():
raise AirflowException(
"Table '{}' already exists with different Column Families.".format(self.table_id))
self.log.info("The table '%s' already exists. Consider it as created", self.table_id)
class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
"""
Deletes the Cloud Bigtable table.
For more details about deleting table have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.delete
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type table_id: str
:param table_id: The ID of the table to be deleted.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
template_fields = ['project_id', 'instance_id', 'table_id']
@apply_defaults
def __init__(self,
project_id,
instance_id,
table_id,
app_profile_id=None,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self.table_id = table_id
self.app_profile_id = app_profile_id
self._validate_inputs()
self.hook = BigtableHook()
super(BigtableTableDeleteOperator, self).__init__(*args, **kwargs)
def execute(self, context):
instance = self.hook.get_instance(self.project_id, self.instance_id)
if not instance:
raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id))
try:
self.hook.delete_table(
self.project_id,
self.instance_id,
self.table_id,
)
except google.api_core.exceptions.NotFound:
# It's OK if the table doesn't exist.
self.log.info("The table '%s' no longer exists. Consider it as deleted", self.table_id)
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin):
"""
Updates a Cloud Bigtable cluster.
For more details about updating a Cloud Bigtable cluster, have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/cluster.html#google.cloud.bigtable.cluster.Cluster.update
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type cluster_id: str
:param cluster_id: The ID of the Cloud Bigtable cluster to update.
:type nodes: int
:param nodes: The desired number of nodes for the Cloud Bigtable cluster.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'cluster_id', 'nodes')
template_fields = ['project_id', 'instance_id', 'cluster_id', 'nodes']
@apply_defaults
def __init__(self,
project_id,
instance_id,
cluster_id,
nodes,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self.cluster_id = cluster_id
self.nodes = nodes
self._validate_inputs()
self.hook = BigtableHook()
super(BigtableClusterUpdateOperator, self).__init__(*args, **kwargs)
def execute(self, context):
instance = self.hook.get_instance(self.project_id, self.instance_id)
if not instance:
raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id))
try:
self.hook.update_cluster(
instance,
self.cluster_id,
self.nodes
)
except google.api_core.exceptions.NotFound:
raise AirflowException("Dependency: cluster '{}' does not exist for instance '{}'.".format(
self.cluster_id,
self.instance_id
))
except google.api_core.exceptions.GoogleAPICallError as e:
self.log.error('An error occurred. Exiting.')
raise e
class BigtableTableWaitForReplicationSensor(BaseSensorOperator, BigtableValidationMixin):
"""
Sensor that waits for Cloud Bigtable table to be fully replicated to its clusters.
No exception will be raised if the instance or the table does not exist.
For more details about cluster states for a table, have a look at the reference:
https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.get_cluster_states
:type project_id: str
:param project_id: The ID of the GCP project.
:type instance_id: str
:param instance_id: The ID of the Cloud Bigtable instance.
:type table_id: str
:param table_id: The ID of the table to check replication status for.
"""
REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id')
template_fields = ['project_id', 'instance_id', 'table_id']
@apply_defaults
def __init__(self,
project_id,
instance_id,
table_id,
*args, **kwargs):
self.project_id = project_id
self.instance_id = instance_id
self.table_id = table_id
self._validate_inputs()
self.hook = BigtableHook()
super(BigtableTableWaitForReplicationSensor, self).__init__(*args, **kwargs)
def poke(self, context):
instance = self.hook.get_instance(self.project_id, self.instance_id)
if not instance:
self.log.info("Dependency: instance '%s' does not exist.", self.instance_id)
return False
try:
cluster_states = self.hook.get_cluster_states_for_table(instance, self.table_id)
except google.api_core.exceptions.NotFound:
self.log.info(
"Dependency: table '%s' does not exist in instance '%s'.", self.table_id, self.instance_id)
return False
ready_state = ClusterState(enums.Table.ClusterState.ReplicationState.READY)
is_table_replicated = True
for cluster_id in cluster_states.keys():
if cluster_states[cluster_id] != ready_state:
self.log.info("Table '%s' is not yet replicated on cluster '%s'.", self.table_id, cluster_id)
is_table_replicated = False
if not is_table_replicated:
return False
self.log.info("Table '%s' is replicated.", self.table_id)
return True

View file: docs/howto/operator.rst

@ -361,6 +361,135 @@ More information
See `Google Compute Engine API documentation
<https://cloud.google.com/compute/docs/reference/rest/v1/instanceGroupManagers>`_.
Google Cloud Bigtable Operators
-------------------------------
Arguments
"""""""""
All examples below rely on the following variables, which can be passed as environment variables.
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:start-after: [START howto_operator_gcp_bigtable_args]
:end-before: [END howto_operator_gcp_bigtable_args]
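
The defaults shown above are placeholders. A minimal sketch of overriding them via environment variables before the DAG file is parsed (the values are hypothetical):

.. code-block:: python

    import os

    os.environ['GCP_PROJECT_ID'] = 'my-project'
    os.environ['CBT_INSTANCE_ID'] = 'my-instance'
    # CBT_INSTANCE_LABELS is parsed with json.loads(), so it must be valid JSON.
    os.environ['CBT_INSTANCE_LABELS'] = '{"env": "test"}'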
BigtableInstanceCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator`
to create a Google Cloud Bigtable instance.
If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration
and immediately succeeds. No changes are made to the existing instance.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_create]
:end-before: [END howto_operator_gcp_bigtable_instance_create]
BigtableInstanceDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator`
to delete a Google Cloud Bigtable instance.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_instance_delete]
:end-before: [END howto_operator_gcp_bigtable_instance_delete]
BigtableClusterUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator`
to modify the number of nodes in a Cloud Bigtable cluster.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_cluster_update]
:end-before: [END howto_operator_gcp_bigtable_cluster_update]
BigtableTableCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator`
to create a table in a Cloud Bigtable instance.
If a table with the given ID already exists in the instance, the operator compares its Column Families
with the requested ones. If they are identical, the operator succeeds; otherwise, it fails with an
appropriate error message.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_create]
:end-before: [END howto_operator_gcp_bigtable_table_create]
Advanced
""""""""
When creating a table, you can specify the optional ``initial_split_keys`` and ``column_families``.
Please refer to the Python Client for Google Cloud Bigtable documentation
`for Table <https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html>`_ and `for Column
Families <https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html>`_.
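
For illustration only, a sketch of passing both options (the ``cf1`` family name, the split keys, and the GC rule are arbitrary examples, not values prescribed by this commit):

.. code-block:: python

    from google.cloud.bigtable.column_family import MaxVersionsGCRule

    create_table_task = BigtableTableCreateOperator(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        initial_split_keys=[b'row-key-0001', b'row-key-0002'],
        column_families={'cf1': MaxVersionsGCRule(1)},
        task_id='create_table_with_column_families',
    )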
BigtableTableDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator`
to delete a table in Google Cloud Bigtable.
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_delete]
:end-before: [END howto_operator_gcp_bigtable_table_delete]
BigtableTableWaitForReplicationSensor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor`
to wait for the table to replicate fully.
The same arguments apply to this sensor as to the BigtableTableCreateOperator_.
**Note:** If the table or the Cloud Bigtable instance does not exist, the sensor does not raise any
exception; it keeps poking until its timeout is reached.
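
Because the sensor never raises on a missing table or instance, it is worth bounding it with the standard ``timeout`` argument inherited from ``BaseSensorOperator``; a sketch (the one-hour value is an arbitrary example):

.. code-block:: python

    wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        poke_interval=int(CBT_POKE_INTERVAL),
        timeout=3600,  # fail the task after an hour instead of waiting forever
        task_id='wait_for_table_replication',
    )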
Using the operator
""""""""""""""""""
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_bigtable_table_wait_for_replication]
:end-before: [END howto_operator_gcp_bigtable_table_wait_for_replication]
Google Cloud Functions Operators
--------------------------------
@ -1318,3 +1447,4 @@ More information
See `Google Cloud Storage ObjectAccessControls insert documentation
<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_.

View file: docs/integration.rst

@ -1,3 +1,4 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -814,6 +815,69 @@ Cloud SQL Hooks
:members:
Cloud Bigtable
''''''''''''''
Cloud Bigtable Operators
""""""""""""""""""""""""
- :ref:`BigtableInstanceCreateOperator` : creates a Cloud Bigtable instance.
- :ref:`BigtableInstanceDeleteOperator` : deletes a Google Cloud Bigtable instance.
- :ref:`BigtableClusterUpdateOperator` : updates the number of nodes in a Google Cloud Bigtable cluster.
- :ref:`BigtableTableCreateOperator` : creates a table in a Google Cloud Bigtable instance.
- :ref:`BigtableTableDeleteOperator` : deletes a table in a Google Cloud Bigtable instance.
- :ref:`BigtableTableWaitForReplicationSensor` : (sensor) waits for a table to be fully replicated.
.. _BigtableInstanceCreateOperator:
BigtableInstanceCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator
.. _BigtableInstanceDeleteOperator:
BigtableInstanceDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator
.. _BigtableClusterUpdateOperator:
BigtableClusterUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator
.. _BigtableTableCreateOperator:
BigtableTableCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator
.. _BigtableTableDeleteOperator:
BigtableTableDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator
.. _BigtableTableWaitForReplicationSensor:
BigtableTableWaitForReplicationSensor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor
.. _BigtableHook:
Cloud Bigtable Hook
""""""""""""""""""""
.. autoclass:: airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook
:members:
Compute Engine
''''''''''''''

View file: setup.py

@ -190,6 +190,7 @@ gcp_api = [
'google-auth>=1.0.0, <2.0.0dev',
'google-auth-httplib2>=0.0.1',
'google-cloud-container>=0.1.1',
'google-cloud-bigtable==0.31.0',
'google-cloud-spanner>=1.6.0',
'grpcio-gcp>=0.2.2',
'PyOpenSSL',
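
Note (not part of the diff): this pinned client is pulled in by installing Airflow with the ``gcp_api`` extra, e.g. ``pip install apache-airflow[gcp_api]``.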

View file: tests/contrib/operators/test_gcp_bigtable_operator.py

@ -0,0 +1,540 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import google.api_core.exceptions
from google.cloud.bigtable.column_family import MaxVersionsGCRule
from google.cloud.bigtable.instance import Instance
from google.cloud.bigtable.table import ClusterState
from parameterized import parameterized
from airflow import AirflowException
from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceDeleteOperator, \
BigtableTableDeleteOperator, BigtableTableCreateOperator, BigtableTableWaitForReplicationSensor, \
BigtableClusterUpdateOperator, BigtableInstanceCreateOperator
try:
# noinspection PyProtectedMember
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None
PROJECT_ID = 'test_project_id'
INSTANCE_ID = 'test-instance-id'
CLUSTER_ID = 'test-cluster-id'
CLUSTER_ZONE = 'us-central1-f'
NODES = 5
TABLE_ID = 'test-table-id'
INITIAL_SPLIT_KEYS = []
EMPTY_COLUMN_FAMILIES = {}
class BigtableInstanceCreateTest(unittest.TestCase):
@parameterized.expand([
('project_id', '', INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE),
('instance_id', PROJECT_ID, '', CLUSTER_ID, CLUSTER_ZONE),
('main_cluster_id', PROJECT_ID, INSTANCE_ID, '', CLUSTER_ZONE),
('main_cluster_zone', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, main_cluster_id,
main_cluster_zone, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableInstanceCreateOperator(
project_id=project_id,
instance_id=instance_id,
main_cluster_id=main_cluster_id,
main_cluster_zone=main_cluster_zone,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_create_instance_that_exists(self, mock_hook):
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
op = BigtableInstanceCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
main_cluster_id=CLUSTER_ID,
main_cluster_zone=CLUSTER_ZONE,
task_id="id"
)
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.create_instance.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_different_error_reraised(self, mock_hook):
mock_hook.return_value.get_instance.return_value = None
op = BigtableInstanceCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
main_cluster_id=CLUSTER_ID,
main_cluster_zone=CLUSTER_ZONE,
task_id="id"
)
mock_hook.return_value.create_instance.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.create_instance.assert_called_once_with(
PROJECT_ID, INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE, None, None, None, None, None, None, None, None
)
class BigtableClusterUpdateTest(unittest.TestCase):
@parameterized.expand([
('project_id', '', INSTANCE_ID, CLUSTER_ID, NODES),
('instance_id', PROJECT_ID, '', CLUSTER_ID, NODES),
('cluster_id', PROJECT_ID, INSTANCE_ID, '', NODES),
('nodes', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, cluster_id, nodes, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableClusterUpdateOperator(
project_id=project_id,
instance_id=instance_id,
cluster_id=cluster_id,
nodes=nodes,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_updating_cluster_but_instance_does_not_exists(self, mock_hook):
mock_hook.return_value.get_instance.return_value = None
with self.assertRaises(AirflowException) as e:
op = BigtableClusterUpdateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
cluster_id=CLUSTER_ID,
nodes=NODES,
task_id="id"
)
op.execute(None)
err = e.exception
self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID))
mock_hook.assert_called_once_with()
mock_hook.return_value.update_cluster.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_updating_cluster_that_does_not_exists(self, mock_hook):
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.update_cluster.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Cluster not found."))
with self.assertRaises(AirflowException) as e:
op = BigtableClusterUpdateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
cluster_id=CLUSTER_ID,
nodes=NODES,
task_id="id"
)
op.execute(None)
err = e.exception
self.assertEqual(
str(err),
"Dependency: cluster '{}' does not exist for instance '{}'.".format(CLUSTER_ID, INSTANCE_ID)
)
mock_hook.assert_called_once_with()
mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES)
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_different_error_reraised(self, mock_hook):
op = BigtableClusterUpdateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
cluster_id=CLUSTER_ID,
nodes=NODES,
task_id="id"
)
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.update_cluster.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES)
class BigtableInstanceDeleteTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_delete_execute(self, mock_hook):
op = BigtableInstanceDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
task_id="id"
)
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
@parameterized.expand([
('project_id', '', INSTANCE_ID),
('instance_id', PROJECT_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableInstanceDeleteOperator(
project_id=project_id,
instance_id=instance_id,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_deleting_instance_that_doesnt_exists(self, mock_hook):
op = BigtableInstanceDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
task_id="id"
)
mock_hook.return_value.delete_instance.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Instance not found."))
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_different_error_reraised(self, mock_hook):
op = BigtableInstanceDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
task_id="id"
)
mock_hook.return_value.delete_instance.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID)
class BigtableTableDeleteTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_delete_execute(self, mock_hook):
op = BigtableTableDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
@parameterized.expand([
('project_id', '', INSTANCE_ID, TABLE_ID),
('instance_id', PROJECT_ID, '', TABLE_ID),
('table_id', PROJECT_ID, INSTANCE_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableTableDeleteOperator(
project_id=project_id,
instance_id=instance_id,
table_id=table_id,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_deleting_table_that_doesnt_exists(self, mock_hook):
op = BigtableTableDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
mock_hook.return_value.delete_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Table not found."))
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_deleting_table_when_instance_doesnt_exists(self, mock_hook):
op = BigtableTableDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
mock_hook.return_value.get_instance.return_value = None
with self.assertRaises(AirflowException) as e:
op.execute(None)
err = e.exception
self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID))
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_table.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_different_error_reraised(self, mock_hook):
op = BigtableTableDeleteOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
mock_hook.return_value.delete_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.GoogleAPICallError('error'))
with self.assertRaises(google.api_core.exceptions.GoogleAPICallError):
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID)
class BigtableTableCreateTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_create_execute(self, mock_hook):
op = BigtableTableCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
initial_split_keys=INITIAL_SPLIT_KEYS,
column_families=EMPTY_COLUMN_FAMILIES,
task_id="id"
)
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.create_table.assert_called_once_with(
instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES)
@parameterized.expand([
('project_id', '', INSTANCE_ID, TABLE_ID),
('instance_id', PROJECT_ID, '', TABLE_ID),
('table_id', PROJECT_ID, INSTANCE_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableTableCreateOperator(
project_id=project_id,
instance_id=instance_id,
table_id=table_id,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_instance_not_exists(self, mock_hook):
op = BigtableTableCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
initial_split_keys=INITIAL_SPLIT_KEYS,
column_families=EMPTY_COLUMN_FAMILIES,
task_id="id"
)
mock_hook.return_value.get_instance.return_value = None
with self.assertRaises(AirflowException) as e:
op.execute(None)
err = e.exception
self.assertEqual(
str(err),
"Dependency: instance '{}' does not exist in project '{}'.".format(INSTANCE_ID, PROJECT_ID)
)
mock_hook.assert_called_once_with()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_creating_table_that_exists(self, mock_hook):
op = BigtableTableCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
initial_split_keys=INITIAL_SPLIT_KEYS,
column_families=EMPTY_COLUMN_FAMILIES,
task_id="id"
)
mock_hook.return_value.get_column_families_for_table.return_value = EMPTY_COLUMN_FAMILIES
instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.create_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
op.execute(None)
mock_hook.assert_called_once_with()
mock_hook.return_value.create_table.assert_called_once_with(
instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES)
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_creating_table_that_exists_with_different_column_families_ids_in_the_table(self, mock_hook):
op = BigtableTableCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
initial_split_keys=INITIAL_SPLIT_KEYS,
column_families=EMPTY_COLUMN_FAMILIES,
task_id="id"
)
mock_hook.return_value.get_column_families_for_table.return_value = {"existing_family": None}
mock_hook.return_value.create_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
with self.assertRaises(AirflowException) as e:
op.execute(None)
err = e.exception
self.assertEqual(
str(err),
"Table '{}' already exists with different Column Families.".format(TABLE_ID)
)
mock_hook.assert_called_once_with()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_creating_table_that_exists_with_different_column_families_gc_rule_in_the_table(self, mock_hook):
op = BigtableTableCreateOperator(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
initial_split_keys=INITIAL_SPLIT_KEYS,
column_families={"cf-id": MaxVersionsGCRule(1)},
task_id="id"
)
cf_mock = mock.Mock()
cf_mock.gc_rule = MaxVersionsGCRule(2)  # a real rule, so the inequality check is meaningful
mock_hook.return_value.get_column_families_for_table.return_value = {
"cf-id": cf_mock
}
mock_hook.return_value.create_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.AlreadyExists("Table already exists."))
with self.assertRaises(AirflowException) as e:
op.execute(None)
err = e.exception
self.assertEqual(
str(err),
"Table '{}' already exists with different Column Families.".format(TABLE_ID)
)
mock_hook.assert_called_once_with()
class BigtableWaitForTableReplicationTest(unittest.TestCase):
@parameterized.expand([
('project_id', '', INSTANCE_ID, TABLE_ID),
('instance_id', PROJECT_ID, '', TABLE_ID),
('table_id', PROJECT_ID, INSTANCE_ID, ''),
], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0])
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook):
with self.assertRaises(AirflowException) as e:
BigtableTableWaitForReplicationSensor(
project_id=project_id,
instance_id=instance_id,
table_id=table_id,
task_id="id"
)
err = e.exception
self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute))
mock_hook.assert_not_called()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_wait_no_instance(self, mock_hook):
mock_hook.return_value.get_instance.return_value = None
op = BigtableTableWaitForReplicationSensor(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
self.assertFalse(op.poke(None))
mock_hook.assert_called_once_with()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_wait_no_table(self, mock_hook):
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.get_cluster_states_for_table.side_effect = mock.Mock(
side_effect=google.api_core.exceptions.NotFound("Table not found."))
op = BigtableTableWaitForReplicationSensor(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
self.assertFalse(op.poke(None))
mock_hook.assert_called_once_with()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_wait_not_ready(self, mock_hook):
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.get_cluster_states_for_table.return_value = {
"cl-id": ClusterState(0)
}
op = BigtableTableWaitForReplicationSensor(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
self.assertFalse(op.poke(None))
mock_hook.assert_called_once_with()
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook')
def test_wait_ready(self, mock_hook):
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
mock_hook.return_value.get_cluster_states_for_table.return_value = {
"cl-id": ClusterState(4)
}
op = BigtableTableWaitForReplicationSensor(
project_id=PROJECT_ID,
instance_id=INSTANCE_ID,
table_id=TABLE_ID,
task_id="id"
)
self.assertTrue(op.poke(None))
mock_hook.assert_called_once_with()