[AIRFLOW-5013] Add GCP Data Catalog Hook and operators (#7664)

This commit is contained in:
Kamil Breguła 2020-03-09 23:16:32 +01:00 коммит произвёл GitHub
Родитель 29e848d846
Коммит bf9b6b6d70
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
9 изменённых файлов: 6688 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,443 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that interacts with Google Data Catalog service
"""
from google.cloud.datacatalog_v1beta1.proto.tags_pb2 import FieldType, TagField, TagTemplateField
from airflow import models
from airflow.models.baseoperator import chain
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.operators.datacatalog import (
CloudDataCatalogCreateEntryGroupOperator, CloudDataCatalogCreateEntryOperator,
CloudDataCatalogCreateTagOperator, CloudDataCatalogCreateTagTemplateFieldOperator,
CloudDataCatalogCreateTagTemplateOperator, CloudDataCatalogDeleteEntryGroupOperator,
CloudDataCatalogDeleteEntryOperator, CloudDataCatalogDeleteTagOperator,
CloudDataCatalogDeleteTagTemplateFieldOperator, CloudDataCatalogDeleteTagTemplateOperator,
CloudDataCatalogGetEntryGroupOperator, CloudDataCatalogGetEntryOperator,
CloudDataCatalogGetTagTemplateOperator, CloudDataCatalogListTagsOperator,
CloudDataCatalogLookupEntryOperator, CloudDataCatalogRenameTagTemplateFieldOperator,
CloudDataCatalogSearchCatalogOperator, CloudDataCatalogUpdateEntryOperator,
CloudDataCatalogUpdateTagOperator, CloudDataCatalogUpdateTagTemplateFieldOperator,
CloudDataCatalogUpdateTagTemplateOperator,
)
from airflow.utils.dates import days_ago
# Passed to the DAG below as its ``default_args``.
default_args = {"start_date": days_ago(1)}
# GCP project used to build the linked-resource name for the lookup task
# and to scope the catalog search.
PROJECT_ID = "polidea-airflow"
# Value handed to the ``location`` argument of the Data Catalog operators.
LOCATION = "us-central1"
# ID of the entry group that the example creates, reads and deletes.
ENTRY_GROUP_ID = "important_data_jan_2019"
# ID of the entry created inside ENTRY_GROUP_ID.
ENTRY_ID = "python_files"
# ID of the tag template that the example creates and attaches tags from.
TEMPLATE_ID = "template_id"
# Field defined when the tag template is created; also the field set on the tag.
FIELD_NAME_1 = "first"
# Field added to the template afterwards and later deleted.
FIELD_NAME_2 = "second"
# New field ID that FIELD_NAME_1 is renamed to by the rename task.
FIELD_NAME_3 = "first-rename"
with models.DAG("example_gcp_datacatalog", default_args=default_args, schedule_interval=None) as dag:
# Create
# [START howto_operator_gcp_datacatalog_create_entry_group]
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": "analytics data - jan 2011"},
)
# [END howto_operator_gcp_datacatalog_create_entry_group]
# [START howto_operator_gcp_datacatalog_create_entry_group_result]
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command="echo \"{{ task_instance.xcom_pull('create_entry_group', key='entry_group_id') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_entry_group_result]
# [START howto_operator_gcp_datacatalog_create_entry_group_result2]
create_entry_group_result2 = BashOperator(
task_id="create_entry_group_result2",
bash_command="echo \"{{ task_instance.xcom_pull('create_entry_group') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_entry_group_result2]
# [START howto_operator_gcp_datacatalog_create_entry_gcs]
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": "Wizard",
"type": "FILESET",
"gcs_fileset_spec": {"file_patterns": ["gs://test-datacatalog/**"]},
},
)
# [END howto_operator_gcp_datacatalog_create_entry_gcs]
# [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command="echo \"{{ task_instance.xcom_pull('create_entry_gcs', key='entry_id') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
# [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
create_entry_gcs_result2 = BashOperator(
task_id="create_entry_gcs_result2",
bash_command="echo \"{{ task_instance.xcom_pull('create_entry_gcs') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
# [START howto_operator_gcp_datacatalog_create_tag]
create_tag = CloudDataCatalogCreateTagOperator(
task_id="create_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
template_id=TEMPLATE_ID,
tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
# [END howto_operator_gcp_datacatalog_create_tag]
# [START howto_operator_gcp_datacatalog_create_tag_result]
create_tag_result = BashOperator(
task_id="create_tag_result",
bash_command="echo \"{{ task_instance.xcom_pull('create_tag', key='tag_id') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_tag_result]
# [START howto_operator_gcp_datacatalog_create_tag_result2]
create_tag_result2 = BashOperator(
task_id="create_tag_result2", bash_command="echo \"{{ task_instance.xcom_pull('create_tag') }}\""
)
# [END howto_operator_gcp_datacatalog_create_tag_result2]
# [START howto_operator_gcp_datacatalog_create_tag_template]
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": "Awesome Tag Template",
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type=FieldType(primitive_type="STRING")
)
},
},
)
# [END howto_operator_gcp_datacatalog_create_tag_template]
# [START howto_operator_gcp_datacatalog_create_tag_template_result]
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command="echo \"{{ task_instance.xcom_pull('create_tag_template', key='tag_template_id') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_tag_template_result]
# [START howto_operator_gcp_datacatalog_create_tag_template_result2]
create_tag_template_result2 = BashOperator(
task_id="create_tag_template_result2",
bash_command="echo \"{{ task_instance.xcom_pull('create_tag_template') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_tag_template_result2]
# [START howto_operator_gcp_datacatalog_create_tag_template_field]
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type=FieldType(primitive_type="STRING")
),
)
# [END howto_operator_gcp_datacatalog_create_tag_template_field]
# [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=(
"echo \"{{ task_instance.xcom_pull('create_tag_template_field',"
+ " key='tag_template_field_id') }}\""
),
)
# [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
# [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
# Echo the full XCom return value of ``create_tag_template_field``.
# The task_id carries the "2" suffix so it does not collide with the
# ``create_tag_template_field_result`` task defined just above; task ids
# must be unique within a DAG.
create_tag_template_field_result2 = BashOperator(
    task_id="create_tag_template_field_result2",
    bash_command="echo \"{{ task_instance.xcom_pull('create_tag_template_field') }}\"",
)
# [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
# Delete
# [START howto_operator_gcp_datacatalog_delete_entry]
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
# [END howto_operator_gcp_datacatalog_delete_entry]
# [START howto_operator_gcp_datacatalog_delete_entry_group]
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
# [END howto_operator_gcp_datacatalog_delete_entry_group]
# [START howto_operator_gcp_datacatalog_delete_tag]
delete_tag = CloudDataCatalogDeleteTagOperator(
task_id="delete_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag="{{ task_instance.xcom_pull('create_tag', key='tag_id') }}",
)
# [END howto_operator_gcp_datacatalog_delete_tag]
# [START howto_operator_gcp_datacatalog_delete_tag_template_field]
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
# [END howto_operator_gcp_datacatalog_delete_tag_template_field]
# [START howto_operator_gcp_datacatalog_delete_tag_template]
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
# [END howto_operator_gcp_datacatalog_delete_tag_template]
# Get
# [START howto_operator_gcp_datacatalog_get_entry_group]
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask={"paths": ["name", "display_name"]},
)
# [END howto_operator_gcp_datacatalog_get_entry_group]
# [START howto_operator_gcp_datacatalog_get_entry_group_result]
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_entry_group') }}\"",
)
# [END howto_operator_gcp_datacatalog_get_entry_group_result]
# [START howto_operator_gcp_datacatalog_get_entry]
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
# [END howto_operator_gcp_datacatalog_get_entry]
# [START howto_operator_gcp_datacatalog_get_entry_result]
get_entry_result = BashOperator(
task_id="get_entry_result", bash_command="echo \"{{ task_instance.xcom_pull('get_entry') }}\""
)
# [END howto_operator_gcp_datacatalog_get_entry_result]
# [START howto_operator_gcp_datacatalog_get_tag_template]
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
# [END howto_operator_gcp_datacatalog_get_tag_template]
# [START howto_operator_gcp_datacatalog_get_tag_template_result]
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command="echo \"{{ task_instance.xcom_pull('get_tag_template') }}\"",
)
# [END howto_operator_gcp_datacatalog_get_tag_template_result]
# List
# [START howto_operator_gcp_datacatalog_list_tags]
list_tags = CloudDataCatalogListTagsOperator(
task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
# [END howto_operator_gcp_datacatalog_list_tags]
# [START howto_operator_gcp_datacatalog_list_tags_result]
list_tags_result = BashOperator(
task_id="list_tags_result", bash_command="echo \"{{ task_instance.xcom_pull('list_tags') }}\""
)
# [END howto_operator_gcp_datacatalog_list_tags_result]
# Lookup
# [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
)
# [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
# [START howto_operator_gcp_datacatalog_lookup_entry_result]
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['displayName'] }}\"",
)
# [END howto_operator_gcp_datacatalog_lookup_entry_result]
# Rename
# [START howto_operator_gcp_datacatalog_rename_tag_template_field]
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
# [END howto_operator_gcp_datacatalog_rename_tag_template_field]
# Search
# [START howto_operator_gcp_datacatalog_search_catalog]
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
)
# [END howto_operator_gcp_datacatalog_search_catalog]
# [START howto_operator_gcp_datacatalog_search_catalog_result]
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command="echo \"{{ task_instance.xcom_pull('search_catalog') }}\"",
)
# [END howto_operator_gcp_datacatalog_search_catalog_result]
# Update
# [START howto_operator_gcp_datacatalog_update_entry]
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": "New Wizard"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
# [END howto_operator_gcp_datacatalog_update_entry]
# [START howto_operator_gcp_datacatalog_update_tag]
update_tag = CloudDataCatalogUpdateTagOperator(
task_id="update_tag",
tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
update_mask={"paths": ["fields"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag_id="{{ task_instance.xcom_pull('create_tag', key='tag_id') }}",
)
# [END howto_operator_gcp_datacatalog_update_tag]
# [START howto_operator_gcp_datacatalog_update_tag_template]
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": "Awesome Tag Template"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
# [END howto_operator_gcp_datacatalog_update_tag_template]
# [START howto_operator_gcp_datacatalog_update_tag_template_field]
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
# [END howto_operator_gcp_datacatalog_update_tag_template_field]
# Create
# The create tasks are chained so they run one after another in list order.
create_tasks = [
create_entry_group,
create_entry_gcs,
create_tag_template,
create_tag_template_field,
create_tag,
]
chain(*create_tasks)
# Each create task is also wired to the delete task for its resource and to
# the two BashOperator tasks that echo its XCom output.
create_entry_group >> delete_entry_group
create_entry_group >> create_entry_group_result
create_entry_group >> create_entry_group_result2
create_entry_gcs >> delete_entry
create_entry_gcs >> create_entry_gcs_result
create_entry_gcs >> create_entry_gcs_result2
# NOTE(review): the sibling lines pair each create task with the delete task
# of the same resource, so this presumably meant ``delete_tag_template``;
# confirm before changing.
create_tag_template >> delete_tag_template_field
create_tag_template >> create_tag_template_result
create_tag_template >> create_tag_template_result2
create_tag_template_field >> delete_tag_template_field
create_tag_template_field >> create_tag_template_field_result
create_tag_template_field >> create_tag_template_field_result2
create_tag >> delete_tag
create_tag >> create_tag_result
create_tag >> create_tag_result2
# Delete
# Deletion is chained in this order: tag, template field, template, entry,
# entry group.
delete_tasks = [
delete_tag,
delete_tag_template_field,
delete_tag_template,
delete_entry,
delete_entry_group,
]
chain(*delete_tasks)
# Get
# Every read-only task sits between the create and delete task of the
# resource it reads, then feeds a task that echoes its result.
create_tag_template >> get_tag_template >> delete_tag_template
get_tag_template >> get_tag_template_result
create_entry_gcs >> get_entry >> delete_entry
get_entry >> get_entry_result
create_entry_group >> get_entry_group >> delete_entry_group
get_entry_group >> get_entry_group_result
# List
create_tag >> list_tags >> delete_tag
list_tags >> list_tags_result
# Lookup
create_entry_gcs >> lookup_entry_linked_resource >> delete_entry
lookup_entry_linked_resource >> lookup_entry_result
# Rename
create_tag_template_field >> rename_tag_template_field >> delete_tag_template_field
# Search
# Passing the two lists to chain() places search_catalog between the create
# tasks and the delete tasks (see the chain() documentation for the exact
# list-to-task semantics).
chain(create_tasks, search_catalog, delete_tasks)
search_catalog >> search_catalog_result
# Update
create_entry_gcs >> update_entry >> delete_entry
create_tag >> update_tag >> delete_tag
create_tag_template >> update_tag_template >> delete_tag_template
# update_tag_template_field addresses the field as FIELD_NAME_1, so it is
# scheduled before the task that renames that field to FIELD_NAME_3.
create_tag_template_field >> update_tag_template_field >> rename_tag_template_field

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -280,6 +280,7 @@ intersphinx_mapping = {
'google-cloud-bigtable': ('https://googleapis.dev/python/bigtable/latest', None),
'google-cloud-container': ('https://googleapis.dev/python/container/latest', None),
'google-cloud-core': ('https://googleapis.dev/python/google-cloud-core/latest', None),
'google-cloud-datacatalog': ('https://googleapis.dev/python/datacatalog/latest', None),
'google-cloud-datastore': ('https://googleapis.dev/python/datastore/latest', None),
'google-cloud-dlp': ('https://googleapis.dev/python/dlp/latest', None),
'google-cloud-kms': ('https://googleapis.dev/python/cloudkms/latest', None),

Просмотреть файл

@ -0,0 +1,614 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Google Cloud Data Catalog Operators
=======================================
The `Data Catalog <https://cloud.google.com/data-catalog>`__ is a fully managed and scalable metadata
management service that allows organizations to quickly discover, manage and understand all their data in
Google Cloud. It offers:
* A simple and easy to use search interface for data discovery, powered by the same Google search technology that
supports Gmail and Drive
* A flexible and powerful cataloging system for capturing technical and business metadata
* An auto-tagging mechanism for sensitive data with DLP API integration
.. contents::
:depth: 1
:local:
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
.. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:CloudDataCatalogEntryOperators:
Managing entries
^^^^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.Entry` to represent an entry.
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudDataCatalogLookupEntryOperator:
.. _howto/operator:CloudDataCatalogGetEntryOperator:
Getting an entry
""""""""""""""""
Getting an entry is performed with the
:class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryOperator` and
:class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogLookupEntryOperator`
operators.
The ``CloudDataCatalogGetEntryOperator`` uses the Project ID, Entry Group ID and Entry ID to get the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry]
:end-before: [END howto_operator_gcp_datacatalog_get_entry]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_result]
:end-before: [END howto_operator_gcp_datacatalog_get_entry_result]
The ``CloudDataCatalogLookupEntryOperator`` uses the resource name to get the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
:end-before: [END howto_operator_gcp_datacatalog_lookup_entry_linked_resource]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogLookupEntryOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_lookup_entry_result]
:end-before: [END howto_operator_gcp_datacatalog_lookup_entry_result]
.. _howto/operator:CloudDataCatalogCreateEntryOperator:
Creating an entry
"""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryOperator`
operator creates the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_gcs]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result2]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_gcs_result2]
The newly created entry ID can be read with the ``entry_id`` key.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_gcs_result]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_gcs_result]
.. _howto/operator:CloudDataCatalogUpdateEntryOperator:
Updating an entry
"""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateEntryOperator`
operator updates the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_entry]
:end-before: [END howto_operator_gcp_datacatalog_update_entry]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateEntryOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogDeleteEntryOperator:
Deleting an entry
"""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryOperator`
operator deletes the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_entry]
:end-before: [END howto_operator_gcp_datacatalog_delete_entry]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogEntryGroupOperators:
Managing entry groups
^^^^^^^^^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.EntryGroup` to represent an entry group.
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudDataCatalogCreateEntryGroupOperator:
Creating an entry group
"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryGroupOperator`
operator creates the entry group.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_group]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_group]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateEntryGroupOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result2]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result2]
The newly created entry group ID can be read with the ``entry_group_id`` key.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_entry_group_result]
:end-before: [END howto_operator_gcp_datacatalog_create_entry_group_result]
.. _howto/operator:CloudDataCatalogGetEntryGroupOperator:
Getting an entry group
""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryGroupOperator`
operator gets the entry group.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_group]
:end-before: [END howto_operator_gcp_datacatalog_get_entry_group]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetEntryGroupOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_entry_group_result]
:end-before: [END howto_operator_gcp_datacatalog_get_entry_group_result]
.. _howto/operator:CloudDataCatalogDeleteEntryGroupOperator:
Deleting an entry group
"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryGroupOperator`
operator deletes the entry group.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_entry_group]
:end-before: [END howto_operator_gcp_datacatalog_delete_entry_group]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteEntryGroupOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogTagTemplateOperators:
Managing tag templates
^^^^^^^^^^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplate` to represent a tag template.
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudDataCatalogCreateTagTemplateOperator:
Creating a tag template
"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateOperator`
operator creates the tag template.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result2]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template_result2]
The newly created tag template ID can be read with the ``tag_template_id`` key.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_result]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template_result]
.. _howto/operator:CloudDataCatalogDeleteTagTemplateOperator:
Deleting a tag template
"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateOperator`
operator deletes the tag template.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_tag_template]
:end-before: [END howto_operator_gcp_datacatalog_delete_tag_template]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogGetTagTemplateOperator:
Getting a tag template
""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetTagTemplateOperator`
operator gets the tag template.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_tag_template]
:end-before: [END howto_operator_gcp_datacatalog_get_tag_template]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogGetTagTemplateOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_get_tag_template_result]
:end-before: [END howto_operator_gcp_datacatalog_get_tag_template_result]
.. _howto/operator:CloudDataCatalogUpdateTagTemplateOperator:
Updating a tag template
"""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateOperator`
operator updates the tag template.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_tag_template]
:end-before: [END howto_operator_gcp_datacatalog_update_tag_template]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogTagOperators:
Managing tags
^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.Tag` to represent a tag.
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudDataCatalogCreateTagOperator:
Creating a tag on an entry
""""""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagOperator`
operator creates a tag on an entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag]
:end-before: [END howto_operator_gcp_datacatalog_create_tag]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_result2]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_result2]
The newly created tag ID can be read with the ``tag_id`` key.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_result]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_result]
.. _howto/operator:CloudDataCatalogUpdateTagOperator:
Updating a tag
""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
operator updates the tag.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_tag]
:end-before: [END howto_operator_gcp_datacatalog_update_tag]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogDeleteTagOperator:
Deleting a tag
""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
operator deletes the tag.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_tag]
:end-before: [END howto_operator_gcp_datacatalog_delete_tag]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogListTagsOperator:
Listing tags on an entry
""""""""""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogListTagsOperator`
operator gets the list of tags on the entry.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_list_tags]
:end-before: [END howto_operator_gcp_datacatalog_list_tags]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogListTagsOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_list_tags_result]
:end-before: [END howto_operator_gcp_datacatalog_list_tags_result]
.. _howto/operator:CloudDataCatalogTagTemplateFieldssOperators:
Managing tag template fields
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Operators use a :class:`~google.cloud.datacatalog_v1beta1.types.TagTemplateField` to represent a tag template field.
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudDataCatalogCreateTagTemplateFieldOperator:
Creating a field
""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateFieldOperator`
operator creates the tag template field.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogCreateTagTemplateFieldOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result2]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result2]
The newly created field ID can be read with the ``tag_template_field_id`` key.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_create_tag_template_field_result]
:end-before: [END howto_operator_gcp_datacatalog_create_tag_template_field_result]
.. _howto/operator:CloudDataCatalogRenameTagTemplateFieldOperator:
Renaming a field
""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogRenameTagTemplateFieldOperator`
operator renames the tag template field.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_rename_tag_template_field]
:end-before: [END howto_operator_gcp_datacatalog_rename_tag_template_field]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogRenameTagTemplateFieldOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogUpdateTagTemplateFieldOperator:
Updating a field
""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateFieldOperator`
operator updates the tag template field.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_update_tag_template_field]
:end-before: [END howto_operator_gcp_datacatalog_update_tag_template_field]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogUpdateTagTemplateFieldOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogDeleteTagTemplateFieldOperator:
Deleting a field
""""""""""""""""
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateFieldOperator`
operator deletes the tag template field.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_delete_tag_template_field]
:end-before: [END howto_operator_gcp_datacatalog_delete_tag_template_field]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogDeleteTagTemplateFieldOperator`
parameters which allows you to dynamically determine values.
.. _howto/operator:CloudDataCatalogSearchCatalogOperator:
Search resources
^^^^^^^^^^^^^^^^
The :class:`~airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogSearchCatalogOperator`
operator searches Data Catalog for multiple resources like entries, tags that match a query.
The ``query`` parameter should be defined using `search syntax <https://cloud.google.com/data-catalog/docs/how-to/search-reference>`__.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_search_catalog]
:end-before: [END howto_operator_gcp_datacatalog_search_catalog]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogSearchCatalogOperator`
parameters which allows you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows it to be used by other operators.
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_datacatalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_gcp_datacatalog_search_catalog_result]
:end-before: [END howto_operator_gcp_datacatalog_search_catalog_result]
Reference
^^^^^^^^^
For further information, look at:
* `Client Library Documentation <https://googleapis.dev/python/datacatalog/latest/index.html>`__
* `Product Documentation <https://cloud.google.com/data-catalog/docs/>`__

