Support google-cloud-datacatalog>=3.0.0 (#13534)

This commit is contained in:
Kamil Breguła 2021-01-11 09:39:19 +01:00 коммит произвёл GitHub
Родитель 87a7557f8b
Коммит 947dbb73bb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 357 добавлений и 210 удалений

Просмотреть файл

@ -30,7 +30,7 @@ Details are covered in the UPDATING.md files for each library, but there are som
| Library name | Previous constraints | Current constraints | Upgrade guide |
| --- | --- | --- | --- |
| [``google-cloud-bigquery-datatransfer``](https://pypi.org/project/google-cloud-bigquery-datatransfer/) | ``>=0.4.0,<2.0.0`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-bigquery-datatransfer/blob/master/UPGRADING.md) |
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=1.0.0,<2.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-datacatalog``](https://pypi.org/project/google-cloud-datacatalog/) | ``>=0.5.0,<0.8`` | ``>=3.0.0,<4.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-datacatalog/blob/master/UPGRADING.md) |
| [``google-cloud-os-login``](https://pypi.org/project/google-cloud-os-login/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-oslogin/blob/master/UPGRADING.md) |
| [``google-cloud-pubsub``](https://pypi.org/project/google-cloud-pubsub/) | ``>=1.0.0,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-pubsub/blob/master/UPGRADING.md) |
| [``google-cloud-kms``](https://pypi.org/project/google-cloud-kms/) | ``>=1.2.1,<2.0.0`` | ``>=2.0.0,<3.0.0`` | [`UPGRADING.md`](https://github.com/googleapis/python-kms/blob/master/UPGRADING.md) |

Просмотреть файл

@ -19,7 +19,7 @@
"""
Example Airflow DAG that interacts with Google Data Catalog service
"""
from google.cloud.datacatalog_v1beta1.proto.tags_pb2 import FieldType, TagField, TagTemplateField
from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField
from airflow import models
from airflow.operators.bash_operator import BashOperator
@ -91,7 +91,7 @@ with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_inte
entry_id=ENTRY_ID,
entry={
"display_name": "Wizard",
"type": "FILESET",
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": ["gs://test-datacatalog/**"]},
},
)
@ -144,7 +144,7 @@ with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_inte
"display_name": "Awesome Tag Template",
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type=FieldType(primitive_type="STRING")
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
@ -172,7 +172,7 @@ with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_inte
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type=FieldType(primitive_type="STRING")
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
# [END howto_operator_gcp_datacatalog_create_tag_template_field]
@ -305,7 +305,7 @@ with models.DAG("example_gcp_datacatalog", start_date=days_ago(1), schedule_inte
# [START howto_operator_gcp_datacatalog_lookup_entry_result]
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['displayName'] }}\"",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
# [END howto_operator_gcp_datacatalog_lookup_entry_result]

Просмотреть файл

@ -18,16 +18,18 @@
from typing import Dict, Optional, Sequence, Tuple, Union
from google.api_core.retry import Retry
from google.cloud.datacatalog_v1beta1 import DataCatalogClient
from google.cloud.datacatalog_v1beta1.types import (
from google.cloud import datacatalog
from google.cloud.datacatalog_v1beta1 import (
CreateTagRequest,
DataCatalogClient,
Entry,
EntryGroup,
FieldMask,
SearchCatalogRequest,
Tag,
TagTemplate,
TagTemplateField,
)
from google.protobuf.field_mask_pb2 import FieldMask
from airflow import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
@ -115,10 +117,13 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
parent = DataCatalogClient.entry_group_path(project_id, location, entry_group)
parent = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}"
self.log.info('Creating a new entry: parent=%s', parent)
result = client.create_entry(
parent=parent, entry_id=entry_id, entry=entry, retry=retry, timeout=timeout, metadata=metadata
request={'parent': parent, 'entry_id': entry_id, 'entry': entry},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Created a entry: name=%s', result.name)
return result
@ -161,16 +166,14 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
parent = DataCatalogClient.location_path(project_id, location)
parent = f"projects/{project_id}/locations/{location}"
self.log.info('Creating a new entry group: parent=%s', parent)
result = client.create_entry_group(
parent=parent,
entry_group_id=entry_group_id,
entry_group=entry_group,
request={'parent': parent, 'entry_group_id': entry_group_id, 'entry_group': entry_group},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Created a entry group: name=%s', result.name)
@ -218,15 +221,34 @@ class CloudDataCatalogHook(GoogleBaseHook):
"""
client = self.get_conn()
if template_id:
template_path = DataCatalogClient.tag_template_path(project_id, location, template_id)
template_path = f"projects/{project_id}/locations/{location}/tagTemplates/{template_id}"
if isinstance(tag, Tag):
tag.template = template_path
else:
tag["template"] = template_path
parent = DataCatalogClient.entry_path(project_id, location, entry_group, entry)
parent = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}"
self.log.info('Creating a new tag: parent=%s', parent)
result = client.create_tag(parent=parent, tag=tag, retry=retry, timeout=timeout, metadata=metadata)
# HACK: google-cloud-datacatalog has problems with mapping messages where the value is not a
# primitive type, so we need to convert it manually.
# See: https://github.com/googleapis/python-datacatalog/issues/84
if isinstance(tag, dict):
tag = Tag(
name=tag.get('name'),
template=tag.get('template'),
template_display_name=tag.get('template_display_name'),
column=tag.get('column'),
fields={
k: datacatalog.TagField(**v) if isinstance(v, dict) else v
for k, v in tag.get("fields", {}).items()
},
)
request = CreateTagRequest(
parent=parent,
tag=tag,
)
result = client.create_tag(request=request, retry=retry, timeout=timeout, metadata=metadata or ())
self.log.info('Created a tag: name=%s', result.name)
return result
@ -267,17 +289,30 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
parent = DataCatalogClient.location_path(project_id, location)
parent = f"projects/{project_id}/locations/{location}"
self.log.info('Creating a new tag template: parent=%s', parent)
# HACK: google-cloud-datacatalog has problems with mapping messages where the value is not a
# primitive type, so we need to convert it manually.
# See: https://github.com/googleapis/python-datacatalog/issues/84
if isinstance(tag_template, dict):
tag_template = datacatalog.TagTemplate(
name=tag_template.get("name"),
display_name=tag_template.get("display_name"),
fields={
k: datacatalog.TagTemplateField(**v) if isinstance(v, dict) else v
for k, v in tag_template.get("fields", {}).items()
},
)
request = datacatalog.CreateTagTemplateRequest(
parent=parent, tag_template_id=tag_template_id, tag_template=tag_template
)
result = client.create_tag_template(
parent=parent,
tag_template_id=tag_template_id,
tag_template=tag_template,
request=request,
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Created a tag template: name=%s', result.name)
@ -325,17 +360,19 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
parent = DataCatalogClient.tag_template_path(project_id, location, tag_template)
parent = f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}"
self.log.info('Creating a new tag template field: parent=%s', parent)
result = client.create_tag_template_field(
parent=parent,
tag_template_field_id=tag_template_field_id,
tag_template_field=tag_template_field,
request={
'parent': parent,
'tag_template_field_id': tag_template_field_id,
'tag_template_field': tag_template_field,
},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Created a tag template field: name=%s', result.name)
@ -375,9 +412,9 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.entry_path(project_id, location, entry_group, entry)
name = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}"
self.log.info('Deleting a entry: name=%s', name)
client.delete_entry(name=name, retry=retry, timeout=timeout, metadata=metadata)
client.delete_entry(request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ())
self.log.info('Deleted a entry: name=%s', name)
@GoogleBaseHook.fallback_to_default_project_id
@ -412,10 +449,12 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.entry_group_path(project_id, location, entry_group)
name = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}"
self.log.info('Deleting a entry group: name=%s', name)
client.delete_entry_group(name=name, retry=retry, timeout=timeout, metadata=metadata)
client.delete_entry_group(
request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
self.log.info('Deleted a entry group: name=%s', name)
@GoogleBaseHook.fallback_to_default_project_id
@ -454,10 +493,12 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.tag_path(project_id, location, entry_group, entry, tag)
name = (
f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}/tags/{tag}"
)
self.log.info('Deleting a tag: name=%s', name)
client.delete_tag(name=name, retry=retry, timeout=timeout, metadata=metadata)
client.delete_tag(request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ())
self.log.info('Deleted a tag: name=%s', name)
@GoogleBaseHook.fallback_to_default_project_id
@ -495,10 +536,12 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.tag_template_path(project_id, location, tag_template)
name = f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}"
self.log.info('Deleting a tag template: name=%s', name)
client.delete_tag_template(name=name, force=force, retry=retry, timeout=timeout, metadata=metadata)
client.delete_tag_template(
request={'name': name, 'force': force}, retry=retry, timeout=timeout, metadata=metadata or ()
)
self.log.info('Deleted a tag template: name=%s', name)
@GoogleBaseHook.fallback_to_default_project_id
@ -537,11 +580,11 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.tag_template_field_path(project_id, location, tag_template, field)
name = f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}/fields/{field}"
self.log.info('Deleting a tag template field: name=%s', name)
client.delete_tag_template_field(
name=name, force=force, retry=retry, timeout=timeout, metadata=metadata
request={'name': name, 'force': force}, retry=retry, timeout=timeout, metadata=metadata or ()
)
self.log.info('Deleted a tag template field: name=%s', name)
@ -578,10 +621,12 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.entry_path(project_id, location, entry_group, entry)
name = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}"
self.log.info('Getting a entry: name=%s', name)
result = client.get_entry(name=name, retry=retry, timeout=timeout, metadata=metadata)
result = client.get_entry(
request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
self.log.info('Received a entry: name=%s', result.name)
return result
@ -607,8 +652,8 @@ class CloudDataCatalogHook(GoogleBaseHook):
:param read_mask: The fields to return. If not set or empty, all fields are returned.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type read_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type read_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param project_id: The ID of the Google Cloud project that owns the entry group.
If set to ``None`` or missing, the default project_id from the Google Cloud connection is used.
:type project_id: str
@ -622,12 +667,15 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.entry_group_path(project_id, location, entry_group)
name = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}"
self.log.info('Getting a entry group: name=%s', name)
result = client.get_entry_group(
name=name, read_mask=read_mask, retry=retry, timeout=timeout, metadata=metadata
request={'name': name, 'read_mask': read_mask},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Received a entry group: name=%s', result.name)
@ -664,11 +712,13 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.tag_template_path(project_id, location, tag_template)
name = f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}"
self.log.info('Getting a tag template: name=%s', name)
result = client.get_tag_template(name=name, retry=retry, timeout=timeout, metadata=metadata)
result = client.get_tag_template(
request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
self.log.info('Received a tag template: name=%s', result.name)
@ -712,12 +762,15 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
parent = DataCatalogClient.entry_path(project_id, location, entry_group, entry)
parent = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}"
self.log.info('Listing tag on entry: entry_name=%s', parent)
result = client.list_tags(
parent=parent, page_size=page_size, retry=retry, timeout=timeout, metadata=metadata
request={'parent': parent, 'page_size': page_size},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Received tags.')
@ -811,12 +864,18 @@ class CloudDataCatalogHook(GoogleBaseHook):
if linked_resource:
self.log.info('Getting entry: linked_resource=%s', linked_resource)
result = client.lookup_entry(
linked_resource=linked_resource, retry=retry, timeout=timeout, metadata=metadata
request={'linked_resource': linked_resource},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
else:
self.log.info('Getting entry: sql_resource=%s', sql_resource)
result = client.lookup_entry(
sql_resource=sql_resource, retry=retry, timeout=timeout, metadata=metadata
request={'sql_resource': sql_resource},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Received entry. name=%s', result.name)
@ -860,18 +919,17 @@ class CloudDataCatalogHook(GoogleBaseHook):
:type metadata: Sequence[Tuple[str, str]]
"""
client = self.get_conn()
name = DataCatalogClient.tag_template_field_path(project_id, location, tag_template, field)
name = f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}/fields/{field}"
self.log.info(
'Renaming field: old_name=%s, new_tag_template_field_id=%s', name, new_tag_template_field_id
)
result = client.rename_tag_template_field(
name=name,
new_tag_template_field_id=new_tag_template_field_id,
request={'name': name, 'new_tag_template_field_id': new_tag_template_field_id},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Renamed tag template field.')
@ -946,13 +1004,10 @@ class CloudDataCatalogHook(GoogleBaseHook):
order_by,
)
result = client.search_catalog(
scope=scope,
query=query,
page_size=page_size,
order_by=order_by,
request={'scope': scope, 'query': query, 'page_size': page_size, 'order_by': order_by},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Received items.')
@ -984,8 +1039,8 @@ class CloudDataCatalogHook(GoogleBaseHook):
updated.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the entry to update.
:type location: str
:param entry_group: The entry group ID for the entry that is being updated.
@ -1006,7 +1061,9 @@ class CloudDataCatalogHook(GoogleBaseHook):
"""
client = self.get_conn()
if project_id and location and entry_group and entry_id:
full_entry_name = DataCatalogClient.entry_path(project_id, location, entry_group, entry_id)
full_entry_name = (
f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry_id}"
)
if isinstance(entry, Entry):
entry.name = full_entry_name
elif isinstance(entry, dict):
@ -1025,7 +1082,10 @@ class CloudDataCatalogHook(GoogleBaseHook):
if isinstance(entry, dict):
entry = Entry(**entry)
result = client.update_entry(
entry=entry, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata
request={'entry': entry, 'update_mask': update_mask},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Updated entry.')
@ -1059,7 +1119,7 @@ class CloudDataCatalogHook(GoogleBaseHook):
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the tag to rename.
:type location: str
:param entry_group: The entry group ID for the tag that is being updated.
@ -1082,7 +1142,10 @@ class CloudDataCatalogHook(GoogleBaseHook):
"""
client = self.get_conn()
if project_id and location and entry_group and entry and tag_id:
full_tag_name = DataCatalogClient.tag_path(project_id, location, entry_group, entry, tag_id)
full_tag_name = (
f"projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry}"
f"/tags/{tag_id}"
)
if isinstance(tag, Tag):
tag.name = full_tag_name
elif isinstance(tag, dict):
@ -1102,7 +1165,10 @@ class CloudDataCatalogHook(GoogleBaseHook):
if isinstance(tag, dict):
tag = Tag(**tag)
result = client.update_tag(
tag=tag, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata
request={'tag': tag, 'update_mask': update_mask},
retry=retry,
timeout=timeout,
metadata=metadata or (),
)
self.log.info('Updated tag.')
@ -1137,8 +1203,8 @@ class CloudDataCatalogHook(GoogleBaseHook):
If absent or empty, all of the allowed fields above will be updated.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the tag template to rename.
:type location: str
:param tag_template_id: Optional. The tag template ID for the entry that is being updated.
@ -1157,8 +1223,8 @@ class CloudDataCatalogHook(GoogleBaseHook):
"""
client = self.get_conn()
if project_id and location and tag_template:
full_tag_template_name = DataCatalogClient.tag_template_path(
project_id, location, tag_template_id
full_tag_template_name = (
f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}"
)
if isinstance(tag_template, TagTemplate):
tag_template.name = full_tag_template_name
@ -1179,11 +1245,10 @@ class CloudDataCatalogHook(GoogleBaseHook):
if isinstance(tag_template, dict):
tag_template = TagTemplate(**tag_template)
result = client.update_tag_template(
tag_template=tag_template,
update_mask=update_mask,
request={'tag_template': tag_template, 'update_mask': update_mask},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Updated tag template.')
@ -1222,8 +1287,8 @@ class CloudDataCatalogHook(GoogleBaseHook):
Therefore, enum values can only be added, existing enum values cannot be deleted nor renamed.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param tag_template_field_name: Optional. The name of the tag template field to rename.
:type tag_template_field_name: str
:param location: Optional. The location of the tag to rename.
@ -1246,19 +1311,22 @@ class CloudDataCatalogHook(GoogleBaseHook):
"""
client = self.get_conn()
if project_id and location and tag_template and tag_template_field_id:
tag_template_field_name = DataCatalogClient.tag_template_field_path(
project_id, location, tag_template, tag_template_field_id
tag_template_field_name = (
f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}"
f"/fields/{tag_template_field_id}"
)
self.log.info("Updating tag template field: name=%s", tag_template_field_name)
result = client.update_tag_template_field(
name=tag_template_field_name,
tag_template_field=tag_template_field,
update_mask=update_mask,
request={
'name': tag_template_field_name,
'tag_template_field': tag_template_field,
'update_mask': update_mask,
},
retry=retry,
timeout=timeout,
metadata=metadata,
metadata=metadata or (),
)
self.log.info('Updated tag template field.')

Просмотреть файл

@ -19,17 +19,16 @@ from typing import Dict, Optional, Sequence, Tuple, Union
from google.api_core.exceptions import AlreadyExists, NotFound
from google.api_core.retry import Retry
from google.cloud.datacatalog_v1beta1 import DataCatalogClient
from google.cloud.datacatalog_v1beta1 import DataCatalogClient, SearchCatalogResult
from google.cloud.datacatalog_v1beta1.types import (
Entry,
EntryGroup,
FieldMask,
SearchCatalogRequest,
Tag,
TagTemplate,
TagTemplateField,
)
from google.protobuf.json_format import MessageToDict
from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.datacatalog import CloudDataCatalogHook
@ -153,7 +152,7 @@ class CloudDataCatalogCreateEntryOperator(BaseOperator):
_, _, entry_id = result.name.rpartition("/")
self.log.info("Current entry_id ID: %s", entry_id)
context["task_instance"].xcom_push(key="entry_id", value=entry_id)
return MessageToDict(result)
return Entry.to_dict(result)
class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
@ -268,7 +267,7 @@ class CloudDataCatalogCreateEntryGroupOperator(BaseOperator):
_, _, entry_group_id = result.name.rpartition("/")
self.log.info("Current entry group ID: %s", entry_group_id)
context["task_instance"].xcom_push(key="entry_group_id", value=entry_group_id)
return MessageToDict(result)
return EntryGroup.to_dict(result)
class CloudDataCatalogCreateTagOperator(BaseOperator):
@ -404,7 +403,7 @@ class CloudDataCatalogCreateTagOperator(BaseOperator):
_, _, tag_id = tag.name.rpartition("/")
self.log.info("Current Tag ID: %s", tag_id)
context["task_instance"].xcom_push(key="tag_id", value=tag_id)
return MessageToDict(tag)
return Tag.to_dict(tag)
class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
@ -516,7 +515,7 @@ class CloudDataCatalogCreateTagTemplateOperator(BaseOperator):
_, _, tag_template = result.name.rpartition("/")
self.log.info("Current Tag ID: %s", tag_template)
context["task_instance"].xcom_push(key="tag_template_id", value=tag_template)
return MessageToDict(result)
return TagTemplate.to_dict(result)
class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
@ -638,7 +637,7 @@ class CloudDataCatalogCreateTagTemplateFieldOperator(BaseOperator):
self.log.info("Current Tag ID: %s", self.tag_template_field_id)
context["task_instance"].xcom_push(key="tag_template_field_id", value=self.tag_template_field_id)
return MessageToDict(result)
return TagTemplateField.to_dict(result)
class CloudDataCatalogDeleteEntryOperator(BaseOperator):
@ -1216,7 +1215,7 @@ class CloudDataCatalogGetEntryOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return Entry.to_dict(result)
class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
@ -1234,8 +1233,8 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
:param read_mask: The fields to return. If not set or empty, all fields are returned.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type read_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type read_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param project_id: The ID of the Google Cloud project that owns the entry group.
If set to ``None`` or missing, the default project_id from the Google Cloud connection is used.
:type project_id: Optional[str]
@ -1312,7 +1311,7 @@ class CloudDataCatalogGetEntryGroupOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return EntryGroup.to_dict(result)
class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
@ -1399,7 +1398,7 @@ class CloudDataCatalogGetTagTemplateOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return TagTemplate.to_dict(result)
class CloudDataCatalogListTagsOperator(BaseOperator):
@ -1501,7 +1500,7 @@ class CloudDataCatalogListTagsOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return [MessageToDict(item) for item in result]
return [Tag.to_dict(item) for item in result]
class CloudDataCatalogLookupEntryOperator(BaseOperator):
@ -1589,7 +1588,7 @@ class CloudDataCatalogLookupEntryOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return Entry.to_dict(result)
class CloudDataCatalogRenameTagTemplateFieldOperator(BaseOperator):
@ -1809,7 +1808,7 @@ class CloudDataCatalogSearchCatalogOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
return [MessageToDict(item) for item in result]
return [SearchCatalogResult.to_dict(item) for item in result]
class CloudDataCatalogUpdateEntryOperator(BaseOperator):
@ -1829,8 +1828,8 @@ class CloudDataCatalogUpdateEntryOperator(BaseOperator):
updated.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the entry to update.
:type location: str
:param entry_group: The entry group ID for the entry that is being updated.
@ -1940,8 +1939,8 @@ class CloudDataCatalogUpdateTagOperator(BaseOperator):
updated. Currently the only modifiable field is the field ``fields``.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the tag to rename.
:type location: str
:param entry_group: The entry group ID for the tag that is being updated.
@ -2060,8 +2059,8 @@ class CloudDataCatalogUpdateTagTemplateOperator(BaseOperator):
If absent or empty, all of the allowed fields above will be updated.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param location: Required. The location of the tag template to rename.
:type location: str
:param tag_template_id: Optional. The tag template ID for the entry that is being updated.
@ -2172,8 +2171,8 @@ class CloudDataCatalogUpdateTagTemplateFieldOperator(BaseOperator):
Therefore, enum values can only be added, existing enum values cannot be deleted nor renamed.
If a dict is provided, it must be of the same form as the protobuf message
:class:`~google.cloud.datacatalog_v1beta1.types.FieldMask`
:type update_mask: Union[Dict, google.cloud.datacatalog_v1beta1.types.FieldMask]
:class:`~google.protobuf.field_mask_pb2.FieldMask`
:type update_mask: Union[Dict, google.protobuf.field_mask_pb2.FieldMask]
:param tag_template_field_name: Optional. The name of the tag template field to rename.
:type tag_template_field_name: str
:param location: Optional. The location of the tag to rename.

Просмотреть файл

@ -285,7 +285,7 @@ google = [
'google-cloud-bigquery-datatransfer>=3.0.0,<4.0.0',
'google-cloud-bigtable>=1.0.0,<2.0.0',
'google-cloud-container>=0.1.1,<2.0.0',
'google-cloud-datacatalog>=1.0.0,<2.0.0',
'google-cloud-datacatalog>=3.0.0,<4.0.0',
'google-cloud-dataproc>=1.0.1,<2.0.0',
'google-cloud-dlp>=0.11.0,<2.0.0',
'google-cloud-kms>=2.0.0,<3.0.0',

Просмотреть файл

@ -21,6 +21,7 @@ from typing import Dict, Sequence, Tuple
from unittest import TestCase, mock
from google.api_core.retry import Retry
from google.cloud.datacatalog_v1beta1 import CreateTagRequest, CreateTagTemplateRequest
from google.cloud.datacatalog_v1beta1.types import Entry, Tag, TagTemplate
from airflow import AirflowException
@ -37,7 +38,7 @@ TEST_ENTRY_ID: str = "test-entry-id"
TEST_ENTRY: Dict = {}
TEST_RETRY: Retry = Retry()
TEST_TIMEOUT: float = 4
TEST_METADATA: Sequence[Tuple[str, str]] = []
TEST_METADATA: Sequence[Tuple[str, str]] = ()
TEST_ENTRY_GROUP_ID: str = "test-entry-group-id"
TEST_ENTRY_GROUP: Dict = {}
TEST_TAG: Dict = {}
@ -101,7 +102,7 @@ class TestCloudDataCatalog(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.lookup_entry.assert_called_once_with(
linked_resource=TEST_LINKED_RESOURCE,
request=dict(linked_resource=TEST_LINKED_RESOURCE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -117,7 +118,10 @@ class TestCloudDataCatalog(TestCase):
sql_resource=TEST_SQL_RESOURCE, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
)
mock_get_conn.return_value.lookup_entry.assert_called_once_with(
sql_resource=TEST_SQL_RESOURCE, retry=TEST_RETRY, timeout=TEST_TIMEOUT, metadata=TEST_METADATA
request=dict(sql_resource=TEST_SQL_RESOURCE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)
@mock.patch(
@ -147,10 +151,9 @@ class TestCloudDataCatalog(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.search_catalog.assert_called_once_with(
scope=TEST_SCOPE,
query=TEST_QUERY,
page_size=TEST_PAGE_SIZE,
order_by=TEST_ORDER_BY,
request=dict(
scope=TEST_SCOPE, query=TEST_QUERY, page_size=TEST_PAGE_SIZE, order_by=TEST_ORDER_BY
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -183,9 +186,11 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_entry.assert_called_once_with(
parent=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
entry_id=TEST_ENTRY_ID,
entry=TEST_ENTRY,
request=dict(
parent=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
entry_id=TEST_ENTRY_ID,
entry=TEST_ENTRY,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -206,9 +211,11 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_entry_group.assert_called_once_with(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_1),
entry_group_id=TEST_ENTRY_GROUP_ID,
entry_group=TEST_ENTRY_GROUP,
request=dict(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_1),
entry_group_id=TEST_ENTRY_GROUP_ID,
entry_group=TEST_ENTRY_GROUP,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -231,8 +238,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
tag={"template": TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)},
request=CreateTagRequest(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -255,8 +264,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)),
request=CreateTagRequest(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -277,9 +288,11 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag_template.assert_called_once_with(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_1),
tag_template_id=TEST_TAG_TEMPLATE_ID,
tag_template=TEST_TAG_TEMPLATE,
request=CreateTagTemplateRequest(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_1),
tag_template_id=TEST_TAG_TEMPLATE_ID,
tag_template=TEST_TAG_TEMPLATE,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -301,9 +314,11 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag_template_field.assert_called_once_with(
parent=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1),
tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
request=dict(
parent=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1),
tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -324,7 +339,9 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_entry.assert_called_once_with(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
request=dict(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -344,7 +361,9 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_entry_group.assert_called_once_with(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
request=dict(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -366,7 +385,9 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag.assert_called_once_with(
name=TEST_TAG_PATH.format(TEST_PROJECT_ID_1),
request=dict(
name=TEST_TAG_PATH.format(TEST_PROJECT_ID_1),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -387,8 +408,7 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag_template.assert_called_once_with(
name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1),
force=TEST_FORCE,
request=dict(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1), force=TEST_FORCE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -410,8 +430,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
force=TEST_FORCE,
request=dict(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
force=TEST_FORCE,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -432,7 +454,9 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_entry.assert_called_once_with(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
request=dict(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -453,8 +477,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_entry_group.assert_called_once_with(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
read_mask=TEST_READ_MASK,
request=dict(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_1),
read_mask=TEST_READ_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -474,7 +500,9 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_tag_template.assert_called_once_with(
name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1),
request=dict(
name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -496,8 +524,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.list_tags.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
page_size=TEST_PAGE_SIZE,
request=dict(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
page_size=TEST_PAGE_SIZE,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -523,8 +553,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.list_tags.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
page_size=100,
request=dict(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1),
page_size=100,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -547,8 +579,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.rename_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
new_tag_template_field_id=TEST_NEW_TAG_TEMPLATE_FIELD_ID,
request=dict(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
new_tag_template_field_id=TEST_NEW_TAG_TEMPLATE_FIELD_ID,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -571,8 +605,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_entry.assert_called_once_with(
entry=Entry(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1)),
update_mask=TEST_UPDATE_MASK,
request=dict(
entry=Entry(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_1)),
update_mask=TEST_UPDATE_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -596,8 +632,7 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag.assert_called_once_with(
tag=Tag(name=TEST_TAG_PATH.format(TEST_PROJECT_ID_1)),
update_mask=TEST_UPDATE_MASK,
request=dict(tag=Tag(name=TEST_TAG_PATH.format(TEST_PROJECT_ID_1)), update_mask=TEST_UPDATE_MASK),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -619,8 +654,10 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag_template.assert_called_once_with(
tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)),
update_mask=TEST_UPDATE_MASK,
request=dict(
tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_1)),
update_mask=TEST_UPDATE_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -643,9 +680,11 @@ class TestCloudDataCatalogWithDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
update_mask=TEST_UPDATE_MASK,
request=dict(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_1),
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
update_mask=TEST_UPDATE_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -679,9 +718,11 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_entry.assert_called_once_with(
parent=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2),
entry_id=TEST_ENTRY_ID,
entry=TEST_ENTRY,
request=dict(
parent=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2),
entry_id=TEST_ENTRY_ID,
entry=TEST_ENTRY,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -703,9 +744,11 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_entry_group.assert_called_once_with(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_2),
entry_group_id=TEST_ENTRY_GROUP_ID,
entry_group=TEST_ENTRY_GROUP,
request=dict(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_2),
entry_group_id=TEST_ENTRY_GROUP_ID,
entry_group=TEST_ENTRY_GROUP,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -729,8 +772,10 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
tag={"template": TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)},
request=CreateTagRequest(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -754,8 +799,10 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
request=CreateTagRequest(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
tag=Tag(template=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -777,9 +824,11 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag_template.assert_called_once_with(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_2),
tag_template_id=TEST_TAG_TEMPLATE_ID,
tag_template=TEST_TAG_TEMPLATE,
request=CreateTagTemplateRequest(
parent=TEST_LOCATION_PATH.format(TEST_PROJECT_ID_2),
tag_template_id=TEST_TAG_TEMPLATE_ID,
tag_template=TEST_TAG_TEMPLATE,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -802,9 +851,11 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.create_tag_template_field.assert_called_once_with(
parent=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2),
tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
request=dict(
parent=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2),
tag_template_field_id=TEST_TAG_TEMPLATE_FIELD_ID,
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -826,7 +877,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_entry.assert_called_once_with(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
request=dict(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2)),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -847,7 +898,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_entry_group.assert_called_once_with(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2),
request=dict(name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2)),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -870,7 +921,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag.assert_called_once_with(
name=TEST_TAG_PATH.format(TEST_PROJECT_ID_2),
request=dict(name=TEST_TAG_PATH.format(TEST_PROJECT_ID_2)),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -892,8 +943,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag_template.assert_called_once_with(
name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2),
force=TEST_FORCE,
request=dict(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2), force=TEST_FORCE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -916,8 +966,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.delete_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2),
force=TEST_FORCE,
request=dict(name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2), force=TEST_FORCE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -939,7 +988,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_entry.assert_called_once_with(
name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
request=dict(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2)),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -961,8 +1010,10 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_entry_group.assert_called_once_with(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2),
read_mask=TEST_READ_MASK,
request=dict(
name=TEST_ENTRY_GROUP_PATH.format(TEST_PROJECT_ID_2),
read_mask=TEST_READ_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -983,7 +1034,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.get_tag_template.assert_called_once_with(
name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2),
request=dict(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1006,8 +1057,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.list_tags.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
page_size=TEST_PAGE_SIZE,
request=dict(parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2), page_size=TEST_PAGE_SIZE),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1034,8 +1084,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.list_tags.assert_called_once_with(
parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2),
page_size=100,
request=dict(parent=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2), page_size=100),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1059,8 +1108,10 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.rename_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2),
new_tag_template_field_id=TEST_NEW_TAG_TEMPLATE_FIELD_ID,
request=dict(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2),
new_tag_template_field_id=TEST_NEW_TAG_TEMPLATE_FIELD_ID,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1084,8 +1135,9 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_entry.assert_called_once_with(
entry=Entry(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2)),
update_mask=TEST_UPDATE_MASK,
request=dict(
entry=Entry(name=TEST_ENTRY_PATH.format(TEST_PROJECT_ID_2)), update_mask=TEST_UPDATE_MASK
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1110,8 +1162,7 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag.assert_called_once_with(
tag=Tag(name=TEST_TAG_PATH.format(TEST_PROJECT_ID_2)),
update_mask=TEST_UPDATE_MASK,
request=dict(tag=Tag(name=TEST_TAG_PATH.format(TEST_PROJECT_ID_2)), update_mask=TEST_UPDATE_MASK),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1134,8 +1185,10 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag_template.assert_called_once_with(
tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
update_mask=TEST_UPDATE_MASK,
request=dict(
tag_template=TagTemplate(name=TEST_TAG_TEMPLATE_PATH.format(TEST_PROJECT_ID_2)),
update_mask=TEST_UPDATE_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
@ -1159,9 +1212,11 @@ class TestCloudDataCatalogWithoutDefaultProjectIdHook(TestCase):
metadata=TEST_METADATA,
)
mock_get_conn.return_value.update_tag_template_field.assert_called_once_with(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2),
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
update_mask=TEST_UPDATE_MASK,
request=dict(
name=TEST_TAG_TEMPLATE_FIELD_PATH.format(TEST_PROJECT_ID_2),
tag_template_field=TEST_TAG_TEMPLATE_FIELD,
update_mask=TEST_UPDATE_MASK,
),
retry=TEST_RETRY,
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,

Просмотреть файл

@ -87,15 +87,25 @@ TEST_TAG_PATH: str = (
)
TEST_ENTRY: Entry = Entry(name=TEST_ENTRY_PATH)
TEST_ENTRY_DICT: Dict = dict(name=TEST_ENTRY_PATH)
TEST_ENTRY_DICT: Dict = {
'description': '',
'display_name': '',
'linked_resource': '',
'name': TEST_ENTRY_PATH,
}
TEST_ENTRY_GROUP: EntryGroup = EntryGroup(name=TEST_ENTRY_GROUP_PATH)
TEST_ENTRY_GROUP_DICT: Dict = dict(name=TEST_ENTRY_GROUP_PATH)
TEST_TAG: EntryGroup = Tag(name=TEST_TAG_PATH)
TEST_TAG_DICT: Dict = dict(name=TEST_TAG_PATH)
TEST_ENTRY_GROUP_DICT: Dict = {'description': '', 'display_name': '', 'name': TEST_ENTRY_GROUP_PATH}
TEST_TAG: Tag = Tag(name=TEST_TAG_PATH)
TEST_TAG_DICT: Dict = {'fields': {}, 'name': TEST_TAG_PATH, 'template': '', 'template_display_name': ''}
TEST_TAG_TEMPLATE: TagTemplate = TagTemplate(name=TEST_TAG_TEMPLATE_PATH)
TEST_TAG_TEMPLATE_DICT: Dict = dict(name=TEST_TAG_TEMPLATE_PATH)
TEST_TAG_TEMPLATE_FIELD: Dict = TagTemplateField(name=TEST_TAG_TEMPLATE_FIELD_ID)
TEST_TAG_TEMPLATE_FIELD_DICT: Dict = dict(name=TEST_TAG_TEMPLATE_FIELD_ID)
TEST_TAG_TEMPLATE_DICT: Dict = {'display_name': '', 'fields': {}, 'name': TEST_TAG_TEMPLATE_PATH}
TEST_TAG_TEMPLATE_FIELD: TagTemplateField = TagTemplateField(name=TEST_TAG_TEMPLATE_FIELD_ID)
TEST_TAG_TEMPLATE_FIELD_DICT: Dict = {
'display_name': '',
'is_required': False,
'name': TEST_TAG_TEMPLATE_FIELD_ID,
'order': 0,
}
class TestCloudDataCatalogCreateEntryOperator(TestCase):
@ -498,7 +508,10 @@ class TestCloudDataCatalogDeleteTagTemplateFieldOperator(TestCase):
class TestCloudDataCatalogGetEntryOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
@mock.patch(
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.get_entry.return_value": TEST_ENTRY}, # type: ignore
)
def test_assert_valid_hook_call(self, mock_hook) -> None:
task = CloudDataCatalogGetEntryOperator(
task_id="task_id",
@ -529,7 +542,10 @@ class TestCloudDataCatalogGetEntryOperator(TestCase):
class TestCloudDataCatalogGetEntryGroupOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
@mock.patch(
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.get_entry_group.return_value": TEST_ENTRY_GROUP}, # type: ignore
)
def test_assert_valid_hook_call(self, mock_hook) -> None:
task = CloudDataCatalogGetEntryGroupOperator(
task_id="task_id",
@ -560,7 +576,10 @@ class TestCloudDataCatalogGetEntryGroupOperator(TestCase):
class TestCloudDataCatalogGetTagTemplateOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
@mock.patch(
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.get_tag_template.return_value": TEST_TAG_TEMPLATE}, # type: ignore
)
def test_assert_valid_hook_call(self, mock_hook) -> None:
task = CloudDataCatalogGetTagTemplateOperator(
task_id="task_id",
@ -589,7 +608,10 @@ class TestCloudDataCatalogGetTagTemplateOperator(TestCase):
class TestCloudDataCatalogListTagsOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
@mock.patch(
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.list_tags.return_value": [TEST_TAG]}, # type: ignore
)
def test_assert_valid_hook_call(self, mock_hook) -> None:
task = CloudDataCatalogListTagsOperator(
task_id="task_id",
@ -622,7 +644,10 @@ class TestCloudDataCatalogListTagsOperator(TestCase):
class TestCloudDataCatalogLookupEntryOperator(TestCase):
@mock.patch("airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook")
@mock.patch(
"airflow.providers.google.cloud.operators.datacatalog.CloudDataCatalogHook",
**{"return_value.lookup_entry.return_value": TEST_ENTRY}, # type: ignore
)
def test_assert_valid_hook_call(self, mock_hook) -> None:
task = CloudDataCatalogLookupEntryOperator(
task_id="task_id",