[AIRFLOW-6105] [AIP-21] Rename Bigtable operators (#6862)
This commit is contained in:
Родитель
4912a8653b
Коммит
19939c92f8
12
UPDATING.md
12
UPDATING.md
|
@ -380,12 +380,12 @@ The following table shows changes in import paths.
|
|||
|airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator |airflow.gcp.operators.datastore.DatastoreExportOperator |
|
||||
|airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator |airflow.gcp.operators.datastore.DatastoreImportOperator |
|
||||
|airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator |airflow.operators.local_to_gcs.FileToGoogleCloudStorageOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator |airflow.gcp.operators.bigtable.BigtableClusterUpdateOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator |airflow.gcp.operators.bigtable.BigtableInstanceCreateOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator |airflow.gcp.operators.bigtable.BigtableInstanceDeleteOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator |airflow.gcp.operators.bigtable.BigtableTableCreateOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator |airflow.gcp.operators.bigtable.BigtableTableDeleteOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor |airflow.gcp.sensors.bigtable.BigtableTableWaitForReplicationSensor |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator |airflow.gcp.operators.bigtable.BigtableUpdateClusterOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator |airflow.gcp.operators.bigtable.BigtableCreateInstanceOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator |airflow.gcp.operators.bigtable.BigtableDeleteInstanceOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator |airflow.gcp.operators.bigtable.BigtableCreateTableOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator |airflow.gcp.operators.bigtable.BigtableDeleteTableOperator |
|
||||
|airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor |airflow.gcp.sensors.bigtable.BigtableTableReplicationCompletedSensor |
|
||||
|airflow.contrib.operators.gcp_cloud_build_operator.CloudBuildCreateBuildOperator |airflow.gcp.operators.cloud_build.CloudBuildCreateOperator
|
||||
|airflow.contrib.operators.gcp_compute_operator.GceBaseOperator |airflow.gcp.operators.compute.GceBaseOperator |
|
||||
|airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator |airflow.gcp.operators.compute.GceInstanceGroupManagerUpdateTemplateOperator |
|
||||
|
|
|
@ -23,15 +23,105 @@ or `airflow.gcp.sensors.bigtable`.
|
|||
|
||||
import warnings
|
||||
|
||||
# pylint: disable=unused-import
|
||||
from airflow.gcp.operators.bigtable import ( # noqa
|
||||
BigtableClusterUpdateOperator, BigtableInstanceCreateOperator, BigtableInstanceDeleteOperator,
|
||||
BigtableTableCreateOperator, BigtableTableDeleteOperator, BigtableValidationMixin,
|
||||
from airflow.gcp.operators.bigtable import (
|
||||
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
|
||||
BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
|
||||
)
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableWaitForReplicationSensor # noqa
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableReplicationCompletedSensor
|
||||
|
||||
warnings.warn(
|
||||
"This module is deprecated. Please use `airflow.gcp.operators.bigtable`"
|
||||
" or `airflow.gcp.sensors.bigtable`.",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
|
||||
|
||||
class BigtableClusterUpdateOperator(BigtableUpdateClusterOperator):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableUpdateClusterOperator`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableUpdateClusterOperator`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class BigtableInstanceCreateOperator(BigtableCreateInstanceOperator):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableCreateInstanceOperator`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableCreateInstanceOperator`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class BigtableInstanceDeleteOperator(BigtableDeleteInstanceOperator):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableDeleteInstanceOperator`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableDeleteInstanceOperator`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class BigtableTableCreateOperator(BigtableCreateTableOperator):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableCreateTableOperator`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableCreateTableOperator`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class BigtableTableDeleteOperator(BigtableDeleteTableOperator):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableDeleteTableOperator`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.operators.bigtable.BigtableDeleteTableOperator`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
# pylint: disable=too-many-ancestors
|
||||
class BigtableTableWaitForReplicationSensor(BigtableTableReplicationCompletedSensor):
|
||||
"""
|
||||
This class is deprecated.
|
||||
Please use `airflow.gcp.sensors.bigtable.BigtableTableReplicationCompletedSensor`.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
warnings.warn(
|
||||
"""This class is deprecated.
|
||||
Please use `airflow.gcp.sensors.bigtable.BigtableTableReplicationCompletedSensor`.""",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
|
|
@ -53,10 +53,10 @@ from os import getenv
|
|||
import airflow
|
||||
from airflow import models
|
||||
from airflow.gcp.operators.bigtable import (
|
||||
BigtableClusterUpdateOperator, BigtableInstanceCreateOperator, BigtableInstanceDeleteOperator,
|
||||
BigtableTableCreateOperator, BigtableTableDeleteOperator,
|
||||
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
|
||||
BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
|
||||
)
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableWaitForReplicationSensor
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableReplicationCompletedSensor
|
||||
|
||||
# [START howto_operator_gcp_bigtable_args]
|
||||
GCP_PROJECT_ID = getenv('GCP_PROJECT_ID', 'example-project')
|
||||
|
@ -83,7 +83,7 @@ with models.DAG(
|
|||
schedule_interval=None # Override to match your needs
|
||||
) as dag:
|
||||
# [START howto_operator_gcp_bigtable_instance_create]
|
||||
create_instance_task = BigtableInstanceCreateOperator(
|
||||
create_instance_task = BigtableCreateInstanceOperator(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
main_cluster_id=CBT_CLUSTER_ID,
|
||||
|
@ -95,7 +95,7 @@ with models.DAG(
|
|||
cluster_storage_type=int(CBT_CLUSTER_STORAGE_TYPE),
|
||||
task_id='create_instance_task',
|
||||
)
|
||||
create_instance_task2 = BigtableInstanceCreateOperator(
|
||||
create_instance_task2 = BigtableCreateInstanceOperator(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
main_cluster_id=CBT_CLUSTER_ID,
|
||||
main_cluster_zone=CBT_CLUSTER_ZONE,
|
||||
|
@ -110,14 +110,14 @@ with models.DAG(
|
|||
# [END howto_operator_gcp_bigtable_instance_create]
|
||||
|
||||
# [START howto_operator_gcp_bigtable_cluster_update]
|
||||
cluster_update_task = BigtableClusterUpdateOperator(
|
||||
cluster_update_task = BigtableUpdateClusterOperator(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
cluster_id=CBT_CLUSTER_ID,
|
||||
nodes=int(CBT_CLUSTER_NODES_UPDATED),
|
||||
task_id='update_cluster_task',
|
||||
)
|
||||
cluster_update_task2 = BigtableClusterUpdateOperator(
|
||||
cluster_update_task2 = BigtableUpdateClusterOperator(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
cluster_id=CBT_CLUSTER_ID,
|
||||
nodes=int(CBT_CLUSTER_NODES_UPDATED),
|
||||
|
@ -127,25 +127,25 @@ with models.DAG(
|
|||
# [END howto_operator_gcp_bigtable_cluster_update]
|
||||
|
||||
# [START howto_operator_gcp_bigtable_instance_delete]
|
||||
delete_instance_task = BigtableInstanceDeleteOperator(
|
||||
delete_instance_task = BigtableDeleteInstanceOperator(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
task_id='delete_instance_task',
|
||||
)
|
||||
delete_instance_task2 = BigtableInstanceDeleteOperator(
|
||||
delete_instance_task2 = BigtableDeleteInstanceOperator(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
task_id='delete_instance_task2',
|
||||
)
|
||||
# [END howto_operator_gcp_bigtable_instance_delete]
|
||||
|
||||
# [START howto_operator_gcp_bigtable_table_create]
|
||||
create_table_task = BigtableTableCreateOperator(
|
||||
create_table_task = BigtableCreateTableOperator(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
task_id='create_table',
|
||||
)
|
||||
create_table_task2 = BigtableTableCreateOperator(
|
||||
create_table_task2 = BigtableCreateTableOperator(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
task_id='create_table_task2',
|
||||
|
@ -154,7 +154,7 @@ with models.DAG(
|
|||
# [END howto_operator_gcp_bigtable_table_create]
|
||||
|
||||
# [START howto_operator_gcp_bigtable_table_wait_for_replication]
|
||||
wait_for_table_replication_task = BigtableTableWaitForReplicationSensor(
|
||||
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
|
@ -162,7 +162,7 @@ with models.DAG(
|
|||
timeout=180,
|
||||
task_id='wait_for_table_replication_task',
|
||||
)
|
||||
wait_for_table_replication_task2 = BigtableTableWaitForReplicationSensor(
|
||||
wait_for_table_replication_task2 = BigtableTableReplicationCompletedSensor(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
poke_interval=int(CBT_POKE_INTERVAL),
|
||||
|
@ -172,13 +172,13 @@ with models.DAG(
|
|||
# [END howto_operator_gcp_bigtable_table_wait_for_replication]
|
||||
|
||||
# [START howto_operator_gcp_bigtable_table_delete]
|
||||
delete_table_task = BigtableTableDeleteOperator(
|
||||
delete_table_task = BigtableDeleteTableOperator(
|
||||
project_id=GCP_PROJECT_ID,
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
task_id='delete_table_task',
|
||||
)
|
||||
delete_table_task2 = BigtableTableDeleteOperator(
|
||||
delete_table_task2 = BigtableDeleteTableOperator(
|
||||
instance_id=CBT_INSTANCE_ID,
|
||||
table_id=CBT_TABLE_ID,
|
||||
task_id='delete_table_task2',
|
||||
|
|
|
@ -44,7 +44,7 @@ class BigtableValidationMixin:
|
|||
raise AirflowException('Empty parameter: {}'.format(attr_name))
|
||||
|
||||
|
||||
class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
|
||||
class BigtableCreateInstanceOperator(BaseOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Creates a new Cloud Bigtable instance.
|
||||
If the Cloud Bigtable instance with the given ID exists, the operator does not
|
||||
|
@ -56,7 +56,7 @@ class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableInstanceCreateOperator`
|
||||
:ref:`howto/operator:BigtableCreateInstanceOperator`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance to create.
|
||||
|
@ -161,7 +161,7 @@ class BigtableInstanceCreateOperator(BaseOperator, BigtableValidationMixin):
|
|||
raise e
|
||||
|
||||
|
||||
class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
|
||||
class BigtableDeleteInstanceOperator(BaseOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Deletes the Cloud Bigtable instance, including its clusters and all related tables.
|
||||
|
||||
|
@ -170,7 +170,7 @@ class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableInstanceDeleteOperator`
|
||||
:ref:`howto/operator:BigtableDeleteInstanceOperator`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance to delete.
|
||||
|
@ -211,7 +211,7 @@ class BigtableInstanceDeleteOperator(BaseOperator, BigtableValidationMixin):
|
|||
raise e
|
||||
|
||||
|
||||
class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
|
||||
class BigtableCreateTableOperator(BaseOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Creates the table in the Cloud Bigtable instance.
|
||||
|
||||
|
@ -220,7 +220,7 @@ class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableTableCreateOperator`
|
||||
:ref:`howto/operator:BigtableCreateTableOperator`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance that will
|
||||
|
@ -307,7 +307,7 @@ class BigtableTableCreateOperator(BaseOperator, BigtableValidationMixin):
|
|||
self.table_id)
|
||||
|
||||
|
||||
class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
|
||||
class BigtableDeleteTableOperator(BaseOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Deletes the Cloud Bigtable table.
|
||||
|
||||
|
@ -316,7 +316,7 @@ class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableTableDeleteOperator`
|
||||
:ref:`howto/operator:BigtableDeleteTableOperator`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance.
|
||||
|
@ -372,7 +372,7 @@ class BigtableTableDeleteOperator(BaseOperator, BigtableValidationMixin):
|
|||
raise e
|
||||
|
||||
|
||||
class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin):
|
||||
class BigtableUpdateClusterOperator(BaseOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Updates a Cloud Bigtable cluster.
|
||||
|
||||
|
@ -382,7 +382,7 @@ class BigtableClusterUpdateOperator(BaseOperator, BigtableValidationMixin):
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableClusterUpdateOperator`
|
||||
:ref:`howto/operator:BigtableUpdateClusterOperator`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance.
|
||||
|
|
|
@ -31,7 +31,7 @@ from airflow.sensors.base_sensor_operator import BaseSensorOperator
|
|||
from airflow.utils.decorators import apply_defaults
|
||||
|
||||
|
||||
class BigtableTableWaitForReplicationSensor(BaseSensorOperator, BigtableValidationMixin):
|
||||
class BigtableTableReplicationCompletedSensor(BaseSensorOperator, BigtableValidationMixin):
|
||||
"""
|
||||
Sensor that waits for Cloud Bigtable table to be fully replicated to its clusters.
|
||||
No exception will be raised if the instance or the table does not exist.
|
||||
|
@ -41,7 +41,7 @@ class BigtableTableWaitForReplicationSensor(BaseSensorOperator, BigtableValidati
|
|||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:BigtableTableWaitForReplicationSensor`
|
||||
:ref:`howto/operator:BigtableTableReplicationCompletedSensor`
|
||||
|
||||
:type instance_id: str
|
||||
:param instance_id: The ID of the Cloud Bigtable instance.
|
||||
|
|
|
@ -40,12 +40,12 @@ All examples below rely on the following variables, which can be passed via envi
|
|||
:start-after: [START howto_operator_gcp_bigtable_args]
|
||||
:end-before: [END howto_operator_gcp_bigtable_args]
|
||||
|
||||
.. _howto/operator:BigtableInstanceCreateOperator:
|
||||
.. _howto/operator:BigtableCreateInstanceOperator:
|
||||
|
||||
BigtableInstanceCreateOperator
|
||||
BigtableCreateInstanceOperator
|
||||
------------------------------
|
||||
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableInstanceCreateOperator`
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableCreateInstanceOperator`
|
||||
to create a Google Cloud Bigtable instance.
|
||||
|
||||
If the Cloud Bigtable instance with the given ID exists, the operator does not compare its configuration
|
||||
|
@ -63,12 +63,12 @@ it will be retrieved from the GCP connection used. Both variants are shown:
|
|||
:start-after: [START howto_operator_gcp_bigtable_instance_create]
|
||||
:end-before: [END howto_operator_gcp_bigtable_instance_create]
|
||||
|
||||
.. _howto/operator:BigtableInstanceDeleteOperator:
|
||||
.. _howto/operator:BigtableDeleteInstanceOperator:
|
||||
|
||||
BigtableInstanceDeleteOperator
|
||||
BigtableDeleteInstanceOperator
|
||||
------------------------------
|
||||
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableInstanceDeleteOperator`
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableDeleteInstanceOperator`
|
||||
to delete a Google Cloud Bigtable instance.
|
||||
|
||||
Using the operator
|
||||
|
@ -83,12 +83,12 @@ it will be retrieved from the GCP connection used. Both variants are shown:
|
|||
:start-after: [START howto_operator_gcp_bigtable_instance_delete]
|
||||
:end-before: [END howto_operator_gcp_bigtable_instance_delete]
|
||||
|
||||
.. _howto/operator:BigtableClusterUpdateOperator:
|
||||
.. _howto/operator:BigtableUpdateClusterOperator:
|
||||
|
||||
BigtableClusterUpdateOperator
|
||||
BigtableUpdateClusterOperator
|
||||
-----------------------------
|
||||
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableClusterUpdateOperator`
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableUpdateClusterOperator`
|
||||
to modify number of nodes in a Cloud Bigtable cluster.
|
||||
|
||||
Using the operator
|
||||
|
@ -103,9 +103,9 @@ it will be retrieved from the GCP connection used. Both variants are shown:
|
|||
:start-after: [START howto_operator_gcp_bigtable_cluster_update]
|
||||
:end-before: [END howto_operator_gcp_bigtable_cluster_update]
|
||||
|
||||
.. _howto/operator:BigtableTableCreateOperator:
|
||||
.. _howto/operator:BigtableCreateTableOperator:
|
||||
|
||||
BigtableTableCreateOperator
|
||||
BigtableCreateTableOperator
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Creates a table in a Cloud Bigtable instance.
|
||||
|
@ -135,12 +135,12 @@ Please refer to the Python Client for Google Cloud Bigtable documentation
|
|||
`for Table <https://googleapis.github.io/google-cloud-python/latest/bigtable/table.html>`_ and `for Column
|
||||
Families <https://googleapis.github.io/google-cloud-python/latest/bigtable/column-family.html>`_.
|
||||
|
||||
.. _howto/operator:BigtableTableDeleteOperator:
|
||||
.. _howto/operator:BigtableDeleteTableOperator:
|
||||
|
||||
BigtableTableDeleteOperator
|
||||
BigtableDeleteTableOperator
|
||||
---------------------------
|
||||
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableTableDeleteOperator`
|
||||
Use the :class:`~airflow.gcp.operators.bigtable.BigtableDeleteTableOperator`
|
||||
to delete a table in Google Cloud Bigtable.
|
||||
|
||||
Using the operator
|
||||
|
@ -155,18 +155,18 @@ it will be retrieved from the GCP connection used. Both variants are shown:
|
|||
:start-after: [START howto_operator_gcp_bigtable_table_delete]
|
||||
:end-before: [END howto_operator_gcp_bigtable_table_delete]
|
||||
|
||||
.. _howto/operator:BigtableTableWaitForReplicationSensor:
|
||||
.. _howto/operator:BigtableTableReplicationCompletedSensor:
|
||||
|
||||
BigtableTableWaitForReplicationSensor
|
||||
-------------------------------------
|
||||
BigtableTableReplicationCompletedSensor
|
||||
---------------------------------------
|
||||
|
||||
You can create the operator with or without project id. If project id is missing
|
||||
it will be retrieved from the GCP connection used. Both variants are shown:
|
||||
|
||||
Use the :class:`~airflow.gcp.sensors.bigtable.BigtableTableWaitForReplicationSensor`
|
||||
Use the :class:`~airflow.gcp.sensors.bigtable.BigtableTableReplicationCompletedSensor`
|
||||
to wait for the table to replicate fully.
|
||||
|
||||
The same arguments apply to this sensor as the BigtableTableCreateOperator_.
|
||||
The same arguments apply to this sensor as the BigtableCreateTableOperator.
|
||||
|
||||
**Note:** If the table or the Cloud Bigtable instance does not exist, this sensor waits for the table until
|
||||
timeout hits and does not raise any exception.
|
||||
|
|
|
@ -27,8 +27,8 @@ from parameterized import parameterized
|
|||
|
||||
from airflow import AirflowException
|
||||
from airflow.gcp.operators.bigtable import (
|
||||
BigtableClusterUpdateOperator, BigtableInstanceCreateOperator, BigtableInstanceDeleteOperator,
|
||||
BigtableTableCreateOperator, BigtableTableDeleteOperator,
|
||||
BigtableCreateInstanceOperator, BigtableCreateTableOperator, BigtableDeleteInstanceOperator,
|
||||
BigtableDeleteTableOperator, BigtableUpdateClusterOperator,
|
||||
)
|
||||
from tests.compat import mock
|
||||
|
||||
|
@ -54,7 +54,7 @@ class TestBigtableInstanceCreate(unittest.TestCase):
|
|||
main_cluster_id,
|
||||
main_cluster_zone, mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableInstanceCreateOperator(
|
||||
BigtableCreateInstanceOperator(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
main_cluster_id=main_cluster_id,
|
||||
|
@ -70,7 +70,7 @@ class TestBigtableInstanceCreate(unittest.TestCase):
|
|||
def test_create_instance_that_exists(self, mock_hook):
|
||||
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
|
||||
|
||||
op = BigtableInstanceCreateOperator(
|
||||
op = BigtableCreateInstanceOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
main_cluster_id=CLUSTER_ID,
|
||||
|
@ -87,7 +87,7 @@ class TestBigtableInstanceCreate(unittest.TestCase):
|
|||
def test_create_instance_that_exists_empty_project_id(self, mock_hook):
|
||||
mock_hook.return_value.get_instance.return_value = mock.Mock(Instance)
|
||||
|
||||
op = BigtableInstanceCreateOperator(
|
||||
op = BigtableCreateInstanceOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
main_cluster_id=CLUSTER_ID,
|
||||
main_cluster_zone=CLUSTER_ZONE,
|
||||
|
@ -102,7 +102,7 @@ class TestBigtableInstanceCreate(unittest.TestCase):
|
|||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_different_error_reraised(self, mock_hook):
|
||||
mock_hook.return_value.get_instance.return_value = None
|
||||
op = BigtableInstanceCreateOperator(
|
||||
op = BigtableCreateInstanceOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
main_cluster_id=CLUSTER_ID,
|
||||
|
@ -144,7 +144,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
def test_empty_attribute(self, missing_attribute, project_id, instance_id,
|
||||
cluster_id, nodes, mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableClusterUpdateOperator(
|
||||
BigtableUpdateClusterOperator(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
cluster_id=cluster_id,
|
||||
|
@ -161,7 +161,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
mock_hook.return_value.get_instance.return_value = None
|
||||
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
op = BigtableClusterUpdateOperator(
|
||||
op = BigtableUpdateClusterOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
cluster_id=CLUSTER_ID,
|
||||
|
@ -183,7 +183,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
mock_hook.return_value.get_instance.return_value = None
|
||||
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
op = BigtableClusterUpdateOperator(
|
||||
op = BigtableUpdateClusterOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
cluster_id=CLUSTER_ID,
|
||||
nodes=NODES,
|
||||
|
@ -205,7 +205,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
side_effect=google.api_core.exceptions.NotFound("Cluster not found."))
|
||||
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
op = BigtableClusterUpdateOperator(
|
||||
op = BigtableUpdateClusterOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
cluster_id=CLUSTER_ID,
|
||||
|
@ -232,7 +232,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
side_effect=google.api_core.exceptions.NotFound("Cluster not found."))
|
||||
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
op = BigtableClusterUpdateOperator(
|
||||
op = BigtableUpdateClusterOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
cluster_id=CLUSTER_ID,
|
||||
nodes=NODES,
|
||||
|
@ -253,7 +253,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_different_error_reraised(self, mock_hook):
|
||||
op = BigtableClusterUpdateOperator(
|
||||
op = BigtableUpdateClusterOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
cluster_id=CLUSTER_ID,
|
||||
|
@ -276,7 +276,7 @@ class TestBigtableClusterUpdate(unittest.TestCase):
|
|||
class TestBigtableInstanceDelete(unittest.TestCase):
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_delete_execute(self, mock_hook):
|
||||
op = BigtableInstanceDeleteOperator(
|
||||
op = BigtableDeleteInstanceOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
task_id="id",
|
||||
|
@ -290,7 +290,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_delete_execute_empty_project_id(self, mock_hook):
|
||||
op = BigtableInstanceDeleteOperator(
|
||||
op = BigtableDeleteInstanceOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
task_id="id",
|
||||
gcp_conn_id=GCP_CONN_ID
|
||||
|
@ -307,7 +307,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_empty_attribute(self, missing_attribute, project_id, instance_id, mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableInstanceDeleteOperator(
|
||||
BigtableDeleteInstanceOperator(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
task_id="id"
|
||||
|
@ -318,7 +318,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_deleting_instance_that_doesnt_exists(self, mock_hook):
|
||||
op = BigtableInstanceDeleteOperator(
|
||||
op = BigtableDeleteInstanceOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
task_id="id",
|
||||
|
@ -334,7 +334,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_deleting_instance_that_doesnt_exists_empty_project_id(self, mock_hook):
|
||||
op = BigtableInstanceDeleteOperator(
|
||||
op = BigtableDeleteInstanceOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
task_id="id",
|
||||
gcp_conn_id=GCP_CONN_ID
|
||||
|
@ -349,7 +349,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_different_error_reraised(self, mock_hook):
|
||||
op = BigtableInstanceDeleteOperator(
|
||||
op = BigtableDeleteInstanceOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
task_id="id",
|
||||
|
@ -370,7 +370,7 @@ class TestBigtableInstanceDelete(unittest.TestCase):
|
|||
class TestBigtableTableDelete(unittest.TestCase):
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_delete_execute(self, mock_hook):
|
||||
op = BigtableTableDeleteOperator(
|
||||
op = BigtableDeleteTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -392,7 +392,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id,
|
||||
mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableTableDeleteOperator(
|
||||
BigtableDeleteTableOperator(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
table_id=table_id,
|
||||
|
@ -405,7 +405,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_deleting_table_that_doesnt_exists(self, mock_hook):
|
||||
op = BigtableTableDeleteOperator(
|
||||
op = BigtableDeleteTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -424,7 +424,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_deleting_table_that_doesnt_exists_empty_project_id(self, mock_hook):
|
||||
op = BigtableTableDeleteOperator(
|
||||
op = BigtableDeleteTableOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
task_id="id",
|
||||
|
@ -442,7 +442,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_deleting_table_when_instance_doesnt_exists(self, mock_hook):
|
||||
op = BigtableTableDeleteOperator(
|
||||
op = BigtableDeleteTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -461,7 +461,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_different_error_reraised(self, mock_hook):
|
||||
op = BigtableTableDeleteOperator(
|
||||
op = BigtableDeleteTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -484,7 +484,7 @@ class TestBigtableTableDelete(unittest.TestCase):
|
|||
class TestBigtableTableCreate(unittest.TestCase):
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_create_execute(self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -510,7 +510,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id,
|
||||
mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableTableCreateOperator(
|
||||
BigtableCreateTableOperator(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
table_id=table_id,
|
||||
|
@ -523,7 +523,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_instance_not_exists(self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -545,7 +545,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_creating_table_that_exists(self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -571,7 +571,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
|
||||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_creating_table_that_exists_empty_project_id(self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
initial_split_keys=INITIAL_SPLIT_KEYS,
|
||||
|
@ -597,7 +597,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_creating_table_that_exists_with_different_column_families_ids_in_the_table(
|
||||
self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -624,7 +624,7 @@ class TestBigtableTableCreate(unittest.TestCase):
|
|||
@mock.patch('airflow.gcp.operators.bigtable.BigtableHook')
|
||||
def test_creating_table_that_exists_with_different_column_families_gc_rule_in__table(
|
||||
self, mock_hook):
|
||||
op = BigtableTableCreateOperator(
|
||||
op = BigtableCreateTableOperator(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
|
|
@ -25,7 +25,7 @@ from google.cloud.bigtable.table import ClusterState
|
|||
from parameterized import parameterized
|
||||
|
||||
from airflow import AirflowException
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableWaitForReplicationSensor
|
||||
from airflow.gcp.sensors.bigtable import BigtableTableReplicationCompletedSensor
|
||||
from tests.compat import mock
|
||||
|
||||
PROJECT_ID = 'test_project_id'
|
||||
|
@ -43,7 +43,7 @@ class BigtableWaitForTableReplicationTest(unittest.TestCase):
|
|||
def test_empty_attribute(self, missing_attribute, project_id, instance_id, table_id,
|
||||
mock_hook):
|
||||
with self.assertRaises(AirflowException) as e:
|
||||
BigtableTableWaitForReplicationSensor(
|
||||
BigtableTableReplicationCompletedSensor(
|
||||
project_id=project_id,
|
||||
instance_id=instance_id,
|
||||
table_id=table_id,
|
||||
|
@ -58,7 +58,7 @@ class BigtableWaitForTableReplicationTest(unittest.TestCase):
|
|||
def test_wait_no_instance(self, mock_hook):
|
||||
mock_hook.return_value.get_instance.return_value = None
|
||||
|
||||
op = BigtableTableWaitForReplicationSensor(
|
||||
op = BigtableTableReplicationCompletedSensor(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -74,7 +74,7 @@ class BigtableWaitForTableReplicationTest(unittest.TestCase):
|
|||
mock_hook.return_value.get_cluster_states_for_table.side_effect = mock.Mock(
|
||||
side_effect=google.api_core.exceptions.NotFound("Table not found."))
|
||||
|
||||
op = BigtableTableWaitForReplicationSensor(
|
||||
op = BigtableTableReplicationCompletedSensor(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -90,7 +90,7 @@ class BigtableWaitForTableReplicationTest(unittest.TestCase):
|
|||
mock_hook.return_value.get_cluster_states_for_table.return_value = {
|
||||
"cl-id": ClusterState(0)
|
||||
}
|
||||
op = BigtableTableWaitForReplicationSensor(
|
||||
op = BigtableTableReplicationCompletedSensor(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
@ -106,7 +106,7 @@ class BigtableWaitForTableReplicationTest(unittest.TestCase):
|
|||
mock_hook.return_value.get_cluster_states_for_table.return_value = {
|
||||
"cl-id": ClusterState(4)
|
||||
}
|
||||
op = BigtableTableWaitForReplicationSensor(
|
||||
op = BigtableTableReplicationCompletedSensor(
|
||||
project_id=PROJECT_ID,
|
||||
instance_id=INSTANCE_ID,
|
||||
table_id=TABLE_ID,
|
||||
|
|
|
@ -183,23 +183,23 @@ OPERATOR = [
|
|||
"airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator",
|
||||
),
|
||||
(
|
||||
"airflow.gcp.operators.bigtable.BigtableClusterUpdateOperator",
|
||||
"airflow.gcp.operators.bigtable.BigtableUpdateClusterOperator",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator",
|
||||
),
|
||||
(
|
||||
"airflow.gcp.operators.bigtable.BigtableInstanceCreateOperator",
|
||||
"airflow.gcp.operators.bigtable.BigtableCreateInstanceOperator",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator",
|
||||
),
|
||||
(
|
||||
"airflow.gcp.operators.bigtable.BigtableInstanceDeleteOperator",
|
||||
"airflow.gcp.operators.bigtable.BigtableDeleteInstanceOperator",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator",
|
||||
),
|
||||
(
|
||||
"airflow.gcp.operators.bigtable.BigtableTableCreateOperator",
|
||||
"airflow.gcp.operators.bigtable.BigtableCreateTableOperator",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator",
|
||||
),
|
||||
(
|
||||
"airflow.gcp.operators.bigtable.BigtableTableDeleteOperator",
|
||||
"airflow.gcp.operators.bigtable.BigtableDeleteTableOperator",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator",
|
||||
),
|
||||
(
|
||||
|
@ -768,7 +768,7 @@ OPERATOR = [
|
|||
]
|
||||
SENSOR = [
|
||||
(
|
||||
"airflow.gcp.sensors.bigtable.BigtableTableWaitForReplicationSensor",
|
||||
"airflow.gcp.sensors.bigtable.BigtableTableReplicationCompletedSensor",
|
||||
"airflow.contrib.operators.gcp_bigtable_operator."
|
||||
"BigtableTableWaitForReplicationSensor",
|
||||
),
|
||||
|
|
Загрузка…
Ссылка в новой задаче