Просмотреть файл

@ -602,6 +602,12 @@ These integrations allow you to perform various operations within the Google Clo
- :mod:`airflow.providers.google.cloud.operators.datafusion`
-
* - `Datacatalog <https://cloud.google.com/data-catalog>`__
-
- :mod:`airflow.providers.google.cloud.hooks.datacatalog`
- :mod:`airflow.providers.google.cloud.operators.datacatalog`
-
* - `Dataflow <https://cloud.google.com/dataflow/>`__
-
- :mod:`airflow.providers.google.cloud.hooks.dataflow`

Просмотреть файл

@ -227,6 +227,7 @@ gcp = [
'google-cloud-bigquery-datatransfer>=0.4.0',
'google-cloud-bigtable>=1.0.0',
'google-cloud-container>=0.1.1',
'google-cloud-datacatalog>=0.5.0',
'google-cloud-dataproc>=0.5.0',
'google-cloud-dlp>=0.11.0',
'google-cloud-kms>=1.2.1',

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,755 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Dict, Sequence, Tuple
from unittest import TestCase, mock
from google.api_core.exceptions import AlreadyExists
from google.api_core.retry import Retry
from google.cloud.datacatalog_v1beta1.types import Entry, EntryGroup, Tag, TagTemplate, TagTemplateField
from airflow.providers.google.cloud.operators.datacatalog import (
CloudDataCatalogCreateEntryGroupOperator, CloudDataCatalogCreateEntryOperator,
CloudDataCatalogCreateTagOperator, CloudDataCatalogCreateTagTemplateFieldOperator,
CloudDataCatalogCreateTagTemplateOperator, CloudDataCatalogDeleteEntryGroupOperator,
CloudDataCatalogDeleteEntryOperator, CloudDataCatalogDeleteTagOperator,
CloudDataCatalogDeleteTagTemplateFieldOperator, CloudDataCatalogDeleteTagTemplateOperator,
CloudDataCatalogGetEntryGroupOperator, CloudDataCatalogGetEntryOperator,
CloudDataCatalogGetTagTemplateOperator, CloudDataCatalogListTagsOperator,
CloudDataCatalogLookupEntryOperator, CloudDataCatalogRenameTagTemplateFieldOperator,
CloudDataCatalogSearchCatalogOperator, CloudDataCatalogUpdateEntryOperator,
CloudDataCatalogUpdateTagOperator, CloudDataCatalogUpdateTagTemplateFieldOperator,
CloudDataCatalogUpdateTagTemplateOperator,
)
# Shared fixtures for the Data Catalog operator tests in this module.

# Identifiers and generic call options handed to the operators.
TEST_PROJECT_ID: str = "example_id"
TEST_LOCATION: str = "en-west-3"
TEST_ENTRY_ID: str = "test-entry-id"
TEST_TAG_ID: str = "test-tag-id"
TEST_RETRY: Retry = Retry()
TEST_TIMEOUT: float = 0.5
TEST_METADATA: Sequence[Tuple[str, str]] = []
TEST_GCP_CONN_ID: str = "test-gcp-conn-id"
TEST_ENTRY_GROUP_ID: str = "test-entry-group-id"
TEST_TAG_TEMPLATE_ID: str = "test-tag-template-id"
TEST_TAG_TEMPLATE_FIELD_ID: str = "test-tag-template-field-id"
TEST_TAG_TEMPLATE_NAME: str = "test-tag-template-field-name"
TEST_FORCE: bool = False
TEST_READ_MASK: Dict = {"fields": ["name"]}
TEST_RESOURCE: str = "test-resource"
TEST_OPTIONS_: Dict = {}
TEST_PAGE_SIZE: int = 50
TEST_LINKED_RESOURCE: str = "test-linked-resource"
TEST_SQL_RESOURCE: str = "test-sql-resource"
TEST_NEW_TAG_TEMPLATE_FIELD_ID: str = "test-new-tag-template-field-id"
TEST_SCOPE: Dict = dict(include_project_ids=["example-scope-project"])
TEST_QUERY: str = "test-query"
TEST_ORDER_BY: str = "test-order-by"
TEST_UPDATE_MASK: Dict = {"fields": ["name"]}

# Resource paths assembled from the identifiers above.
TEST_ENTRY_PATH: str = (
    f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}"
    f"/entryGroups/{TEST_ENTRY_GROUP_ID}/entries/{TEST_ENTRY_ID}"
)
TEST_ENTRY_GROUP_PATH: str = (
    f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/entryGroups/{TEST_ENTRY_GROUP_ID}"
)
TEST_TAG_TEMPLATE_PATH: str = (
    f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/tagTemplates/{TEST_TAG_TEMPLATE_ID}"
)
TEST_TAG_PATH: str = (
    f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/entryGroups/"
    f"{TEST_ENTRY_GROUP_ID}/entries/{TEST_ENTRY_ID}/tags/{TEST_TAG_ID}"
)

