diff --git a/airflow/contrib/example_dags/example_gcp_bigtable_operators.py b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py new file mode 100644 index 0000000000..48c4245cba --- /dev/null +++ b/airflow/contrib/example_dags/example_gcp_bigtable_operators.py @@ -0,0 +1,149 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# 'License'); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that performs the following operations on Cloud Bigtable: +- creates an Instance +- creates a Table +- updates the Cluster +- waits for Table replication completeness +- deletes the Table +- deletes the Instance + +This DAG relies on the following environment variables: +* GCP_PROJECT_ID - Google Cloud Platform project +* CBT_INSTANCE_ID - desired ID of a Cloud Bigtable instance +* CBT_INSTANCE_DISPLAY_NAME - desired human-readable display name of the Instance +* CBT_INSTANCE_TYPE - type of the Instance, e.g. 1 for DEVELOPMENT + See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance +* CBT_INSTANCE_LABELS - labels to add to the Instance +* CBT_CLUSTER_ID - desired ID of the main Cluster created for the Instance +* CBT_CLUSTER_ZONE - zone in which the main Cluster will be created, e.g. europe-west1-b + See available zones: https://cloud.google.com/bigtable/docs/locations +* CBT_CLUSTER_NODES - initial number of nodes in the Cluster +* CBT_CLUSTER_NODES_UPDATED - number of nodes for BigtableClusterUpdateOperator +* CBT_CLUSTER_STORAGE_TYPE - storage type for the Cluster, e.g.
1 for SSD + See https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.cluster # noqa: E501 +* CBT_TABLE_ID - desired ID of the Table +* CBT_POKE_INTERVAL - number of seconds between every attempt of Sensor check + +""" + +import datetime +import json + +from os import getenv + +import airflow +from airflow import models +from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceCreateOperator, \ + BigtableInstanceDeleteOperator, BigtableClusterUpdateOperator, BigtableTableCreateOperator, \ + BigtableTableWaitForReplicationSensor, BigtableTableDeleteOperator + +# [START howto_operator_gcp_bigtable_args] +GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project') +CBT_INSTANCE_ID = getenv('CBT_INSTANCE_ID', 'some-instance-id') +CBT_INSTANCE_DISPLAY_NAME = getenv('CBT_INSTANCE_DISPLAY_NAME', 'Human-readable name') +CBT_INSTANCE_TYPE = getenv('CBT_INSTANCE_TYPE', '2') +CBT_INSTANCE_LABELS = getenv('CBT_INSTANCE_LABELS', '{}') +CBT_CLUSTER_ID = getenv('CBT_CLUSTER_ID', 'some-cluster-id') +CBT_CLUSTER_ZONE = getenv('CBT_CLUSTER_ZONE', 'europe-west1-b') +CBT_CLUSTER_NODES = getenv('CBT_CLUSTER_NODES', '3') +CBT_CLUSTER_NODES_UPDATED = getenv('CBT_CLUSTER_NODES_UPDATED', '5') +CBT_CLUSTER_STORAGE_TYPE = getenv('CBT_CLUSTER_STORAGE_TYPE', '2') +CBT_TABLE_ID = getenv('CBT_TABLE_ID', 'some-table-id') +CBT_POKE_INTERVAL = getenv('CBT_POKE_INTERVAL', '60') +# [END howto_operator_gcp_bigtable_args] + +default_args = { + 'start_date': airflow.utils.dates.days_ago(1) +} + +with models.DAG( + 'example_gcp_bigtable_operators', + default_args=default_args, + schedule_interval=datetime.timedelta(days=1) +) as dag: + # [START howto_operator_gcp_bigtable_instance_create] + create_instance_task = BigtableInstanceCreateOperator( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + main_cluster_id=CBT_CLUSTER_ID, + main_cluster_zone=CBT_CLUSTER_ZONE, + instance_display_name=CBT_INSTANCE_DISPLAY_NAME, + instance_type=int(CBT_INSTANCE_TYPE), + instance_labels=json.loads(CBT_INSTANCE_LABELS), + cluster_nodes=int(CBT_CLUSTER_NODES), + cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE), + task_id='create_instance', + ) + # [END howto_operator_gcp_bigtable_instance_create] + + # [START howto_operator_gcp_bigtable_cluster_update] + cluster_update_task = BigtableClusterUpdateOperator( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + cluster_id=CBT_CLUSTER_ID, + nodes=int(CBT_CLUSTER_NODES_UPDATED), + task_id='update_cluster', + ) + # [END howto_operator_gcp_bigtable_cluster_update] + + # [START howto_operator_gcp_bigtable_instance_delete] + delete_instance_task = BigtableInstanceDeleteOperator( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + task_id='delete_instance', + ) + # [END howto_operator_gcp_bigtable_instance_delete] + + # [START howto_operator_gcp_bigtable_table_create] + create_table_task = BigtableTableCreateOperator( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + table_id=CBT_TABLE_ID, + task_id='create_table', + ) + # [END howto_operator_gcp_bigtable_table_create] + + # [START howto_operator_gcp_bigtable_table_wait_for_replication] + wait_for_table_replication_task = BigtableTableWaitForReplicationSensor( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + table_id=CBT_TABLE_ID, + poke_interval=int(CBT_POKE_INTERVAL), + task_id='wait_for_table_replication', + ) + # [END howto_operator_gcp_bigtable_table_wait_for_replication] + + # [START 
howto_operator_gcp_bigtable_table_delete] + delete_table_task = BigtableTableDeleteOperator( + project_id=GCP_PROJECT_ID, + instance_id=CBT_INSTANCE_ID, + table_id=CBT_TABLE_ID, + task_id='delete_table', + ) + # [END howto_operator_gcp_bigtable_table_delete] + + wait_for_table_replication_task >> delete_table_task + create_instance_task \ + >> create_table_task \ + >> cluster_update_task \ + >> delete_table_task \ + >> delete_instance_task diff --git a/airflow/contrib/hooks/gcp_bigtable_hook.py b/airflow/contrib/hooks/gcp_bigtable_hook.py new file mode 100644 index 0000000000..5d1b6f01c9 --- /dev/null +++ b/airflow/contrib/hooks/gcp_bigtable_hook.py @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from google.cloud.bigtable import Client +from google.cloud.bigtable.cluster import Cluster +from google.cloud.bigtable.instance import Instance +from google.cloud.bigtable.table import Table +from google.cloud.bigtable_admin_v2 import enums +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + + +class BigtableHook(GoogleCloudBaseHook): + """ + Hook for Google Cloud Bigtable APIs. + """ + + _client = None + + def __init__(self, + gcp_conn_id='google_cloud_default', + delegate_to=None): + super(BigtableHook, self).__init__(gcp_conn_id, delegate_to) + + def get_client(self, project_id): + if not self._client: + self._client = Client(project=project_id, credentials=self._get_credentials(), admin=True) + return self._client + + def get_instance(self, project_id, instance_id): + """ + Retrieves and returns the specified Cloud Bigtable instance if it exists. + Otherwise, returns None. + + :param project_id: The ID of the GCP project. + :type project_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type instance_id: str + """ + + client = self.get_client(project_id) + + instance = Instance(instance_id, client) + if not instance.exists(): + return None + return instance + + def delete_instance(self, project_id, instance_id): + """ + Deletes the specified Cloud Bigtable instance. + Raises google.api_core.exceptions.NotFound if the Cloud Bigtable instance does not exist. + + :param project_id: The ID of the GCP project. + :type project_id: str + :param instance_id: The ID of the Cloud Bigtable instance. 
+ :type instance_id: str + """ + instance = Instance(instance_id, self.get_client(project_id)) + instance.delete() + + def create_instance(self, + project_id, + instance_id, + main_cluster_id, + main_cluster_zone, + replica_cluster_id=None, + replica_cluster_zone=None, + instance_display_name=None, + instance_type=enums.Instance.Type.TYPE_UNSPECIFIED, + instance_labels=None, + cluster_nodes=None, + cluster_storage_type=enums.StorageType.STORAGE_TYPE_UNSPECIFIED, + timeout=None): + """ + Creates a new instance. + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID for the new instance. + :type main_cluster_id: str + :param main_cluster_id: The ID for the main cluster of the new instance. + :type main_cluster_zone: str + :param main_cluster_zone: The zone for the main cluster. + See https://cloud.google.com/bigtable/docs/locations for more details. + :type replica_cluster_id: str + :param replica_cluster_id: (optional) The ID for the replica cluster of the new instance. + :type replica_cluster_zone: str + :param replica_cluster_zone: (optional) The zone for the replica cluster. + :type instance_type: enums.Instance.Type + :param instance_type: (optional) The type of the instance. + :type instance_display_name: str + :param instance_display_name: (optional) Human-readable name of the instance. + Defaults to ``instance_id``. + :type instance_labels: dict + :param instance_labels: (optional) Dictionary of labels to associate with the instance. + :type cluster_nodes: int + :param cluster_nodes: (optional) Number of nodes for the cluster. + :type cluster_storage_type: enums.StorageType + :param cluster_storage_type: (optional) The type of storage. + :type timeout: int + :param timeout: (optional) timeout (in seconds) for instance creation. + If None is given, the operator will wait indefinitely. + """ + cluster_storage_type = enums.StorageType(cluster_storage_type) + instance_type = enums.Instance.Type(instance_type) + + instance = Instance( + instance_id, + self.get_client(project_id), + instance_display_name, + instance_type, + instance_labels, + ) + + clusters = [ + instance.cluster( + main_cluster_id, + main_cluster_zone, + cluster_nodes, + cluster_storage_type + ) + ] + if replica_cluster_id and replica_cluster_zone: + clusters.append(instance.cluster( + replica_cluster_id, + replica_cluster_zone, + cluster_nodes, + cluster_storage_type + )) + operation = instance.create( + clusters=clusters + ) + operation.result(timeout) + return instance + + # noinspection PyMethodMayBeStatic + def create_table(self, instance, table_id, initial_split_keys, column_families): + """ + Creates the specified Cloud Bigtable table. + Raises google.api_core.exceptions.AlreadyExists if the table exists. + + :type instance: Instance + :param instance: The Cloud Bigtable instance that owns the table. + :type table_id: str + :param table_id: The ID of the table to create in Cloud Bigtable. + :type initial_split_keys: list + :param initial_split_keys: (Optional) A list of row keys in bytes to use to initially split the table. + :type column_families: dict + :param column_families: (Optional) A map of columns to create. The key is the column_id str, and the + value is a GarbageCollectionRule. + """ + table = Table(table_id, instance) + table.create(initial_split_keys, column_families) + + def delete_table(self, project_id, instance_id, table_id): + """ + Deletes the specified table in Cloud Bigtable.
+ Raises google.api_core.exceptions.NotFound if the table does not exist. + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type table_id: str + :param table_id: The ID of the table in Cloud Bigtable. + """ + instance = Instance(instance_id, self.get_client(project_id)) + table = Table(table_id, instance) + table.delete() + + @staticmethod + def update_cluster(instance, cluster_id, nodes): + """ + Updates number of nodes in the specified Cloud Bigtable cluster. + Raises google.api_core.exceptions.NotFound if the cluster does not exist. + + :type instance: Instance + :param instance: The Cloud Bigtable instance that owns the cluster. + :type cluster_id: str + :param cluster_id: The ID of the cluster. + :type nodes: int + :param nodes: The desired number of nodes. + """ + cluster = Cluster(cluster_id, instance) + cluster.serve_nodes = nodes + cluster.update() + + @staticmethod + def get_column_families_for_table(instance, table_id): + """ + Fetches Column Families for the specified table in Cloud Bigtable. + + :type instance: Instance + :param instance: The Cloud Bigtable instance that owns the table. + :type table_id: str + :param table_id: The ID of the table in Cloud Bigtable to fetch Column Families from. + """ + + table = Table(table_id, instance) + return table.list_column_families() + + @staticmethod + def get_cluster_states_for_table(instance, table_id): + """ + Fetches Cluster States for the specified table in Cloud Bigtable. + Raises google.api_core.exceptions.NotFound if the table does not exist. + + :type instance: Instance + :param instance: The Cloud Bigtable instance that owns the table. + :type table_id: str + :param table_id: The ID of the table in Cloud Bigtable to fetch Cluster States from. + """ + + table = Table(table_id, instance) + return table.get_cluster_states() diff --git a/airflow/contrib/operators/gcp_bigtable_operator.py b/airflow/contrib/operators/gcp_bigtable_operator.py new file mode 100644 index 0000000000..640851e76e --- /dev/null +++ b/airflow/contrib/operators/gcp_bigtable_operator.py @@ -0,0 +1,424 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import google.api_core.exceptions + +from airflow import AirflowException +from airflow.models import BaseOperator +from airflow.sensors.base_sensor_operator import BaseSensorOperator +from airflow.contrib.hooks.gcp_bigtable_hook import BigtableHook +from airflow.utils.decorators import apply_defaults +from google.cloud.bigtable_admin_v2 import enums +from google.cloud.bigtable.table import ClusterState + + +class BigtableValidationMixin(object): + """ + Common class for Cloud Bigtable operators for validating required fields. 
+ """ + + REQUIRED_ATTRIBUTES = [] + + def _validate_inputs(self): + for attr_name in self.REQUIRED_ATTRIBUTES: + if not getattr(self, attr_name): + raise AirflowException('Empty parameter: {}'.format(attr_name)) + + +class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin): + """ + Creates a new Cloud Bigtable instance. + If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration + and immediately succeeds. No changes are made to the existing instance. + + For more details about instance creation have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.create + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance to create. + :type main_cluster_id: str + :param main_cluster_id: The ID for main cluster for the new instance. + :type main_cluster_zone: str + :param main_cluster_zone: The zone for main cluster + See https://cloud.google.com/bigtable/docs/locations for more details. + :type replica_cluster_id: str + :param replica_cluster_id: (optional) The ID for replica cluster for the new instance. + :type replica_cluster_zone: str + :param replica_cluster_zone: (optional) The zone for replica cluster. + :type instance_type: IntEnum + :param instance_type: (optional) The type of the instance. + :type instance_display_name: str + :param instance_display_name: (optional) Human-readable name of the instance. Defaults to ``instance_id``. + :type instance_labels: dict + :param instance_labels: (optional) Dictionary of labels to associate with the instance. + :type cluster_nodes: int + :param cluster_nodes: (optional) Number of nodes for cluster. + :type cluster_storage_type: IntEnum + :param cluster_storage_type: (optional) The type of storage. + :type timeout: int + :param timeout: (optional) timeout (in seconds) for instance creation. + If None is not specified, Operator will wait indefinitely. + """ + + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone') + template_fields = ['project_id', 'instance_id', 'main_cluster_id', 'main_cluster_zone'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + main_cluster_id, + main_cluster_zone, + replica_cluster_id=None, + replica_cluster_zone=None, + instance_display_name=None, + instance_type=None, + instance_labels=None, + cluster_nodes=None, + cluster_storage_type=None, + timeout=None, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self.main_cluster_id = main_cluster_id + self.main_cluster_zone = main_cluster_zone + self.replica_cluster_id = replica_cluster_id + self.replica_cluster_zone = replica_cluster_zone + self.instance_display_name = instance_display_name + self.instance_type = instance_type + self.instance_labels = instance_labels + self.cluster_nodes = cluster_nodes + self.cluster_storage_type = cluster_storage_type + self.timeout = timeout + self._validate_inputs() + self.hook = BigtableHook() + super(BigtableInstanceCreateOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + instance = self.hook.get_instance(self.project_id, self.instance_id) + if instance: + # Based on Instance.__eq__ instance with the same ID and client is considered as equal. + self.log.info( + "The instance '%s' already exists in this project. 
Consider it as created", + self.instance_id + ) + return + try: + self.hook.create_instance( + self.project_id, + self.instance_id, + self.main_cluster_id, + self.main_cluster_zone, + self.replica_cluster_id, + self.replica_cluster_zone, + self.instance_display_name, + self.instance_type, + self.instance_labels, + self.cluster_nodes, + self.cluster_storage_type, + self.timeout, + ) + except google.api_core.exceptions.GoogleAPICallError as e: + self.log.error('An error occurred. Exiting.') + raise e + + +class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin): + """ + Deletes the Cloud Bigtable instance, including its clusters and all related tables. + + For more details about deleting instance have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/instance.html#google.cloud.bigtable.instance.Instance.delete + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance to delete. + """ + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id') + template_fields = ['project_id', 'instance_id'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self._validate_inputs() + self.hook = BigtableHook() + super(BigtableInstanceDeleteOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + try: + self.hook.delete_instance(self.project_id, self.instance_id) + except google.api_core.exceptions.NotFound: + self.log.info( + "The instance '%s' does not exist in project '%s'. Consider it as deleted", + self.instance_id, self.project_id + ) + except google.api_core.exceptions.GoogleAPICallError as e: + self.log.error('An error occurred. Exiting.') + raise e + + +class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin): + """ + Creates the table in the Cloud Bigtable instance. + + For more details about creating table have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.create + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance that will hold the new table. + :type table_id: str + :param table_id: The ID of the table to be created. + :type initial_split_keys: list + :param initial_split_keys: (Optional) list of row keys in bytes that will be used to initially split + the table into several tablets. + :type column_families: dict + :param column_families: (Optional) A map columns to create. 
+ The key is the column_id str and the value is a GarbageCollectionRule. + """ + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id') + template_fields = ['project_id', 'instance_id', 'table_id'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + table_id, + initial_split_keys=None, + column_families=None, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self.table_id = table_id + self.initial_split_keys = initial_split_keys or list() + self.column_families = column_families or dict() + self._validate_inputs() + self.hook = BigtableHook() + self.instance = None + super(BigtableTableCreateOperator, self).__init__(*args, **kwargs) + + def _compare_column_families(self): + table_column_families = self.hook.get_column_families_for_table(self.instance, self.table_id) + if set(table_column_families.keys()) != set(self.column_families.keys()): + self.log.error("Table '%s' has a different set of Column Families", self.table_id) + self.log.error("Expected: %s", self.column_families.keys()) + self.log.error("Actual: %s", table_column_families.keys()) + return False + + for key in table_column_families.keys(): + # There is a difference in structure between the local Column Families and the remote ones. + # Local `self.column_families` is a dict with the column_id as key and a GarbageCollectionRule + # as value. + # Remote `table_column_families` is a dict with the column_id as key and a ColumnFamily object + # as value. + # For more information about ColumnFamily, please refer to the documentation: + # https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html#google.cloud.bigtable.column_family.ColumnFamily + if table_column_families[key].gc_rule != self.column_families[key]: + self.log.error("Column Family '%s' differs for table '%s'.", key, self.table_id) + return False + return True + + def execute(self, context): + self.instance = self.hook.get_instance(self.project_id, self.instance_id) + if not self.instance: + raise AirflowException("Dependency: instance '{}' does not exist in project '{}'.".format( + self.instance_id, self.project_id)) + try: + self.hook.create_table( + self.instance, + self.table_id, + self.initial_split_keys, + self.column_families + ) + except google.api_core.exceptions.AlreadyExists: + if not self._compare_column_families(): + raise AirflowException( + "Table '{}' already exists with different Column Families.".format(self.table_id)) + self.log.info("The table '%s' already exists. Consider it as created", self.table_id) + + +class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin): + """ + Deletes the Cloud Bigtable table. + + For more details about table deletion, have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.delete + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type table_id: str + :param table_id: The ID of the table to be deleted.
+ """ + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id') + template_fields = ['project_id', 'instance_id', 'table_id'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + table_id, + app_profile_id=None, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self.table_id = table_id + self.app_profile_id = app_profile_id + self._validate_inputs() + self.hook = BigtableHook() + super(BigtableTableDeleteOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + instance = self.hook.get_instance(self.project_id, self.instance_id) + if not instance: + raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id)) + + try: + self.hook.delete_table( + self.project_id, + self.instance_id, + self.table_id, + ) + except google.api_core.exceptions.NotFound: + # It's OK if table doesn't exists. + self.log.info("The table '%s' no longer exists. Consider it as deleted", self.table_id) + except google.api_core.exceptions.GoogleAPICallError as e: + self.log.error('An error occurred. Exiting.') + raise e + + +class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin): + """ + Updates a Cloud Bigtable cluster. + + For more details about updating a Cloud Bigtable cluster, have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/cluster.html#google.cloud.bigtable.cluster.Cluster.update + + :type project_id: str + :param project_id: The ID of the GCP project. + :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type cluster_id: str + :param cluster_id: The ID of the Cloud Bigtable cluster to update. + :type nodes: int + :param nodes: The desired number of nodes for the Cloud Bigtable cluster. + """ + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'cluster_id', 'nodes') + template_fields = ['project_id', 'instance_id', 'cluster_id', 'nodes'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + cluster_id, + nodes, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self.cluster_id = cluster_id + self.nodes = nodes + self._validate_inputs() + self.hook = BigtableHook() + super(BigtableClusterUpdateOperator, self).__init__(*args, **kwargs) + + def execute(self, context): + instance = self.hook.get_instance(self.project_id, self.instance_id) + if not instance: + raise AirflowException("Dependency: instance '{}' does not exist.".format(self.instance_id)) + + try: + self.hook.update_cluster( + instance, + self.cluster_id, + self.nodes + ) + except google.api_core.exceptions.NotFound: + raise AirflowException("Dependency: cluster '{}' does not exist for instance '{}'.".format( + self.cluster_id, + self.instance_id + )) + except google.api_core.exceptions.GoogleAPICallError as e: + self.log.error('An error occurred. Exiting.') + raise e + + +class BigtableTableWaitForReplicationSensor(BaseSensorOperator, BigtableValidationMixin): + """ + Sensor that waits for Cloud Bigtable table to be fully replicated to its clusters. + No exception will be raised if the instance or the table does not exist. + + For more details about cluster states for a table, have a look at the reference: + https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html#google.cloud.bigtable.table.Table.get_cluster_states + + :type project_id: str + :param project_id: The ID of the GCP project. 
+ :type instance_id: str + :param instance_id: The ID of the Cloud Bigtable instance. + :type table_id: str + :param table_id: The ID of the table to check replication status. + """ + REQUIRED_ATTRIBUTES = ('project_id', 'instance_id', 'table_id') + template_fields = ['project_id', 'instance_id', 'table_id'] + + @apply_defaults + def __init__(self, + project_id, + instance_id, + table_id, + *args, **kwargs): + self.project_id = project_id + self.instance_id = instance_id + self.table_id = table_id + self._validate_inputs() + self.hook = BigtableHook() + super(BigtableTableWaitForReplicationSensor, self).__init__(*args, **kwargs) + + def poke(self, context): + instance = self.hook.get_instance(self.project_id, self.instance_id) + if not instance: + self.log.info("Dependency: instance '%s' does not exist.", self.instance_id) + return False + + try: + cluster_states = self.hook.get_cluster_states_for_table(instance, self.table_id) + except google.api_core.exceptions.NotFound: + self.log.info( + "Dependency: table '%s' does not exist in instance '%s'.", self.table_id, self.instance_id) + return False + + ready_state = ClusterState(enums.Table.ClusterState.ReplicationState.READY) + + is_table_replicated = True + for cluster_id in cluster_states.keys(): + if cluster_states[cluster_id] != ready_state: + self.log.info("Table '%s' is not yet replicated on cluster '%s'.", self.table_id, cluster_id) + is_table_replicated = False + + if not is_table_replicated: + return False + + self.log.info("Table '%s' is replicated.", self.table_id) + return True diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst index 221913dec0..3a88d66339 100644 --- a/docs/howto/operator.rst +++ b/docs/howto/operator.rst @@ -361,6 +361,135 @@ More information See `Google Compute Engine API documentation `_. +Google Cloud Bigtable Operators +------------------------------- + +Arguments +""""""""" + +All examples below rely on the following variables, which can be passed via environment variables. + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :start-after: [START howto_operator_gcp_bigtable_args] + :end-before: [END howto_operator_gcp_bigtable_args] + + +BigtableInstanceCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator` +to create a Google Cloud Bigtable instance. + +If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration +and immediately succeeds. No changes are made to the existing instance. + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_instance_create] + :end-before: [END howto_operator_gcp_bigtable_instance_create] + + +BigtableInstanceDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator` +to delete a Google Cloud Bigtable instance. + +Using the operator +"""""""""""""""""" + +.. 
literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_instance_delete] + :end-before: [END howto_operator_gcp_bigtable_instance_delete] + +BigtableClusterUpdateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator` +to modify the number of nodes in a Cloud Bigtable cluster. + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_cluster_update] + :end-before: [END howto_operator_gcp_bigtable_cluster_update] + + +BigtableTableCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator` +to create a table in a Cloud Bigtable instance. + +If a table with the given ID exists in the Cloud Bigtable instance, the operator compares its Column Families. +If the Column Families are identical, the operator succeeds. Otherwise, the operator fails with an appropriate +error message. + + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_table_create] + :end-before: [END howto_operator_gcp_bigtable_table_create] + +Advanced +"""""""" + +When creating a table, you can specify the optional ``initial_split_keys`` and ``column_families``. +Please refer to the Python Client for Google Cloud Bigtable documentation +`for Table `_ and `for Column +Families `_. + + +BigtableTableDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator` +to delete a table in Google Cloud Bigtable. + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_table_delete] + :end-before: [END howto_operator_gcp_bigtable_table_delete] + +BigtableTableWaitForReplicationSensor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Use the :class:`~airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor` +to wait for the table to replicate fully. + +The same arguments apply to this sensor as to the BigtableTableCreateOperator_. + +**Note:** If the table or the Cloud Bigtable instance does not exist, this sensor does not raise any +exception; it keeps waiting for the table until the timeout is reached. + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_bigtable_operators.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcp_bigtable_table_wait_for_replication] + :end-before: [END howto_operator_gcp_bigtable_table_wait_for_replication] + + + Google Cloud Functions Operators -------------------------------- @@ -1318,3 +1447,4 @@ More information See `Google Cloud Storage ObjectAccessControls insert documentation `_. + diff --git a/docs/integration.rst b/docs/integration.rst index f35c8e87ea..dd104cf790 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -1,3 +1,4 @@ + .. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
See the NOTICE file distributed with this work for additional information @@ -814,6 +815,69 @@ Cloud SQL Hooks :members: + +Cloud Bigtable +'''''''''''''' + +Cloud Bigtable Operators +"""""""""""""""""""""""" + +- :ref:`BigtableInstanceCreateOperator` : creates a Google Cloud Bigtable instance. +- :ref:`BigtableInstanceDeleteOperator` : deletes a Google Cloud Bigtable instance. +- :ref:`BigtableClusterUpdateOperator` : updates the number of nodes in a Google Cloud Bigtable cluster. +- :ref:`BigtableTableCreateOperator` : creates a table in a Google Cloud Bigtable instance. +- :ref:`BigtableTableDeleteOperator` : deletes a table in a Google Cloud Bigtable instance. +- :ref:`BigtableTableWaitForReplicationSensor` : (sensor) waits for a table to be fully replicated. + +.. _BigtableInstanceCreateOperator: + +BigtableInstanceCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator + +.. _BigtableInstanceDeleteOperator: + +BigtableInstanceDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator + +.. _BigtableClusterUpdateOperator: + +BigtableClusterUpdateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator + +.. _BigtableTableCreateOperator: + +BigtableTableCreateOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator + +.. _BigtableTableDeleteOperator: + +BigtableTableDeleteOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator + +.. _BigtableTableWaitForReplicationSensor: + +BigtableTableWaitForReplicationSensor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor + +.. _BigtableHook: + +Cloud Bigtable Hook +"""""""""""""""""""" + +.. autoclass:: airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook + :members: + Compute Engine '''''''''''''' diff --git a/setup.py b/setup.py index 6dc452302f..410502c302 100644 --- a/setup.py +++ b/setup.py @@ -190,6 +190,7 @@ gcp_api = [ 'google-auth>=1.0.0, <2.0.0dev', 'google-auth-httplib2>=0.0.1', 'google-cloud-container>=0.1.1', + 'google-cloud-bigtable==0.31.0', 'google-cloud-spanner>=1.6.0', 'grpcio-gcp>=0.2.2', 'PyOpenSSL', diff --git a/tests/contrib/operators/test_gcp_bigtable_operator.py b/tests/contrib/operators/test_gcp_bigtable_operator.py new file mode 100644 index 0000000000..8417efc997 --- /dev/null +++ b/tests/contrib/operators/test_gcp_bigtable_operator.py @@ -0,0 +1,540 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest +import google.api_core.exceptions +from google.cloud.bigtable.column_family import MaxVersionsGCRule +from google.cloud.bigtable.instance import Instance +from google.cloud.bigtable.table import ClusterState +from parameterized import parameterized + +from airflow import AirflowException +from airflow.contrib.operators.gcp_bigtable_operator import BigtableInstanceDeleteOperator, \ + BigtableTableDeleteOperator, BigtableTableCreateOperator, BigtableTableWaitForReplicationSensor, \ + BigtableClusterUpdateOperator, BigtableInstanceCreateOperator + +try: + # noinspection PyProtectedMember + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + +PROJECT_ID = 'test_project_id' +INSTANCE_ID = 'test-instance-id' +CLUSTER_ID = 'test-cluster-id' +CLUSTER_ZONE = 'us-central1-f' +NODES = 5 +TABLE_ID = 'test-table-id' +INITIAL_SPLIT_KEYS = [] +EMPTY_COLUMN_FAMILIES = {} + + +class BigtableInstanceCreateTest(unittest.TestCase): + @parameterized.expand([ + ('project_id', '', INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE), + ('instance_id', PROJECT_ID, '', CLUSTER_ID, CLUSTER_ZONE), + ('main_cluster_id', PROJECT_ID, INSTANCE_ID, '', CLUSTER_ZONE), + ('main_cluster_zone', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, main_cluster_id, + main_cluster_zone, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableInstanceCreateOperator( + project_id=project_id, + instance_id=instance_id, + main_cluster_id=main_cluster_id, + main_cluster_zone=main_cluster_zone, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_create_instance_that_exists(self, mock_hook): + mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + + op = BigtableInstanceCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + main_cluster_id=CLUSTER_ID, + main_cluster_zone=CLUSTER_ZONE, + task_id="id" + ) + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.create_instance.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_different_error_reraised(self, mock_hook): + mock_hook.return_value.get_instance.return_value = None + op = BigtableInstanceCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + main_cluster_id=CLUSTER_ID, + main_cluster_zone=CLUSTER_ZONE, + task_id="id" + ) + + mock_hook.return_value.create_instance.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.GoogleAPICallError('error')) + + with self.assertRaises(google.api_core.exceptions.GoogleAPICallError): + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.create_instance.assert_called_once_with( + PROJECT_ID, INSTANCE_ID, CLUSTER_ID, CLUSTER_ZONE, None, None, None, None, None, None, None, None + ) + + +class BigtableClusterUpdateTest(unittest.TestCase): + @parameterized.expand([ + ('project_id', '', INSTANCE_ID, CLUSTER_ID, NODES), + ('instance_id', PROJECT_ID, '', CLUSTER_ID, NODES), + ('cluster_id', 
PROJECT_ID, INSTANCE_ID, '', NODES), + ('nodes', PROJECT_ID, INSTANCE_ID, CLUSTER_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, cluster_id, nodes, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableClusterUpdateOperator( + project_id=project_id, + instance_id=instance_id, + cluster_id=cluster_id, + nodes=nodes, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_updating_cluster_but_instance_does_not_exists(self, mock_hook): + mock_hook.return_value.get_instance.return_value = None + + with self.assertRaises(AirflowException) as e: + op = BigtableClusterUpdateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + cluster_id=CLUSTER_ID, + nodes=NODES, + task_id="id" + ) + op.execute(None) + + err = e.exception + self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID)) + mock_hook.assert_called_once_with() + mock_hook.return_value.update_cluster.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_updating_cluster_that_does_not_exists(self, mock_hook): + instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.update_cluster.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.NotFound("Cluster not found.")) + + with self.assertRaises(AirflowException) as e: + op = BigtableClusterUpdateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + cluster_id=CLUSTER_ID, + nodes=NODES, + task_id="id" + ) + op.execute(None) + + err = e.exception + self.assertEqual( + str(err), + "Dependency: cluster '{}' does not exist for instance '{}'.".format(CLUSTER_ID, INSTANCE_ID) + ) + mock_hook.assert_called_once_with() + mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES) + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_different_error_reraised(self, mock_hook): + op = BigtableClusterUpdateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + cluster_id=CLUSTER_ID, + nodes=NODES, + task_id="id" + ) + instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.update_cluster.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.GoogleAPICallError('error')) + + with self.assertRaises(google.api_core.exceptions.GoogleAPICallError): + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.update_cluster.assert_called_once_with(instance, CLUSTER_ID, NODES) + + +class BigtableInstanceDeleteTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_delete_execute(self, mock_hook): + op = BigtableInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID) + + @parameterized.expand([ + ('project_id', '', INSTANCE_ID), + ('instance_id', PROJECT_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + 
@mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableInstanceDeleteOperator( + project_id=project_id, + instance_id=instance_id, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_deleting_instance_that_doesnt_exists(self, mock_hook): + op = BigtableInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id="id" + ) + mock_hook.return_value.delete_instance.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.NotFound("Instance not found.")) + op.execute(None) + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID) + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_different_error_reraised(self, mock_hook): + op = BigtableInstanceDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + task_id="id" + ) + mock_hook.return_value.delete_instance.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.GoogleAPICallError('error')) + + with self.assertRaises(google.api_core.exceptions.GoogleAPICallError): + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_instance.assert_called_once_with(PROJECT_ID, INSTANCE_ID) + + +class BigtableTableDeleteTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_delete_execute(self, mock_hook): + op = BigtableTableDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + op.execute(None) + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID) + + @parameterized.expand([ + ('project_id', '', INSTANCE_ID, TABLE_ID), + ('instance_id', PROJECT_ID, '', TABLE_ID), + ('table_id', PROJECT_ID, INSTANCE_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableTableDeleteOperator( + project_id=project_id, + instance_id=instance_id, + table_id=table_id, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_deleting_table_that_doesnt_exists(self, mock_hook): + op = BigtableTableDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + + mock_hook.return_value.delete_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.NotFound("Table not found.")) + op.execute(None) + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID) + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_deleting_table_when_instance_doesnt_exists(self, mock_hook): + op = BigtableTableDeleteOperator( + project_id=PROJECT_ID, 
+ instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + + mock_hook.return_value.get_instance.return_value = None + with self.assertRaises(AirflowException) as e: + op.execute(None) + err = e.exception + self.assertEqual(str(err), "Dependency: instance '{}' does not exist.".format(INSTANCE_ID)) + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_table.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_different_error_reraised(self, mock_hook): + op = BigtableTableDeleteOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + mock_hook.return_value.delete_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.GoogleAPICallError('error')) + + with self.assertRaises(google.api_core.exceptions.GoogleAPICallError): + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.delete_table.assert_called_once_with(PROJECT_ID, INSTANCE_ID, TABLE_ID) + + +class BigtableTableCreateTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_create_execute(self, mock_hook): + op = BigtableTableCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + initial_split_keys=INITIAL_SPLIT_KEYS, + column_families=EMPTY_COLUMN_FAMILIES, + task_id="id" + ) + instance = mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + op.execute(None) + mock_hook.assert_called_once_with() + mock_hook.return_value.create_table.assert_called_once_with( + instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES) + + @parameterized.expand([ + ('project_id', '', INSTANCE_ID, TABLE_ID), + ('instance_id', PROJECT_ID, '', TABLE_ID), + ('table_id', PROJECT_ID, INSTANCE_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableTableCreateOperator( + project_id=project_id, + instance_id=instance_id, + table_id=table_id, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_instance_not_exists(self, mock_hook): + op = BigtableTableCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + initial_split_keys=INITIAL_SPLIT_KEYS, + column_families=EMPTY_COLUMN_FAMILIES, + task_id="id" + ) + mock_hook.return_value.get_instance.return_value = None + with self.assertRaises(AirflowException) as e: + op.execute(None) + err = e.exception + self.assertEqual( + str(err), + "Dependency: instance '{}' does not exist in project '{}'.".format(INSTANCE_ID, PROJECT_ID) + ) + mock_hook.assert_called_once_with() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_creating_table_that_exists(self, mock_hook): + op = BigtableTableCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + initial_split_keys=INITIAL_SPLIT_KEYS, + column_families=EMPTY_COLUMN_FAMILIES, + task_id="id" + ) + + mock_hook.return_value.get_column_families_for_table.return_value = EMPTY_COLUMN_FAMILIES + instance = 
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.create_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.AlreadyExists("Table already exists.")) + op.execute(None) + + mock_hook.assert_called_once_with() + mock_hook.return_value.create_table.assert_called_once_with( + instance, TABLE_ID, INITIAL_SPLIT_KEYS, EMPTY_COLUMN_FAMILIES) + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_creating_table_that_exists_with_different_column_families_ids_in_the_table(self, mock_hook): + op = BigtableTableCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + initial_split_keys=INITIAL_SPLIT_KEYS, + column_families=EMPTY_COLUMN_FAMILIES, + task_id="id" + ) + + mock_hook.return_value.get_column_families_for_table.return_value = {"existing_family": None} + mock_hook.return_value.create_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.AlreadyExists("Table already exists.")) + + with self.assertRaises(AirflowException) as e: + op.execute(None) + err = e.exception + self.assertEqual( + str(err), + "Table '{}' already exists with different Column Families.".format(TABLE_ID) + ) + mock_hook.assert_called_once_with() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_creating_table_that_exists_with_different_column_families_gc_rule_in_the_table(self, mock_hook): + op = BigtableTableCreateOperator( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + initial_split_keys=INITIAL_SPLIT_KEYS, + column_families={"cf-id": MaxVersionsGCRule(1)}, + task_id="id" + ) + + cf_mock = mock.Mock() + cf_mock.gc_rule = mock.Mock(return_value=MaxVersionsGCRule(2)) + + mock_hook.return_value.get_column_families_for_table.return_value = { + "cf-id": cf_mock + } + mock_hook.return_value.create_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.AlreadyExists("Table already exists.")) + + with self.assertRaises(AirflowException) as e: + op.execute(None) + err = e.exception + self.assertEqual( + str(err), + "Table '{}' already exists with different Column Families.".format(TABLE_ID) + ) + mock_hook.assert_called_once_with() + + +class BigtableWaitForTableReplicationTest(unittest.TestCase): + @parameterized.expand([ + ('project_id', '', INSTANCE_ID, TABLE_ID), + ('instance_id', PROJECT_ID, '', TABLE_ID), + ('table_id', PROJECT_ID, INSTANCE_ID, ''), + ], testcase_func_name=lambda f, n, p: 'test_empty_attribute.empty_' + p.args[0]) + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id, mock_hook): + with self.assertRaises(AirflowException) as e: + BigtableTableWaitForReplicationSensor( + project_id=project_id, + instance_id=instance_id, + table_id=table_id, + task_id="id" + ) + err = e.exception + self.assertEqual(str(err), 'Empty parameter: {}'.format(missing_attribute)) + mock_hook.assert_not_called() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_wait_no_instance(self, mock_hook): + mock_hook.return_value.get_instance.return_value = None + + op = BigtableTableWaitForReplicationSensor( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + self.assertFalse(op.poke(None)) + mock_hook.assert_called_once_with() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def 
test_wait_no_table(self, mock_hook): + mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.get_cluster_states_for_table.side_effect = mock.Mock( + side_effect=google.api_core.exceptions.NotFound("Table not found.")) + + op = BigtableTableWaitForReplicationSensor( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + self.assertFalse(op.poke(None)) + mock_hook.assert_called_once_with() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_wait_not_ready(self, mock_hook): + mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.get_cluster_states_for_table.return_value = { + "cl-id": ClusterState(0) + } + op = BigtableTableWaitForReplicationSensor( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + self.assertFalse(op.poke(None)) + mock_hook.assert_called_once_with() + + @mock.patch('airflow.contrib.operators.gcp_bigtable_operator.BigtableHook') + def test_wait_ready(self, mock_hook): + mock_hook.return_value.get_instance.return_value = mock.Mock(Instance) + mock_hook.return_value.get_cluster_states_for_table.return_value = { + "cl-id": ClusterState(4) + } + op = BigtableTableWaitForReplicationSensor( + project_id=PROJECT_ID, + instance_id=INSTANCE_ID, + table_id=TABLE_ID, + task_id="id" + ) + self.assertTrue(op.poke(None)) + mock_hook.assert_called_once_with()
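A minimal sketch of exercising the new BigtableHook outside an operator, assuming a configured 'google_cloud_default' connection; the project, instance, and table IDs are hypothetical placeholders, and only methods defined by the hook in this patch are used:

    from airflow.contrib.hooks.gcp_bigtable_hook import BigtableHook

    # Hypothetical identifiers -- substitute real ones.
    PROJECT_ID = 'my-project'
    INSTANCE_ID = 'my-instance'
    TABLE_ID = 'my-table'

    hook = BigtableHook()  # uses the 'google_cloud_default' connection
    instance = hook.get_instance(PROJECT_ID, INSTANCE_ID)
    if instance is None:
        # Mirrors the operators above: an existing instance is a hard dependency.
        raise ValueError("Instance '{}' does not exist.".format(INSTANCE_ID))

    # Create an empty table, then inspect its per-cluster replication states.
    hook.create_table(instance, TABLE_ID, initial_split_keys=[], column_families={})
    for cluster_id, state in hook.get_cluster_states_for_table(instance, TABLE_ID).items():
        print(cluster_id, state)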
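Because BigtableTableWaitForReplicationSensor.poke returns False rather than raising when the instance or table is missing, a misconfigured ID makes the sensor wait until its timeout. A sketch of bounding that wait through the standard BaseSensorOperator ``timeout`` argument, assuming placement inside the example DAG above; the 600-second value is illustrative only and not part of the patch:

    wait_for_table_replication_bounded = BigtableTableWaitForReplicationSensor(
        project_id=GCP_PROJECT_ID,
        instance_id=CBT_INSTANCE_ID,
        table_id=CBT_TABLE_ID,
        poke_interval=int(CBT_POKE_INTERVAL),
        # Fail the task after 10 minutes instead of Airflow's 7-day sensor default.
        timeout=600,
        task_id='wait_for_table_replication_bounded',
    )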