# Message fixtures, each paired with the plain dict the tests compare operator results against.
TEST_ENTRY: Entry = Entry(name=TEST_ENTRY_PATH)
TEST_ENTRY_DICT: Dict = dict(name=TEST_ENTRY_PATH)
TEST_ENTRY_GROUP: EntryGroup = EntryGroup(name=TEST_ENTRY_GROUP_PATH)
TEST_ENTRY_GROUP_DICT: Dict = dict(name=TEST_ENTRY_GROUP_PATH)
# Annotated as Tag: the value is a Tag message, not an EntryGroup.
TEST_TAG: Tag = Tag(name=TEST_TAG_PATH)
TEST_TAG_DICT: Dict = dict(name=TEST_TAG_PATH)
TEST_TAG_TEMPLATE: TagTemplate = TagTemplate(name=TEST_TAG_TEMPLATE_PATH)
TEST_TAG_TEMPLATE_DICT: Dict = dict(name=TEST_TAG_TEMPLATE_PATH)
# Annotated as TagTemplateField: the value is a message, not a dict.
TEST_TAG_TEMPLATE_FIELD: TagTemplateField = TagTemplateField(name=TEST_TAG_TEMPLATE_FIELD_ID)
TEST_TAG_TEMPLATE_FIELD_DICT: Dict = dict(name=TEST_TAG_TEMPLATE_FIELD_ID)
class TestCloudDataCatalogCreateEntryOperator(TestCase):
    """Tests for ``CloudDataCatalogCreateEntryOperator`` run against a patched hook."""

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{"return_value.create_entry.return_value": TEST_ENTRY},
    )
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator forwards its arguments to ``create_entry`` and pushes the entry ID."""
        # Arguments shared by the operator constructor and the expected hook call.
        create_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry_id=TEST_ENTRY_ID,
            entry=TEST_ENTRY,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateEntryOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **create_kwargs
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.create_entry.assert_called_once_with(**create_kwargs)
        task_instance.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
        self.assertEqual(TEST_ENTRY_DICT, returned)

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{
            "return_value.create_entry.side_effect": AlreadyExists(message="message"),
            "return_value.get_entry.return_value": TEST_ENTRY,
        },
    )
    def test_assert_valid_hook_call_when_exists(self, mock_hook) -> None:
        """When ``create_entry`` raises ``AlreadyExists`` the operator calls ``get_entry``."""
        # Options that appear unchanged in both the create and the get call.
        common_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateEntryOperator(
            task_id="task_id",
            entry_id=TEST_ENTRY_ID,
            entry=TEST_ENTRY,
            gcp_conn_id=TEST_GCP_CONN_ID,
            **common_kwargs,
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        hook = mock_hook.return_value
        hook.create_entry.assert_called_once_with(
            entry_id=TEST_ENTRY_ID, entry=TEST_ENTRY, **common_kwargs
        )
        # The get call identifies the entry by its ID through the ``entry`` keyword.
        hook.get_entry.assert_called_once_with(entry=TEST_ENTRY_ID, **common_kwargs)
        task_instance.xcom_push.assert_called_once_with(key="entry_id", value=TEST_ENTRY_ID)
        self.assertEqual(TEST_ENTRY_DICT, returned)
class TestCloudDataCatalogCreateEntryGroupOperator(TestCase):
    """Tests for ``CloudDataCatalogCreateEntryGroupOperator`` run against a patched hook."""

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{"return_value.create_entry_group.return_value": TEST_ENTRY_GROUP},
    )
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator forwards its arguments to ``create_entry_group`` and pushes the group ID."""
        # Arguments shared by the operator constructor and the expected hook call.
        create_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group_id=TEST_ENTRY_GROUP_ID,
            entry_group=TEST_ENTRY_GROUP,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateEntryGroupOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **create_kwargs
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.create_entry_group.assert_called_once_with(**create_kwargs)
        task_instance.xcom_push.assert_called_once_with(
            key="entry_group_id", value=TEST_ENTRY_GROUP_ID
        )
        self.assertEqual(returned, TEST_ENTRY_GROUP_DICT)
class TestCloudDataCatalogCreateTagOperator(TestCase):
    """Tests for ``CloudDataCatalogCreateTagOperator`` run against a patched hook."""

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{"return_value.create_tag.return_value": TEST_TAG},
    )
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator forwards its arguments to ``create_tag`` and pushes the tag ID."""
        # Arguments shared by the operator constructor and the expected hook call.
        create_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            tag=TEST_TAG,
            template_id=TEST_TAG_TEMPLATE_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateTagOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **create_kwargs
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.create_tag.assert_called_once_with(**create_kwargs)
        task_instance.xcom_push.assert_called_once_with(key="tag_id", value=TEST_TAG_ID)
        self.assertEqual(TEST_TAG_DICT, returned)
class TestCloudDataCatalogCreateTagTemplateOperator(TestCase):
    """Tests for ``CloudDataCatalogCreateTagTemplateOperator`` run against a patched hook."""

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{"return_value.create_tag_template.return_value": TEST_TAG_TEMPLATE},
    )
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator forwards its arguments to ``create_tag_template`` and pushes the template ID."""
        # Arguments shared by the operator constructor and the expected hook call.
        create_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template_id=TEST_TAG_TEMPLATE_ID,
            tag_template=TEST_TAG_TEMPLATE,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateTagTemplateOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **create_kwargs
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.create_tag_template.assert_called_once_with(**create_kwargs)
        task_instance.xcom_push.assert_called_once_with(
            key="tag_template_id", value=TEST_TAG_TEMPLATE_ID
        )
        self.assertEqual(TEST_TAG_TEMPLATE_DICT, returned)
class TestCloudDataCatalogCreateTagTemplateFieldOperator(TestCase):
    """Tests for ``CloudDataCatalogCreateTagTemplateFieldOperator`` run against a patched hook."""

    @mock.patch(
        "airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
        **{"return_value.create_tag_template_field.return_value": TEST_TAG_TEMPLATE_FIELD},  # type: ignore
    )
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator forwards its arguments to ``create_tag_template_field`` and pushes the field ID."""
        # Arguments shared by the operator constructor and the expected hook call.
        create_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template=TEST_TAG_TEMPLATE_ID,
            tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
            tag_template_field=TEST_TAG_TEMPLATE_FIELD,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogCreateTagTemplateFieldOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **create_kwargs
        )
        task_instance = mock.MagicMock()

        returned = operator.execute(context={"task_instance": task_instance})

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.create_tag_template_field.assert_called_once_with(**create_kwargs)
        task_instance.xcom_push.assert_called_once_with(
            key="tag_template_field_id", value=TEST_TAG_TEMPLATE_FIELD_ID
        )
        self.assertEqual(TEST_TAG_TEMPLATE_FIELD_DICT, returned)
class TestCloudDataCatalogDeleteEntryOperator(TestCase):
    """Tests for ``CloudDataCatalogDeleteEntryOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``delete_entry``."""
        # Arguments shared by the operator constructor and the expected hook call.
        delete_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogDeleteEntryOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **delete_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.delete_entry.assert_called_once_with(**delete_kwargs)
class TestCloudDataCatalogDeleteEntryGroupOperator(TestCase):
    """Tests for ``CloudDataCatalogDeleteEntryGroupOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``delete_entry_group``."""
        # Arguments shared by the operator constructor and the expected hook call.
        delete_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogDeleteEntryGroupOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **delete_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.delete_entry_group.assert_called_once_with(**delete_kwargs)
class TestCloudDataCatalogDeleteTagOperator(TestCase):
    """Tests for ``CloudDataCatalogDeleteTagOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``delete_tag``."""
        # Arguments shared by the operator constructor and the expected hook call.
        delete_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            tag=TEST_TAG_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogDeleteTagOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **delete_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.delete_tag.assert_called_once_with(**delete_kwargs)
class TestCloudDataCatalogDeleteTagTemplateOperator(TestCase):
    """Tests for ``CloudDataCatalogDeleteTagTemplateOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``delete_tag_template``."""
        # Arguments shared by the operator constructor and the expected hook call.
        delete_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template=TEST_TAG_TEMPLATE_ID,
            force=TEST_FORCE,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogDeleteTagTemplateOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **delete_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.delete_tag_template.assert_called_once_with(**delete_kwargs)
class TestCloudDataCatalogDeleteTagTemplateFieldOperator(TestCase):
    """Tests for ``CloudDataCatalogDeleteTagTemplateFieldOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``delete_tag_template_field``."""
        # Arguments shared by the operator constructor and the expected hook call.
        delete_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template=TEST_TAG_TEMPLATE_ID,
            field=TEST_TAG_TEMPLATE_FIELD_ID,
            force=TEST_FORCE,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogDeleteTagTemplateFieldOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **delete_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.delete_tag_template_field.assert_called_once_with(**delete_kwargs)
class TestCloudDataCatalogGetEntryOperator(TestCase):
    """Tests for ``CloudDataCatalogGetEntryOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``get_entry``."""
        # Arguments shared by the operator constructor and the expected hook call.
        get_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogGetEntryOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **get_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.get_entry.assert_called_once_with(**get_kwargs)
class TestCloudDataCatalogGetEntryGroupOperator(TestCase):
    """Tests for ``CloudDataCatalogGetEntryGroupOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``get_entry_group``."""
        # Arguments shared by the operator constructor and the expected hook call.
        get_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            read_mask=TEST_READ_MASK,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogGetEntryGroupOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **get_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.get_entry_group.assert_called_once_with(**get_kwargs)
class TestCloudDataCatalogGetTagTemplateOperator(TestCase):
    """Tests for ``CloudDataCatalogGetTagTemplateOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``get_tag_template``."""
        # Arguments shared by the operator constructor and the expected hook call.
        get_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template=TEST_TAG_TEMPLATE_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogGetTagTemplateOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **get_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.get_tag_template.assert_called_once_with(**get_kwargs)
class TestCloudDataCatalogListTagsOperator(TestCase):
    """Tests for ``CloudDataCatalogListTagsOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``list_tags``."""
        # Arguments shared by the operator constructor and the expected hook call.
        list_kwargs: Dict = dict(
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry=TEST_ENTRY_ID,
            page_size=TEST_PAGE_SIZE,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogListTagsOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **list_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.list_tags.assert_called_once_with(**list_kwargs)
class TestCloudDataCatalogLookupEntryOperator(TestCase):
    """Tests for ``CloudDataCatalogLookupEntryOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``lookup_entry``."""
        # Arguments shared by the operator constructor and the expected hook call.
        lookup_kwargs: Dict = dict(
            linked_resource=TEST_LINKED_RESOURCE,
            sql_resource=TEST_SQL_RESOURCE,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogLookupEntryOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **lookup_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.lookup_entry.assert_called_once_with(**lookup_kwargs)
class TestCloudDataCatalogRenameTagTemplateFieldOperator(TestCase):
    """Tests for ``CloudDataCatalogRenameTagTemplateFieldOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``rename_tag_template_field``."""
        # Arguments shared by the operator constructor and the expected hook call.
        rename_kwargs: Dict = dict(
            location=TEST_LOCATION,
            tag_template=TEST_TAG_TEMPLATE_ID,
            field=TEST_TAG_TEMPLATE_FIELD_ID,
            new_tag_template_field_id=TEST_NEW_TAG_TEMPLATE_FIELD_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogRenameTagTemplateFieldOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **rename_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.rename_tag_template_field.assert_called_once_with(**rename_kwargs)
class TestCloudDataCatalogSearchCatalogOperator(TestCase):
    """Tests for ``CloudDataCatalogSearchCatalogOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``search_catalog``."""
        # Arguments shared by the operator constructor and the expected hook call.
        search_kwargs: Dict = dict(
            scope=TEST_SCOPE,
            query=TEST_QUERY,
            page_size=TEST_PAGE_SIZE,
            order_by=TEST_ORDER_BY,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogSearchCatalogOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **search_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.search_catalog.assert_called_once_with(**search_kwargs)
class TestCloudDataCatalogUpdateEntryOperator(TestCase):
    """Tests for ``CloudDataCatalogUpdateEntryOperator`` run against a patched hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """The operator builds the hook with the connection ID and calls ``update_entry``."""
        # Arguments shared by the operator constructor and the expected hook call.
        update_kwargs: Dict = dict(
            entry=TEST_ENTRY,
            update_mask=TEST_UPDATE_MASK,
            location=TEST_LOCATION,
            entry_group=TEST_ENTRY_GROUP_ID,
            entry_id=TEST_ENTRY_ID,
            project_id=TEST_PROJECT_ID,
            retry=TEST_RETRY,
            timeout=TEST_TIMEOUT,
            metadata=TEST_METADATA,
        )
        operator = CloudDataCatalogUpdateEntryOperator(
            task_id="task_id", gcp_conn_id=TEST_GCP_CONN_ID, **update_kwargs
        )

        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        mock_hook.return_value.update_entry.assert_called_once_with(**update_kwargs)
class TestCloudDataCatalogUpdateTagOperator(TestCase):
    """Unit test for ``CloudDataCatalogUpdateTagOperator`` against a mocked hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """
        Execute the operator and verify the calls made on the patched hook.

        The hook class must be instantiated exactly once with the connection id, and
        ``update_tag`` must be called exactly once with the same keyword arguments
        that were given to the operator.
        """
        # Arguments expected to reach the hook method unchanged.
        # NOTE(review): ``tag`` and ``tag_id`` both receive TEST_TAG_ID -- confirm
        # whether a dedicated tag fixture was intended for ``tag``.
        expected_hook_kwargs = {
            "tag": TEST_TAG_ID,
            "update_mask": TEST_UPDATE_MASK,
            "location": TEST_LOCATION,
            "entry_group": TEST_ENTRY_GROUP_ID,
            "entry": TEST_ENTRY_ID,
            "tag_id": TEST_TAG_ID,
            "project_id": TEST_PROJECT_ID,
            "retry": TEST_RETRY,
            "timeout": TEST_TIMEOUT,
            "metadata": TEST_METADATA,
        }

        operator = CloudDataCatalogUpdateTagOperator(
            task_id="task_id",
            gcp_conn_id=TEST_GCP_CONN_ID,
            **expected_hook_kwargs,
        )
        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        hook_instance = mock_hook.return_value
        hook_instance.update_tag.assert_called_once_with(**expected_hook_kwargs)
class TestCloudDataCatalogUpdateTagTemplateOperator(TestCase):
    """Unit test for ``CloudDataCatalogUpdateTagTemplateOperator`` against a mocked hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """
        Execute the operator and verify the calls made on the patched hook.

        The hook class must be instantiated exactly once with the connection id, and
        ``update_tag_template`` must be called exactly once with the same keyword
        arguments that were given to the operator.
        """
        # Arguments expected to reach the hook method unchanged.
        # NOTE(review): ``tag_template`` and ``tag_template_id`` both receive
        # TEST_TAG_TEMPLATE_ID -- confirm whether a template fixture was intended.
        expected_hook_kwargs = {
            "tag_template": TEST_TAG_TEMPLATE_ID,
            "update_mask": TEST_UPDATE_MASK,
            "location": TEST_LOCATION,
            "tag_template_id": TEST_TAG_TEMPLATE_ID,
            "project_id": TEST_PROJECT_ID,
            "retry": TEST_RETRY,
            "timeout": TEST_TIMEOUT,
            "metadata": TEST_METADATA,
        }

        operator = CloudDataCatalogUpdateTagTemplateOperator(
            task_id="task_id",
            gcp_conn_id=TEST_GCP_CONN_ID,
            **expected_hook_kwargs,
        )
        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        hook_instance = mock_hook.return_value
        hook_instance.update_tag_template.assert_called_once_with(**expected_hook_kwargs)
class TestCloudDataCatalogUpdateTagTemplateFieldOperator(TestCase):
    """Unit test for ``CloudDataCatalogUpdateTagTemplateFieldOperator`` against a mocked hook."""

    @mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
    def test_assert_valid_hook_call(self, mock_hook) -> None:
        """
        Execute the operator and verify the calls made on the patched hook.

        The hook class must be instantiated exactly once with the connection id, and
        ``update_tag_template_field`` must be called exactly once with the same
        keyword arguments that were given to the operator.
        """
        # Arguments expected to reach the hook method unchanged.
        expected_hook_kwargs = {
            "tag_template_field": TEST_TAG_TEMPLATE_FIELD,
            "update_mask": TEST_UPDATE_MASK,
            "tag_template_field_name": TEST_TAG_TEMPLATE_NAME,
            "location": TEST_LOCATION,
            "tag_template": TEST_TAG_TEMPLATE_ID,
            "tag_template_field_id": TEST_TAG_TEMPLATE_FIELD_ID,
            "project_id": TEST_PROJECT_ID,
            "retry": TEST_RETRY,
            "timeout": TEST_TIMEOUT,
            "metadata": TEST_METADATA,
        }

        operator = CloudDataCatalogUpdateTagTemplateFieldOperator(
            task_id="task_id",
            gcp_conn_id=TEST_GCP_CONN_ID,
            **expected_hook_kwargs,
        )
        operator.execute(context=mock.MagicMock())

        mock_hook.assert_called_once_with(gcp_conn_id=TEST_GCP_CONN_ID)
        hook_instance = mock_hook.return_value
        hook_instance.update_tag_template_field.assert_called_once_with(**expected_hook_kwargs)