[AIRFLOW-3701] Add Google Cloud Vision Product Search operators (#4665)

This commit is contained in:
Szymon Przedwojski 2019-02-22 15:56:53 +01:00 коммит произвёл Kaxil Naik
Родитель 240138f416
Коммит ce499bb277
11 изменённых файлов: 2718 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
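Example Airflow DAG that creates, gets, updates and deletes Products and Product Sets in the Google Cloud
Vision service in the Google Cloud Platform.
This DAG relies on the following OS environment variables:
* GCP_VISION_LOCATION - Google Cloud Platform region in which the Products and Product Sets are
  created (defaults to 'europe-west1').
* GCP_VISION_PRODUCT_SET_ID - Product Set ID used by the explicit-ID examples
  (defaults to 'product_set_explicit_id').
* GCP_VISION_PRODUCT_ID - Product ID used by the explicit-ID examples
  (defaults to 'product_explicit_id').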
"""
import os
# [START howto_operator_vision_retry_import]
from google.api_core.retry import Retry
# [END howto_operator_vision_retry_import]
# [START howto_operator_vision_productset_import]
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet
# [END howto_operator_vision_productset_import]
# [START howto_operator_vision_product_import]
from google.cloud.vision_v1.proto.product_search_service_pb2 import Product
# [END howto_operator_vision_product_import]
import airflow
from airflow import models
from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductSetCreateOperator,
CloudVisionProductSetGetOperator,
CloudVisionProductSetUpdateOperator,
CloudVisionProductSetDeleteOperator,
CloudVisionProductCreateOperator,
CloudVisionProductGetOperator,
CloudVisionProductUpdateOperator,
CloudVisionProductDeleteOperator,
)
# Arguments shared by every task of the example DAG.
default_args = dict(start_date=airflow.utils.dates.days_ago(1))

# Region used by all examples; overridable through the environment.
# [START howto_operator_vision_args_common]
GCP_VISION_LOCATION = os.getenv('GCP_VISION_LOCATION', 'europe-west1')
# [END howto_operator_vision_args_common]

# Entity passed to the ProductSet "create" examples.
# [START howto_operator_vision_productset]
product_set = ProductSet(display_name='My Product Set 1')
# [END howto_operator_vision_productset]

# Entity passed to the Product "create" examples.
# [START howto_operator_vision_product]
product = Product(display_name='My Product 1', product_category='toys')
# [END howto_operator_vision_product]

# IDs used by the "explicit IDs" examples; overridable through the environment.
# [START howto_operator_vision_productset_explicit_id]
GCP_VISION_PRODUCT_SET_ID = os.getenv('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
# [END howto_operator_vision_productset_explicit_id]

# [START howto_operator_vision_product_explicit_id]
GCP_VISION_PRODUCT_ID = os.getenv('GCP_VISION_PRODUCT_ID', 'product_explicit_id')
# [END howto_operator_vision_product_explicit_id]
with models.DAG(
    'example_gcp_vision', default_args=default_args, schedule_interval=None  # Override to match your needs
) as dag:
    # ################################## #
    # ### Autogenerated IDs examples ### #
    # ################################## #

    # The "create" tasks return the ID generated by the API; downstream tasks read it
    # from XCom through the templated `product_set_id` / `product_id` arguments.

    # [START howto_operator_vision_product_set_create]
    product_set_create = CloudVisionProductSetCreateOperator(
        location=GCP_VISION_LOCATION,
        product_set=product_set,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_set_create',
    )
    # [END howto_operator_vision_product_set_create]

    # [START howto_operator_vision_product_set_get]
    product_set_get = CloudVisionProductSetGetOperator(
        location=GCP_VISION_LOCATION,
        product_set_id="{{ task_instance.xcom_pull('product_set_create') }}",
        task_id='product_set_get',
    )
    # [END howto_operator_vision_product_set_get]

    # [START howto_operator_vision_product_set_update]
    product_set_update = CloudVisionProductSetUpdateOperator(
        location=GCP_VISION_LOCATION,
        product_set_id="{{ task_instance.xcom_pull('product_set_create') }}",
        product_set=ProductSet(display_name='My Product Set 2'),
        task_id='product_set_update',
    )
    # [END howto_operator_vision_product_set_update]

    # [START howto_operator_vision_product_set_delete]
    product_set_delete = CloudVisionProductSetDeleteOperator(
        location=GCP_VISION_LOCATION,
        product_set_id="{{ task_instance.xcom_pull('product_set_create') }}",
        task_id='product_set_delete',
    )
    # [END howto_operator_vision_product_set_delete]

    # [START howto_operator_vision_product_create]
    product_create = CloudVisionProductCreateOperator(
        location=GCP_VISION_LOCATION,
        product=product,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_create',
    )
    # [END howto_operator_vision_product_create]

    # [START howto_operator_vision_product_get]
    product_get = CloudVisionProductGetOperator(
        location=GCP_VISION_LOCATION,
        product_id="{{ task_instance.xcom_pull('product_create') }}",
        task_id='product_get',
    )
    # [END howto_operator_vision_product_get]

    # [START howto_operator_vision_product_update]
    product_update = CloudVisionProductUpdateOperator(
        location=GCP_VISION_LOCATION,
        product_id="{{ task_instance.xcom_pull('product_create') }}",
        product=Product(display_name='My Product 2', description='My updated description'),
        task_id='product_update',
    )
    # [END howto_operator_vision_product_update]

    # [START howto_operator_vision_product_delete]
    product_delete = CloudVisionProductDeleteOperator(
        location=GCP_VISION_LOCATION,
        product_id="{{ task_instance.xcom_pull('product_create') }}",
        task_id='product_delete',
    )
    # [END howto_operator_vision_product_delete]

    product_set_create >> product_set_get >> product_set_update >> product_set_delete
    product_create >> product_get >> product_update >> product_delete

    # ############################# #
    # ### Explicit IDs examples ### #
    # ############################# #

    # [START howto_operator_vision_product_set_create_2]
    product_set_create_2 = CloudVisionProductSetCreateOperator(
        product_set_id=GCP_VISION_PRODUCT_SET_ID,
        location=GCP_VISION_LOCATION,
        product_set=product_set,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_set_create_2',
    )
    # [END howto_operator_vision_product_set_create_2]

    # Second 'create' task with the same product_set_id to demonstrate idempotence
    product_set_create_2_idempotence = CloudVisionProductSetCreateOperator(
        product_set_id=GCP_VISION_PRODUCT_SET_ID,
        location=GCP_VISION_LOCATION,
        product_set=product_set,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_set_create_2_idempotence',
    )

    # [START howto_operator_vision_product_set_get_2]
    product_set_get_2 = CloudVisionProductSetGetOperator(
        location=GCP_VISION_LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id='product_set_get_2'
    )
    # [END howto_operator_vision_product_set_get_2]

    # [START howto_operator_vision_product_set_update_2]
    product_set_update_2 = CloudVisionProductSetUpdateOperator(
        location=GCP_VISION_LOCATION,
        product_set_id=GCP_VISION_PRODUCT_SET_ID,
        product_set=ProductSet(display_name='My Product Set 2'),
        task_id='product_set_update_2',
    )
    # [END howto_operator_vision_product_set_update_2]

    # [START howto_operator_vision_product_set_delete_2]
    product_set_delete_2 = CloudVisionProductSetDeleteOperator(
        location=GCP_VISION_LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id='product_set_delete_2'
    )
    # [END howto_operator_vision_product_set_delete_2]

    # [START howto_operator_vision_product_create_2]
    product_create_2 = CloudVisionProductCreateOperator(
        product_id=GCP_VISION_PRODUCT_ID,
        location=GCP_VISION_LOCATION,
        product=product,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_create_2',
    )
    # [END howto_operator_vision_product_create_2]

    # Second 'create' task with the same product_id to demonstrate idempotence
    product_create_2_idempotence = CloudVisionProductCreateOperator(
        product_id=GCP_VISION_PRODUCT_ID,
        location=GCP_VISION_LOCATION,
        product=product,
        retry=Retry(maximum=10.0),
        timeout=5,
        task_id='product_create_2_idempotence',
    )

    # [START howto_operator_vision_product_get_2]
    product_get_2 = CloudVisionProductGetOperator(
        location=GCP_VISION_LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id='product_get_2'
    )
    # [END howto_operator_vision_product_get_2]

    # [START howto_operator_vision_product_update_2]
    product_update_2 = CloudVisionProductUpdateOperator(
        location=GCP_VISION_LOCATION,
        product_id=GCP_VISION_PRODUCT_ID,
        product=Product(display_name='My Product 2', description='My updated description'),
        task_id='product_update_2',
    )
    # [END howto_operator_vision_product_update_2]

    # [START howto_operator_vision_product_delete_2]
    product_delete_2 = CloudVisionProductDeleteOperator(
        location=GCP_VISION_LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id='product_delete_2'
    )
    # [END howto_operator_vision_product_delete_2]

    # The idempotence tasks are placed right after the first 'create' so that they really are a
    # *second* create of an existing resource, and so that they can never run after the 'delete'
    # task (which would leave the resource behind once the DAG run finishes).
    product_set_create_2 >> product_set_create_2_idempotence >> product_set_get_2
    product_set_get_2 >> product_set_update_2 >> product_set_delete_2

    product_create_2 >> product_create_2_idempotence >> product_get_2
    product_get_2 >> product_update_2 >> product_delete_2

Просмотреть файл

@ -0,0 +1,356 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from copy import deepcopy
from google.api_core.exceptions import AlreadyExists, GoogleAPICallError, RetryError
from google.cloud.vision_v1 import ProductSearchClient
from google.protobuf.json_format import MessageToDict
from airflow import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
class NameDeterminer(object):
    """
    Helper that makes sure an entity (Product / ProductSet) has its 'name' attribute set.

    - If only the entity's 'name' is set, the entity is used as is.
    - If the 'name' is not set but it can be constructed from the other parameters provided
      (`location` and the entity id), it is constructed and filled in the entity.
    - If both the entity's 'name' attribute is set and the name can be constructed from other parameters
      provided:
        - If they are the same: no action is taken.
        - If they are different: an exception is thrown.
    - If neither is available: an exception is thrown.

    :param label: Human readable entity label used in error messages (e.g. 'Product').
    :type label: str
    :param id_label: Name of the id parameter, used in error messages (e.g. 'product_id').
    :type id_label: str
    :param get_path: Callable invoked as ``get_path(project_id, location, entity_id)`` that returns the
        constructed name.
    :type get_path: callable
    """

    def __init__(self, label, id_label, get_path):
        self.label = label
        self.id_label = id_label
        self.get_path = get_path

    def get_entity_with_name(self, entity, entity_id, location, project_id):
        """
        Returns a deep copy of ``entity`` whose ``name`` attribute is guaranteed to be set.

        The object passed in is never modified.

        :param entity: Object exposing a ``name`` attribute.
        :param entity_id: (Optional) Resource id used to construct the name.
        :param location: (Optional) Location used to construct the name.
        :param project_id: Project id passed to ``get_path`` when the name is constructed.
        :return: A copy of the entity with ``name`` filled in.
        :raises AirflowException: if the name cannot be determined, or if the explicit and the
            constructed names differ.
        """
        entity = deepcopy(entity)
        explicit_name = entity.name

        if location and entity_id:
            # Necessary parameters to construct the name are present. Checking for conflict with
            # the explicit name.
            constructed_name = self.get_path(project_id, location, entity_id)
            if not explicit_name:
                entity.name = constructed_name
            elif explicit_name != constructed_name:
                self._raise_ex_different_names(constructed_name, explicit_name)
            # Reached both when the name has just been filled in and when the explicit name already
            # matches the constructed one (previously the latter case implicitly returned None).
            return entity

        # Not enough parameters to construct the name. Trying to use the name from Product / ProductSet.
        if not explicit_name:
            self._raise_ex_unable_to_determine_name()
        return entity

    def _raise_ex_unable_to_determine_name(self):
        """Raises the error used when neither an explicit nor a constructed name is available."""
        raise AirflowException(
            "Unable to determine the {label} name. Please either set the name directly in the {label} "
            "object or provide the `location` and `{id_label}` parameters.".format(
                label=self.label, id_label=self.id_label
            )
        )

    def _raise_ex_different_names(self, constructed_name, explicit_name):
        """Raises the error used when the explicit name and the constructed name disagree."""
        raise AirflowException(
            "The {label} name provided in the object ({explicit_name}) is different than the name created "
            "from the input parameters ({constructed_name}). Please either: 1) Remove the {label} name, 2) "
            "Remove the location and {id_label} parameters, 3) Unify the {label} name and input "
            "parameters.".format(
                label=self.label,
                explicit_name=explicit_name,
                constructed_name=constructed_name,
                id_label=self.id_label,
            )
        )
class CloudVisionHook(GoogleCloudBaseHook):
    """
    Hook for Google Cloud Vision APIs.

    Wraps the Product Search client: every public method builds the resource path, delegates the
    call to the client through :meth:`_handle_request` and logs the outcome.
    """

    # Client created on first use by get_conn().
    _client = None

    # Helpers that fill in / validate the `name` of the entity passed to the update_* methods.
    product_name_determiner = NameDeterminer('Product', 'product_id', ProductSearchClient.product_path)
    product_set_name_determiner = NameDeterminer(
        'ProductSet', 'product_set_id', ProductSearchClient.product_set_path
    )

    def __init__(self, gcp_conn_id='google_cloud_default', delegate_to=None):
        super(CloudVisionHook, self).__init__(gcp_conn_id, delegate_to)

    def get_conn(self):
        """
        Retrieves connection to Cloud Vision.

        :return: Google Cloud Vision client object.
        :rtype: google.cloud.vision_v1.ProductSearchClient
        """
        if not self._client:
            self._client = ProductSearchClient(credentials=self._get_credentials())
        return self._client

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def create_product_set(
        self,
        location,
        product_set,
        project_id=None,
        product_set_id=None,
        retry=None,
        timeout=None,
        metadata=None,
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`

        :return: The ``product_set_id`` passed in or, when none was given, the id extracted from the
            name of the created ProductSet.
        """
        client = self.get_conn()
        parent = ProductSearchClient.location_path(project_id, location)
        self.log.info('Creating a new ProductSet under the parent: %s', parent)
        response = self._handle_request(
            client.create_product_set,
            parent=parent,
            product_set=product_set,
            product_set_id=product_set_id,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        self.log.info('ProductSet created: %s', response.name if response else '')
        self.log.debug('ProductSet created:\n%s', response)

        if not product_set_id:
            # Product set id was generated by the API
            product_set_id = self._get_autogenerated_id(response)
            self.log.info('Extracted autogenerated ProductSet ID from the response: %s', product_set_id)

        return product_set_id

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def get_product_set(
        self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`

        :return: The retrieved ProductSet converted with ``MessageToDict``.
        """
        client = self.get_conn()
        name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
        self.log.info('Retrieving ProductSet: %s', name)
        response = self._handle_request(
            client.get_product_set, name=name, retry=retry, timeout=timeout, metadata=metadata
        )
        self.log.info('ProductSet retrieved.')
        self.log.debug('ProductSet retrieved:\n%s', response)
        return MessageToDict(response)

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def update_product_set(
        self,
        product_set,
        location=None,
        product_set_id=None,
        update_mask=None,
        project_id=None,
        retry=None,
        timeout=None,
        metadata=None,
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`

        :return: The updated ProductSet converted with ``MessageToDict``.
        """
        client = self.get_conn()
        # Work on a copy of the entity whose `name` is filled in / validated.
        product_set = self.product_set_name_determiner.get_entity_with_name(
            product_set, product_set_id, location, project_id
        )
        self.log.info('Updating ProductSet: %s', product_set.name)
        response = self._handle_request(
            client.update_product_set,
            product_set=product_set,
            update_mask=update_mask,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        self.log.info('ProductSet updated: %s', response.name if response else '')
        self.log.debug('ProductSet updated:\n%s', response)
        return MessageToDict(response)

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def delete_product_set(
        self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`

        :return: Whatever the client's ``delete_product_set`` call returns.
        """
        client = self.get_conn()
        name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
        self.log.info('Deleting ProductSet: %s', name)
        response = self._handle_request(
            client.delete_product_set, name=name, retry=retry, timeout=timeout, metadata=metadata
        )
        self.log.info('ProductSet with the name [%s] deleted.', name)
        return response

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def create_product(
        self, location, product, project_id=None, product_id=None, retry=None, timeout=None, metadata=None
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`

        :return: The ``product_id`` passed in or, when none was given, the id extracted from the
            name of the created Product.
        """
        client = self.get_conn()
        parent = ProductSearchClient.location_path(project_id, location)
        self.log.info('Creating a new Product under the parent: %s', parent)
        response = self._handle_request(
            client.create_product,
            parent=parent,
            product=product,
            product_id=product_id,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        self.log.info('Product created: %s', response.name if response else '')
        self.log.debug('Product created:\n%s', response)

        if not product_id:
            # Product id was generated by the API
            product_id = self._get_autogenerated_id(response)
            self.log.info('Extracted autogenerated Product ID from the response: %s', product_id)

        return product_id

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def get_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`

        :return: The retrieved Product converted with ``MessageToDict``.
        """
        client = self.get_conn()
        name = ProductSearchClient.product_path(project_id, location, product_id)
        self.log.info('Retrieving Product: %s', name)
        response = self._handle_request(
            client.get_product, name=name, retry=retry, timeout=timeout, metadata=metadata
        )
        self.log.info('Product retrieved.')
        self.log.debug('Product retrieved:\n%s', response)
        return MessageToDict(response)

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def update_product(
        self,
        product,
        location=None,
        product_id=None,
        update_mask=None,
        project_id=None,
        retry=None,
        timeout=None,
        metadata=None,
    ):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`

        :return: The updated Product converted with ``MessageToDict``.
        """
        client = self.get_conn()
        # Work on a copy of the entity whose `name` is filled in / validated.
        product = self.product_name_determiner.get_entity_with_name(product, product_id, location, project_id)
        self.log.info('Updating Product: %s', product.name)
        response = self._handle_request(
            client.update_product,
            product=product,
            update_mask=update_mask,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        self.log.info('Product updated: %s', response.name if response else '')
        self.log.debug('Product updated:\n%s', response)
        return MessageToDict(response)

    @GoogleCloudBaseHook.fallback_to_default_project_id
    def delete_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
        """
        For the documentation see:
        :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`

        :return: Whatever the client's ``delete_product`` call returns.
        """
        client = self.get_conn()
        name = ProductSearchClient.product_path(project_id, location, product_id)
        self.log.info('Deleting Product: %s', name)
        response = self._handle_request(
            client.delete_product, name=name, retry=retry, timeout=timeout, metadata=metadata
        )
        self.log.info('Product with the name [%s] deleted.', name)
        return response

    def _handle_request(self, fun, **kwargs):
        """
        Calls ``fun(**kwargs)`` and translates the client errors into ``AirflowException``.

        ``AlreadyExists`` is deliberately propagated unchanged so that callers can catch it.

        :param fun: Callable performing the API request.
        :param kwargs: Keyword arguments forwarded to ``fun``.
        :return: The value returned by ``fun``.
        :raises google.api_core.exceptions.AlreadyExists: re-raised as is.
        :raises AirflowException: for any other ``GoogleAPICallError``, for ``RetryError`` and
            for ``ValueError``.
        """
        try:
            return fun(**kwargs)
        except AlreadyExists:
            # Must be listed before GoogleAPICallError, which it derives from. A bare `raise`
            # keeps the original traceback.
            raise
        except GoogleAPICallError as e:
            self.log.error('The request failed:\n%s', str(e))
            raise AirflowException(e)
        except RetryError as e:
            self.log.error('The request failed due to a retryable error and retry attempts failed.')
            raise AirflowException(e)
        except ValueError as e:
            self.log.error('The request failed, the parameters are invalid.')
            raise AirflowException(e)

    @staticmethod
    def _get_entity_name(is_product, project_id, location, entity_id):
        """Returns the Product path when ``is_product`` is truthy, the ProductSet path otherwise."""
        if is_product:
            return ProductSearchClient.product_path(project_id, location, entity_id)
        else:
            return ProductSearchClient.product_set_path(project_id, location, entity_id)

    @staticmethod
    def _get_autogenerated_id(response):
        """
        Extracts the id, i.e. the part after the last '/', from ``response.name``.

        :raises AirflowException: if the response has no ``name`` attribute or the name contains no '/'.
        """
        try:
            name = response.name
        except AttributeError as e:
            raise AirflowException('Unable to get name from response... [{}]\n{}'.format(response, e))
        if '/' not in name:
            raise AirflowException('Unable to get id from name... [{}]'.format(name))
        return name.rsplit('/', 1)[1]

Просмотреть файл

@ -0,0 +1,673 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from google.api_core.exceptions import AlreadyExists
from airflow.contrib.hooks.gcp_vision_hook import CloudVisionHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class CloudVisionProductSetCreateOperator(BaseOperator):
    """
    Creates a new ProductSet resource.

    If a ProductSet with the given `product_set_id` already exists, the error is logged and
    the operator returns the `product_set_id` instead of failing.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductSetCreateOperator`

    :param product_set: (Required) The ProductSet to create. If a dict is provided, it must be of the same
        form as the protobuf message `ProductSet`.
    :type product_set: dict or google.cloud.vision_v1.types.ProductSet
    :param location: (Required) The region where the ProductSet should be created. Valid regions
        (as of 2019-02-05) are: us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param project_id: (Optional) The project in which the ProductSet should be created. If set to None or
        missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param product_set_id: (Optional) A user-supplied resource id for this ProductSet.
        If set, the server will attempt to use this value as the resource id. If it is
        already in use, an error is returned with code ALREADY_EXISTS. Must be at most
        128 characters long. It cannot contain the character /.
    :type product_set_id: str
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_productset_create_template_fields]
    template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id')
    # [END vision_productset_create_template_fields]

    @apply_defaults
    def __init__(
        self,
        product_set,
        location,
        project_id=None,
        product_set_id=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductSetCreateOperator, self).__init__(*args, **kwargs)
        self.location = location
        self.project_id = project_id
        self.product_set = product_set
        self.product_set_id = product_set_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Creates the ProductSet.

        :return: The id of the ProductSet, as returned by the hook, or `product_set_id` when the
            ProductSet already exists.
        """
        # The hook is created here and not in __init__ because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is being executed.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        try:
            return hook.create_product_set(
                location=self.location,
                project_id=self.project_id,
                product_set=self.product_set,
                product_set_id=self.product_set_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
        except AlreadyExists:
            self.log.info(
                'Product set with id %s already exists. Exiting from the create operation.',
                self.product_set_id,
            )
            return self.product_set_id
class CloudVisionProductSetGetOperator(BaseOperator):
    """
    Gets information associated with a ProductSet.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductSetGetOperator`

    :param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05)
        are: us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product_set_id: (Required) The resource id of this ProductSet.
    :type product_set_id: str
    :param project_id: (Optional) The project in which the ProductSet is located. If set
        to None or missing, the default `project_id` from the GCP connection is used.
    :type project_id: str
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_productset_get_template_fields]
    template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id')
    # [END vision_productset_get_template_fields]

    @apply_defaults
    def __init__(
        self,
        location,
        product_set_id,
        project_id=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductSetGetOperator, self).__init__(*args, **kwargs)
        self.location = location
        self.project_id = project_id
        self.product_set_id = product_set_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Retrieves the ProductSet.

        :return: The value returned by the hook's `get_product_set`.
        """
        # The hook is created here and not in __init__ because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is being executed.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return hook.get_product_set(
            location=self.location,
            product_set_id=self.product_set_id,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
class CloudVisionProductSetUpdateOperator(BaseOperator):
    """
    Makes changes to a `ProductSet` resource. Only display_name can be updated currently.

    .. note:: To locate the `ProductSet` resource, its `name` in the form
        `projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID` is necessary.

    You can provide the `name` directly as an attribute of the `product_set` object.
    However, you can leave it blank and provide `location` and `product_set_id` instead
    (and optionally `project_id` - if not present, the connection default will be used)
    and the `name` will be created by the operator itself.

    This mechanism exists for your convenience, to allow leaving the `project_id` empty
    and having Airflow use the connection default `project_id`.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductSetUpdateOperator`

    :param product_set: (Required) The ProductSet resource which replaces the one on the
        server. If a dict is provided, it must be of the same form as the protobuf
        message `ProductSet`.
    :type product_set: dict or google.cloud.vision_v1.types.ProductSet
    :param location: (Optional) The region where the ProductSet is located. Valid regions (as of 2019-02-05)
        are: us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product_set_id: (Optional) The resource id of this ProductSet.
    :type product_set_id: str
    :param project_id: (Optional) The project in which the ProductSet is located. If set to None or
        missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param update_mask: (Optional) The `FieldMask` that specifies which fields to update. If update_mask
        isn't specified, all mutable fields are to be updated. Valid mask path is display_name. If a dict is
        provided, it must be of the same form as the protobuf message `FieldMask`.
    :type update_mask: dict or google.cloud.vision_v1.types.FieldMask
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_productset_update_template_fields]
    template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id')
    # [END vision_productset_update_template_fields]

    @apply_defaults
    def __init__(
        self,
        product_set,
        location=None,
        product_set_id=None,
        project_id=None,
        update_mask=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductSetUpdateOperator, self).__init__(*args, **kwargs)
        self.product_set = product_set
        self.update_mask = update_mask
        self.location = location
        self.project_id = project_id
        self.product_set_id = product_set_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Updates the ProductSet.

        :return: The value returned by the hook's `update_product_set`.
        """
        # The hook is created here and not in __init__ because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is being executed.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return hook.update_product_set(
            location=self.location,
            product_set_id=self.product_set_id,
            project_id=self.project_id,
            product_set=self.product_set,
            update_mask=self.update_mask,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
class CloudVisionProductSetDeleteOperator(BaseOperator):
"""
Permanently deletes a `ProductSet`. `Products` and `ReferenceImages` in the
`ProductSet` are not deleted. The actual image files are not deleted from Google
Cloud Storage.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVisionProductSetDeleteOperator`
:param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05)
are: us-east1, us-west1, europe-west1, asia-east1
:type location: str
:param product_set_id: (Required) The resource id of this ProductSet.
:type product_set_id: str
:param project_id: (Optional) The project in which the ProductSet should be created.
If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: (Optional) A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: (Optional) The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_productset_delete_template_fields]
template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id')
# [END vision_productset_delete_template_fields]
@apply_defaults
def __init__(
self,
location,
product_set_id,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id='google_cloud_default',
*args,
**kwargs
):
super(CloudVisionProductSetDeleteOperator, self).__init__(*args, **kwargs)
self.location = location
self.project_id = project_id
self.product_set_id = product_set_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
def execute(self, context):
    """
    Delete the configured ProductSet through the hook and return whatever the hook call returns.

    :param context: Airflow task context (not used by this operator).
    """
    call_kwargs = dict(
        location=self.location,
        product_set_id=self.product_set_id,
        project_id=self.project_id,
        retry=self.retry,
        timeout=self.timeout,
        metadata=self.metadata,
    )
    return self._hook.delete_product_set(**call_kwargs)
class CloudVisionProductCreateOperator(BaseOperator):
    """
    Creates and returns a new product resource.

    Possible errors regarding the `Product` object provided:

    - Returns INVALID_ARGUMENT if `display_name` is missing or longer than 4096 characters.
    - Returns INVALID_ARGUMENT if `description` is longer than 4096 characters.
    - Returns INVALID_ARGUMENT if `product_category` is missing or invalid.

    If the API reports that the product already exists (``AlreadyExists``), the operator does not
    fail: it logs the fact and returns the ``product_id`` it was given.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductCreateOperator`

    :param location: (Required) The region where the Product should be created. Valid regions
        (as of 2019-02-05) are: us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product: (Required) The product to create. If a dict is provided, it must be of the same form as
        the protobuf message `Product`.
    :type product: dict or google.cloud.vision_v1.types.Product
    :param project_id: (Optional) The project in which the Product should be created. If set to None or
        missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param product_id: (Optional) A user-supplied resource id for this Product.
        If set, the server will attempt to use this value as the resource id. If it is
        already in use, an error is returned with code ALREADY_EXISTS. Must be at most
        128 characters long. It cannot contain the character /.
    :type product_id: str
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_product_create_template_fields]
    template_fields = ('location', 'project_id', 'product_id', 'gcp_conn_id')
    # [END vision_product_create_template_fields]

    @apply_defaults
    def __init__(
        self,
        location,
        product,
        project_id=None,
        product_id=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductCreateOperator, self).__init__(*args, **kwargs)
        self.location = location
        self.product = product
        self.project_id = project_id
        self.product_id = product_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Create the product through the Cloud Vision hook.

        :param context: Airflow task context (not used by this operator).
        :return: the value returned by the hook's ``create_product``, or ``self.product_id`` when
            the product already exists.
        """
        # The hook is built here, not in __init__, because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is about to run.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        try:
            return hook.create_product(
                location=self.location,
                product=self.product,
                project_id=self.project_id,
                product_id=self.product_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
        except AlreadyExists:
            # Treat "already exists" as success so that re-running the task is harmless.
            self.log.info(
                'Product with id %s already exists. Exiting from the create operation.', self.product_id
            )
            return self.product_id
class CloudVisionProductGetOperator(BaseOperator):
    """
    Gets information associated with a `Product`.

    Possible errors:

    - Returns NOT_FOUND if the `Product` does not exist.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductGetOperator`

    :param location: (Required) The region where the Product is located. Valid regions (as of 2019-02-05) are:
        us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product_id: (Required) The resource id of this Product.
    :type product_id: str
    :param project_id: (Optional) The project in which the Product is located. If set to
        None or missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_product_get_template_fields]
    template_fields = ('location', 'project_id', 'product_id', 'gcp_conn_id')
    # [END vision_product_get_template_fields]

    @apply_defaults
    def __init__(
        self,
        location,
        product_id,
        project_id=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductGetOperator, self).__init__(*args, **kwargs)
        self.location = location
        self.product_id = product_id
        self.project_id = project_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Fetch the product through the Cloud Vision hook.

        :param context: Airflow task context (not used by this operator).
        :return: the value returned by the hook's ``get_product``.
        """
        # The hook is built here, not in __init__, because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is about to run.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return hook.get_product(
            location=self.location,
            product_id=self.product_id,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
class CloudVisionProductUpdateOperator(BaseOperator):
    """
    Makes changes to a Product resource. Only the display_name, description, and labels fields can be
    updated right now.

    If labels are updated, the change will not be reflected in queries until the next index time.

    .. note:: To locate the `Product` resource, its `name` in the form
        `projects/PROJECT_ID/locations/LOC_ID/products/PRODUCT_ID` is necessary.

    You can provide the `name` directly as an attribute of the `product` object. However, you can leave it
    blank and provide `location` and `product_id` instead (and optionally `project_id` - if not present,
    the connection default will be used) and the `name` will be created by the operator itself.

    This mechanism exists for your convenience, to allow leaving the `project_id` empty and having Airflow
    use the connection default `project_id`.

    Possible errors related to the provided `Product`:

    - Returns NOT_FOUND if the Product does not exist.
    - Returns INVALID_ARGUMENT if display_name is present in update_mask but is missing from the request or
      longer than 4096 characters.
    - Returns INVALID_ARGUMENT if description is present in update_mask but is longer than 4096 characters.
    - Returns INVALID_ARGUMENT if product_category is present in update_mask.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductUpdateOperator`

    :param product: (Required) The Product resource which replaces the one on the server. product.name is
        immutable. If a dict is provided, it must be of the same form as the protobuf message `Product`.
    :type product: dict or google.cloud.vision_v1.types.Product
    :param location: (Optional) The region where the Product is located. Valid regions (as of 2019-02-05) are:
        us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product_id: (Optional) The resource id of this Product.
    :type product_id: str
    :param project_id: (Optional) The project in which the Product is located. If set to None or
        missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param update_mask: (Optional) The `FieldMask` that specifies which fields to update. If update_mask
        isn't specified, all mutable fields are to be updated. Valid mask paths include product_labels,
        display_name, and description. If a dict is provided, it must be of the same form as the protobuf
        message `FieldMask`.
    :type update_mask: dict or google.cloud.vision_v1.types.FieldMask
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_product_update_template_fields]
    template_fields = ('location', 'project_id', 'product_id', 'gcp_conn_id')
    # [END vision_product_update_template_fields]

    @apply_defaults
    def __init__(
        self,
        product,
        location=None,
        product_id=None,
        project_id=None,
        update_mask=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductUpdateOperator, self).__init__(*args, **kwargs)
        self.product = product
        self.location = location
        self.product_id = product_id
        self.project_id = project_id
        self.update_mask = update_mask
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Update the product through the Cloud Vision hook.

        :param context: Airflow task context (not used by this operator).
        :return: the value returned by the hook's ``update_product``.
        """
        # The hook is built here, not in __init__, because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is about to run.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return hook.update_product(
            product=self.product,
            location=self.location,
            product_id=self.product_id,
            project_id=self.project_id,
            update_mask=self.update_mask,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
class CloudVisionProductDeleteOperator(BaseOperator):
    """
    Permanently deletes a product and its reference images.

    Metadata of the product and all its images will be deleted right away, but search queries against
    ProductSets containing the product may still work until all related caches are refreshed.

    Possible errors:

    - Returns NOT_FOUND if the product does not exist.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:CloudVisionProductDeleteOperator`

    :param location: (Required) The region where the Product is located. Valid regions (as of 2019-02-05) are:
        us-east1, us-west1, europe-west1, asia-east1
    :type location: str
    :param product_id: (Required) The resource id of this Product.
    :type product_id: str
    :param project_id: (Optional) The project in which the Product is located. If set to None or
        missing, the default project_id from the GCP connection is used.
    :type project_id: str
    :param retry: (Optional) A retry object used to retry requests. If `None` is
        specified, requests will not be retried.
    :type retry: google.api_core.retry.Retry
    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
        complete. Note that if retry is specified, the timeout applies to each individual
        attempt.
    :type timeout: float
    :param metadata: (Optional) Additional metadata that is provided to the method.
    :type metadata: Sequence[Tuple[str, str]]
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    """

    # [START vision_product_delete_template_fields]
    template_fields = ('location', 'project_id', 'product_id', 'gcp_conn_id')
    # [END vision_product_delete_template_fields]

    @apply_defaults
    def __init__(
        self,
        location,
        product_id,
        project_id=None,
        retry=None,
        timeout=None,
        metadata=None,
        gcp_conn_id='google_cloud_default',
        *args,
        **kwargs
    ):
        super(CloudVisionProductDeleteOperator, self).__init__(*args, **kwargs)
        self.location = location
        self.product_id = product_id
        self.project_id = project_id
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        """
        Delete the product through the Cloud Vision hook.

        :param context: Airflow task context (not used by this operator).
        :return: the value returned by the hook's ``delete_product``.
        """
        # The hook is built here, not in __init__, because `gcp_conn_id` is a template field:
        # its rendered value is only available once the task is about to run.
        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return hook.delete_product(
            location=self.location,
            product_id=self.product_id,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )

Просмотреть файл

@ -208,6 +208,14 @@ Operators
.. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator
.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator
.. autoclass:: airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
@ -481,6 +489,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook
.. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook
.. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook
.. autoclass:: airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook
.. autoclass:: airflow.contrib.hooks.cloudant_hook.CloudantHook
.. autoclass:: airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook
.. autoclass:: airflow.contrib.hooks.gcp_dataproc_hook.DataProcHook

Просмотреть файл

@ -1735,3 +1735,595 @@ More information
See `Google Cloud Storage ObjectAccessControls insert documentation
<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_.
Google Cloud Vision Operators
------------------------------
.. _howto/operator:CloudVisionProductSetCreateOperator:
CloudVisionProductSetCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Creates a new :code:`ProductSet` resource.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` and ``Retry`` objects from Google libraries:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_import]
:end-before: [END howto_operator_vision_productset_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset]
:end-before: [END howto_operator_vision_productset]
The ``product_set_id`` argument can be omitted (it will be generated by the API):
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create]
:end-before: [END howto_operator_vision_product_set_create]
Or it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create_2]
:end-before: [END howto_operator_vision_product_set_create_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_create_template_fields]
:end-before: [END vision_productset_create_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_product_set>`_.
.. _howto/operator:CloudVisionProductSetGetOperator:
CloudVisionProductSetGetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Gets information associated with a :code:`ProductSet`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get]
:end-before: [END howto_operator_vision_product_set_get]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get_2]
:end-before: [END howto_operator_vision_product_set_get_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_get_template_fields]
:end-before: [END vision_productset_get_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet get documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.get_product_set>`_.
.. _howto/operator:CloudVisionProductSetUpdateOperator:
CloudVisionProductSetUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Makes changes to a :code:`ProductSet` resource. Only :code:`display_name` can be updated
currently.
.. note:: To locate the `ProductSet` resource, its `name` in the form
``projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID`` is necessary.
You can provide the `name` directly as an attribute of the `product_set` object.
However, you can leave it blank and provide `location` and `product_set_id` instead (and
optionally `project_id` - if not present, the connection default will be used) and the
`name` will be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the `project_id` empty and
having Airflow use the connection default `project_id`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` object from the Google Cloud Vision library:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_import]
:end-before: [END howto_operator_vision_productset_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset]
:end-before: [END howto_operator_vision_productset]
Initialization of the task:
If ``product_set_id`` was generated by the API it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update]
:end-before: [END howto_operator_vision_product_set_update]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update_2]
:end-before: [END howto_operator_vision_product_set_update_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_update_template_fields]
:end-before: [END vision_productset_update_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet update documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.update_product_set>`_.
.. _howto/operator:CloudVisionProductSetDeleteOperator:
CloudVisionProductSetDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Permanently deletes a :code:`ProductSet`. :code:`Products` and :code:`ReferenceImages` in
the :code:`ProductSet` are not deleted. The actual image files are not deleted from
Google Cloud Storage.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete]
:end-before: [END howto_operator_vision_product_set_delete]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete_2]
:end-before: [END howto_operator_vision_product_set_delete_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_delete_template_fields]
:end-before: [END vision_productset_delete_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product_set>`_.
.. _howto/operator:CloudVisionProductCreateOperator:
CloudVisionProductCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Creates and returns a new product resource.
Possible errors regarding the :code:`Product` object provided:
- Returns INVALID_ARGUMENT if `display_name` is missing or longer than 4096 characters.
- Returns INVALID_ARGUMENT if `description` is longer than 4096 characters.
- Returns INVALID_ARGUMENT if `product_category` is missing or invalid.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``Product`` and ``Retry`` objects from Google libraries:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_import]
:end-before: [END howto_operator_vision_product_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product]
:end-before: [END howto_operator_vision_product]
The ``product_id`` argument can be omitted (it will be generated by the API):
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_create]
:end-before: [END howto_operator_vision_product_create]
Or it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_create_2]
:end-before: [END howto_operator_vision_product_create_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_create_template_fields]
:end-before: [END vision_product_create_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_product>`_.
.. _howto/operator:CloudVisionProductGetOperator:
CloudVisionProductGetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Gets information associated with a :code:`Product`.
Possible errors:
- Returns NOT_FOUND if the `Product` does not exist.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
Using the operator
""""""""""""""""""
If ``product_id`` was generated by the API it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_get]
:end-before: [END howto_operator_vision_product_get]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_get_2]
:end-before: [END howto_operator_vision_product_get_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_get_template_fields]
:end-before: [END vision_product_get_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product get documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.get_product>`_.
.. _howto/operator:CloudVisionProductUpdateOperator:
CloudVisionProductUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Makes changes to a :code:`Product` resource. Only the :code:`display_name`,
:code:`description`, and :code:`labels` fields can be updated right now.
If labels are updated, the change will not be reflected in queries until the next index
time.
.. note:: To locate the `Product` resource, its `name` in the form
``projects/PROJECT_ID/locations/LOC_ID/products/PRODUCT_ID`` is necessary.
You can provide the `name` directly as an attribute of the `product` object. However, you
can leave it blank and provide `location` and `product_id` instead (and optionally
`project_id` - if not present, the connection default will be used) and the `name` will
be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the `project_id` empty and
having Airflow use the connection default `project_id`.
Possible errors:
- Returns NOT_FOUND if the `Product` does not exist.
- Returns INVALID_ARGUMENT if `display_name` is present in `update_mask` but is missing
from the request or longer than 4096 characters.
- Returns INVALID_ARGUMENT if `description` is present in `update_mask` but is longer than
4096 characters.
- Returns INVALID_ARGUMENT if `product_category` is present in `update_mask`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``Product`` object from the Google Cloud Vision library:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_import]
:end-before: [END howto_operator_vision_product_import]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product]
:end-before: [END howto_operator_vision_product]
If ``product_id`` was generated by the API, it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_update]
:end-before: [END howto_operator_vision_product_update]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_update_2]
:end-before: [END howto_operator_vision_product_update_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_update_template_fields]
:end-before: [END vision_product_update_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product update documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.update_product>`_.
.. _howto/operator:CloudVisionProductDeleteOperator:
CloudVisionProductDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Permanently deletes a product and its reference images.
Metadata of the product and all its images will be deleted right away, but search queries
against :code:`ProductSets` containing the product may still work until all related
caches are refreshed.
Possible errors:
- Returns NOT_FOUND if the product does not exist.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
Using the operator
""""""""""""""""""
If ``product_id`` was generated by the API, it can be extracted from XCom:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete]
:end-before: [END howto_operator_vision_product_delete]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete_2]
:end-before: [END howto_operator_vision_product_delete_2]
Templating
""""""""""
.. literalinclude:: ../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_delete_template_fields]
:end-before: [END vision_product_delete_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product>`_.

Просмотреть файл

@ -593,6 +593,31 @@ Transfer Service
They also use :class:`airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook` to communicate with Google Cloud Platform.
Cloud Vision
''''''''''''
Cloud Vision Product Search Operators
"""""""""""""""""""""""""""""""""""""
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
Creates a new ProductSet resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
Gets information associated with a ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
Makes changes to a ProductSet resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Permanently deletes a ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
Creates a new Product resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
Gets information associated with a Product.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`
Makes changes to a Product resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
Permanently deletes a product and its reference images.
They also use :class:`airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook` to communicate with Google Cloud Platform.
Google Kubernetes Engine
''''''''''''''''''''''''

Просмотреть файл

@ -179,6 +179,7 @@ gcp_api = [
'google-cloud-container>=0.1.1',
'google-cloud-bigtable==0.31.0',
'google-cloud-spanner>=1.7.1',
'google-cloud-vision>=0.35.2',
'grpcio-gcp>=0.2.2',
'PyOpenSSL',
'pandas-gbq'

Просмотреть файл

@ -0,0 +1,548 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from google.cloud.vision_v1 import ProductSearchClient
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product
from google.protobuf.json_format import MessageToDict
from parameterized import parameterized
from airflow.contrib.hooks.gcp_vision_hook import CloudVisionHook
from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None
from airflow import AirflowException
# Identifiers used to build the expected ProductSearchClient resource paths in the tests below.
# The ``*_2`` variants are used for explicit resource names that are set directly on the
# ProductSet / Product objects passed to the hook.
PROJECT_ID_TEST = 'project-id'
PROJECT_ID_TEST_2 = 'project-id-2'
LOC_ID_TEST = 'loc-id'
LOC_ID_TEST_2 = 'loc-id-2'
PRODUCTSET_ID_TEST = 'ps-id'
PRODUCTSET_ID_TEST_2 = 'ps-id-2'
PRODUCT_ID_TEST = 'p-id'
PRODUCT_ID_TEST_2 = 'p-id-2'
class TestGcpVisionHook(unittest.TestCase):
    """
    Unit tests for the ProductSet and Product methods of ``CloudVisionHook``.

    ``CloudVisionHook.get_conn`` is patched in every test, so all assertions are made against the
    mocked client object returned by it.
    """

    def setUp(self):
        # Replace the hook constructor with the shared test substitute while the hook is created,
        # so the real ``__init__`` is not executed.
        with mock.patch(
            'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.__init__',
            new=mock_base_gcp_hook_default_project_id,
        ):
            self.vision_hook_default_project_id = CloudVisionHook(gcp_conn_id='test')

    # ------------------------------------------------------------------------------------------
    # ProductSet
    # ------------------------------------------------------------------------------------------

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_productset_explicit_id(self, get_conn):
        # Given
        create_product_set_method = get_conn.return_value.create_product_set
        create_product_set_method.return_value = None
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product_set = ProductSet()
        # When
        result = hook.create_product_set(
            location=LOC_ID_TEST,
            product_set_id=PRODUCTSET_ID_TEST,
            product_set=product_set,
            project_id=PROJECT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )
        # Then
        # ProductSet ID was provided explicitly in the method call above, should be returned from the method
        self.assertEqual(result, PRODUCTSET_ID_TEST)
        create_product_set_method.assert_called_once_with(
            parent=parent,
            product_set=product_set,
            product_set_id=PRODUCTSET_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_productset_autogenerated_id(self, get_conn):
        # Given
        autogenerated_id = 'autogen-id'
        response_product_set = ProductSet(
            name=ProductSearchClient.product_set_path(PROJECT_ID_TEST, LOC_ID_TEST, autogenerated_id)
        )
        create_product_set_method = get_conn.return_value.create_product_set
        create_product_set_method.return_value = response_product_set
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product_set = ProductSet()
        # When
        result = hook.create_product_set(
            location=LOC_ID_TEST, product_set_id=None, product_set=product_set, project_id=PROJECT_ID_TEST
        )
        # Then
        # ProductSet ID was not provided in the method call above. Should be extracted from the API response
        # and returned.
        self.assertEqual(result, autogenerated_id)
        create_product_set_method.assert_called_once_with(
            parent=parent,
            product_set=product_set,
            product_set_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_productset_autogenerated_id_wrong_api_response(self, get_conn):
        # Given
        response_product_set = None
        create_product_set_method = get_conn.return_value.create_product_set
        create_product_set_method.return_value = response_product_set
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product_set = ProductSet()
        # When
        with self.assertRaises(AirflowException) as cm:
            hook.create_product_set(
                location=LOC_ID_TEST,
                product_set_id=None,
                product_set=product_set,
                project_id=PROJECT_ID_TEST,
                retry=None,
                timeout=None,
                metadata=None,
            )
        # Then
        # API response was wrong (None) and thus ProductSet ID extraction should fail.
        err = cm.exception
        self.assertIn('Unable to get name from response...', str(err))
        create_product_set_method.assert_called_once_with(
            parent=parent,
            product_set=product_set,
            product_set_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_get_productset(self, get_conn):
        # Given
        name = ProductSearchClient.product_set_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST)
        response_product_set = ProductSet(name=name)
        get_product_set_method = get_conn.return_value.get_product_set
        get_product_set_method.return_value = response_product_set
        hook = self.vision_hook_default_project_id
        # When
        response = hook.get_product_set(
            location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST
        )
        # Then
        # The hook is expected to return the API response converted to a dict.
        self.assertTrue(response)
        self.assertEqual(response, MessageToDict(response_product_set))
        get_product_set_method.assert_called_once_with(name=name, retry=None, timeout=None, metadata=None)

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_productset_no_explicit_name(self, get_conn):
        # Given
        product_set = ProductSet()
        update_product_set_method = get_conn.return_value.update_product_set
        update_product_set_method.return_value = product_set
        hook = self.vision_hook_default_project_id
        productset_name = ProductSearchClient.product_set_path(
            PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST
        )
        # When
        result = hook.update_product_set(
            location=LOC_ID_TEST,
            product_set_id=PRODUCTSET_ID_TEST,
            product_set=product_set,
            update_mask=None,
            project_id=PROJECT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )
        # Then
        # The ProductSet had no name, so the client must receive the name constructed from
        # project_id, location and product_set_id.
        self.assertEqual(result, MessageToDict(product_set))
        update_product_set_method.assert_called_once_with(
            product_set=ProductSet(name=productset_name),
            metadata=None,
            retry=None,
            timeout=None,
            update_mask=None,
        )

    @parameterized.expand([(None, None), (None, PRODUCTSET_ID_TEST), (LOC_ID_TEST, None)])
    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_productset_no_explicit_name_and_missing_params_for_constructed_name(
        self, location, product_set_id, get_conn
    ):
        # Given
        update_product_set_method = get_conn.return_value.update_product_set
        update_product_set_method.return_value = None
        hook = self.vision_hook_default_project_id
        product_set = ProductSet()
        # When
        # Neither an explicit name nor a complete (location, product_set_id) pair is available.
        with self.assertRaises(AirflowException) as cm:
            hook.update_product_set(
                location=location,
                product_set_id=product_set_id,
                product_set=product_set,
                update_mask=None,
                project_id=PROJECT_ID_TEST,
                retry=None,
                timeout=None,
                metadata=None,
            )
        # Then
        err = cm.exception
        self.assertTrue(err)
        self.assertIn(
            "Unable to determine the ProductSet name. Please either set the name directly in the "
            "ProductSet object or provide the `location` and `productset_id` parameters.",
            str(err),
        )
        update_product_set_method.assert_not_called()

    @parameterized.expand([(None, None), (None, PRODUCTSET_ID_TEST), (LOC_ID_TEST, None)])
    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_productset_explicit_name_missing_params_for_constructed_name(
        self, location, product_set_id, get_conn
    ):
        # Given
        explicit_ps_name = ProductSearchClient.product_set_path(
            PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCTSET_ID_TEST_2
        )
        product_set = ProductSet(name=explicit_ps_name)
        update_product_set_method = get_conn.return_value.update_product_set
        update_product_set_method.return_value = product_set
        hook = self.vision_hook_default_project_id
        # When
        result = hook.update_product_set(
            location=location,
            product_set_id=product_set_id,
            product_set=product_set,
            update_mask=None,
            project_id=PROJECT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )
        # Then
        # The explicit name is enough on its own; the incomplete location / product_set_id pair
        # must not prevent the update.
        self.assertEqual(result, MessageToDict(product_set))
        update_product_set_method.assert_called_once_with(
            product_set=ProductSet(name=explicit_ps_name),
            metadata=None,
            retry=None,
            timeout=None,
            update_mask=None,
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_productset_explicit_name_different_from_constructed(self, get_conn):
        # Given
        update_product_set_method = get_conn.return_value.update_product_set
        update_product_set_method.return_value = None
        hook = self.vision_hook_default_project_id
        explicit_ps_name = ProductSearchClient.product_set_path(
            PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCTSET_ID_TEST_2
        )
        product_set = ProductSet(name=explicit_ps_name)
        template_ps_name = ProductSearchClient.product_set_path(
            PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST
        )
        # When
        # Location and product_set_id are passed in addition to a ProductSet with an explicit name,
        # but both names differ (constructed != explicit).
        # Should throw AirflowException in this case.
        with self.assertRaises(AirflowException) as cm:
            hook.update_product_set(
                location=LOC_ID_TEST,
                product_set_id=PRODUCTSET_ID_TEST,
                product_set=product_set,
                update_mask=None,
                project_id=PROJECT_ID_TEST,
                retry=None,
                timeout=None,
                metadata=None,
            )
        # Then
        err = cm.exception
        self.assertTrue(err)
        self.assertIn(
            "The ProductSet name provided in the object ({}) is different than the name "
            "created from the input parameters ({}). Please either: 1) Remove the ProductSet "
            "name, 2) Remove the location and productset_id parameters, 3) Unify the "
            "ProductSet name and input parameters.".format(explicit_ps_name, template_ps_name),
            str(err),
        )
        update_product_set_method.assert_not_called()

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_delete_productset(self, get_conn):
        # Given
        delete_product_set_method = get_conn.return_value.delete_product_set
        delete_product_set_method.return_value = None
        name = ProductSearchClient.product_set_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST)
        hook = self.vision_hook_default_project_id
        # When
        response = hook.delete_product_set(
            location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST
        )
        # Then
        self.assertIsNone(response)
        delete_product_set_method.assert_called_once_with(name=name, retry=None, timeout=None, metadata=None)

    # ------------------------------------------------------------------------------------------
    # Product
    # ------------------------------------------------------------------------------------------

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_product_explicit_id(self, get_conn):
        # Given
        create_product_method = get_conn.return_value.create_product
        create_product_method.return_value = None
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product = Product()
        # When
        result = hook.create_product(
            location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, product=product, project_id=PROJECT_ID_TEST
        )
        # Then
        # Product ID was provided explicitly in the method call above, should be returned from the method
        self.assertEqual(result, PRODUCT_ID_TEST)
        create_product_method.assert_called_once_with(
            parent=parent,
            product=product,
            product_id=PRODUCT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_product_autogenerated_id(self, get_conn):
        # Given
        autogenerated_id = 'autogen-p-id'
        response_product = Product(
            name=ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, autogenerated_id)
        )
        create_product_method = get_conn.return_value.create_product
        create_product_method.return_value = response_product
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product = Product()
        # When
        result = hook.create_product(
            location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
        )
        # Then
        # Product ID was not provided in the method call above. Should be extracted from the API response
        # and returned.
        self.assertEqual(result, autogenerated_id)
        create_product_method.assert_called_once_with(
            parent=parent, product=product, product_id=None, retry=None, timeout=None, metadata=None
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_product_autogenerated_id_wrong_name_in_response(self, get_conn):
        # Given
        wrong_name = 'wrong_name_not_a_correct_path'
        response_product = Product(name=wrong_name)
        create_product_method = get_conn.return_value.create_product
        create_product_method.return_value = response_product
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product = Product()
        # When
        with self.assertRaises(AirflowException) as cm:
            hook.create_product(
                location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
            )
        # Then
        # API response was wrong (wrong name format) and thus Product ID extraction should fail.
        err = cm.exception
        self.assertIn('Unable to get id from name', str(err))
        create_product_method.assert_called_once_with(
            parent=parent, product=product, product_id=None, retry=None, timeout=None, metadata=None
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_create_product_autogenerated_id_wrong_api_response(self, get_conn):
        # Given
        response_product = None
        create_product_method = get_conn.return_value.create_product
        create_product_method.return_value = response_product
        parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
        hook = self.vision_hook_default_project_id
        product = Product()
        # When
        with self.assertRaises(AirflowException) as cm:
            hook.create_product(
                location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
            )
        # Then
        # API response was wrong (None) and thus Product ID extraction should fail.
        err = cm.exception
        self.assertIn('Unable to get name from response...', str(err))
        create_product_method.assert_called_once_with(
            parent=parent, product=product, product_id=None, retry=None, timeout=None, metadata=None
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_product_no_explicit_name(self, get_conn):
        # Given
        product = Product()
        update_product_method = get_conn.return_value.update_product
        update_product_method.return_value = product
        hook = self.vision_hook_default_project_id
        product_name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
        # When
        result = hook.update_product(
            location=LOC_ID_TEST,
            product_id=PRODUCT_ID_TEST,
            product=product,
            update_mask=None,
            project_id=PROJECT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )
        # Then
        # The Product had no name, so the client must receive the name constructed from
        # project_id, location and product_id.
        self.assertEqual(result, MessageToDict(product))
        update_product_method.assert_called_once_with(
            product=Product(name=product_name), metadata=None, retry=None, timeout=None, update_mask=None
        )

    @parameterized.expand([(None, None), (None, PRODUCT_ID_TEST), (LOC_ID_TEST, None)])
    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_product_no_explicit_name_and_missing_params_for_constructed_name(
        self, location, product_id, get_conn
    ):
        # Given
        update_product_method = get_conn.return_value.update_product
        update_product_method.return_value = None
        hook = self.vision_hook_default_project_id
        product = Product()
        # When
        # Neither an explicit name nor a complete (location, product_id) pair is available.
        with self.assertRaises(AirflowException) as cm:
            hook.update_product(
                location=location,
                product_id=product_id,
                product=product,
                update_mask=None,
                project_id=PROJECT_ID_TEST,
                retry=None,
                timeout=None,
                metadata=None,
            )
        # Then
        err = cm.exception
        self.assertTrue(err)
        self.assertIn(
            "Unable to determine the Product name. Please either set the name directly in the "
            "Product object or provide the `location` and `product_id` parameters.",
            str(err),
        )
        update_product_method.assert_not_called()

    @parameterized.expand([(None, None), (None, PRODUCT_ID_TEST), (LOC_ID_TEST, None)])
    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_product_explicit_name_missing_params_for_constructed_name(
        self, location, product_id, get_conn
    ):
        # Given
        explicit_p_name = ProductSearchClient.product_path(
            PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCT_ID_TEST_2
        )
        product = Product(name=explicit_p_name)
        update_product_method = get_conn.return_value.update_product
        update_product_method.return_value = product
        hook = self.vision_hook_default_project_id
        # When
        result = hook.update_product(
            location=location,
            product_id=product_id,
            product=product,
            update_mask=None,
            project_id=PROJECT_ID_TEST,
            retry=None,
            timeout=None,
            metadata=None,
        )
        # Then
        # The explicit name is enough on its own; the incomplete location / product_id pair
        # must not prevent the update.
        self.assertEqual(result, MessageToDict(product))
        update_product_method.assert_called_once_with(
            product=Product(name=explicit_p_name), metadata=None, retry=None, timeout=None, update_mask=None
        )

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_update_product_explicit_name_different_from_constructed(self, get_conn):
        # Given
        update_product_method = get_conn.return_value.update_product
        update_product_method.return_value = None
        hook = self.vision_hook_default_project_id
        explicit_p_name = ProductSearchClient.product_path(
            PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCT_ID_TEST_2
        )
        product = Product(name=explicit_p_name)
        template_p_name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
        # When
        # Location and product_id are passed in addition to a Product with an explicit name,
        # but both names differ (constructed != explicit).
        # Should throw AirflowException in this case.
        with self.assertRaises(AirflowException) as cm:
            hook.update_product(
                location=LOC_ID_TEST,
                product_id=PRODUCT_ID_TEST,
                product=product,
                update_mask=None,
                project_id=PROJECT_ID_TEST,
                retry=None,
                timeout=None,
                metadata=None,
            )
        # Then
        err = cm.exception
        self.assertTrue(err)
        self.assertIn(
            "The Product name provided in the object ({}) is different than the name created from the input "
            "parameters ({}). Please either: 1) Remove the Product name, 2) Remove the location and product_"
            "id parameters, 3) Unify the Product name and input parameters.".format(
                explicit_p_name, template_p_name
            ),
            str(err),
        )
        update_product_method.assert_not_called()

    @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
    def test_delete_product(self, get_conn):
        # Given
        delete_product_method = get_conn.return_value.delete_product
        delete_product_method.return_value = None
        name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
        hook = self.vision_hook_default_project_id
        # When
        response = hook.delete_product(
            location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, project_id=PROJECT_ID_TEST
        )
        # Then
        self.assertIsNone(response)
        delete_product_method.assert_called_once_with(name=name, retry=None, timeout=None, metadata=None)

Просмотреть файл

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from google.api_core.exceptions import AlreadyExists
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product
from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductSetCreateOperator,
CloudVisionProductSetGetOperator,
CloudVisionProductSetUpdateOperator,
CloudVisionProductSetDeleteOperator,
CloudVisionProductCreateOperator,
CloudVisionProductGetOperator,
CloudVisionProductUpdateOperator,
CloudVisionProductDeleteOperator,
)
try:
# noinspection PyProtectedMember
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None
# Fixtures shared by the operator tests below.
PRODUCTSET_TEST = ProductSet(display_name='Test Product Set')
PRODUCTSET_ID_TEST = 'my-productset'
PRODUCT_TEST = Product(display_name='My Product 1', product_category='toys')
PRODUCT_ID_TEST = 'my-product'
LOCATION_TEST = 'europe-west1'
# Connection id the tests expect the operators to pass to CloudVisionHook when none is given explicitly.
GCP_CONN_ID = 'google_cloud_default'
class CloudVisionProductSetCreateTest(unittest.TestCase):
    """Tests for ``CloudVisionProductSetCreateOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product_set and task_id are supplied; the remaining hook arguments are
        # expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.create_product_set.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product_set=PRODUCTSET_TEST,
            product_set_id=None,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductSetCreateOperator(
            task_id='id', location=LOCATION_TEST, product_set=PRODUCTSET_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.create_product_set.assert_called_once_with(**expected_hook_kwargs)

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn')
    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request')
    def test_already_exists(self, _handle_request, get_conn):
        # The hook request raises AlreadyExists. execute() is expected to catch it instead of
        # propagating it (idempotence) and to return the explicitly supplied ProductSet id.
        _handle_request.side_effect = AlreadyExists(message='')
        get_conn.return_value = {}

        operator = CloudVisionProductSetCreateOperator(
            task_id='id',
            project_id='mock-project-id',
            location=LOCATION_TEST,
            product_set_id=PRODUCTSET_ID_TEST,
            product_set=PRODUCTSET_TEST,
        )
        returned_id = operator.execute(None)

        self.assertEqual(PRODUCTSET_ID_TEST, returned_id)
class CloudVisionProductSetUpdateTest(unittest.TestCase):
    """Tests for ``CloudVisionProductSetUpdateOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product_set and task_id are supplied; the remaining hook arguments
        # (including update_mask) are expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.update_product_set.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product_set=PRODUCTSET_TEST,
            product_set_id=None,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
            update_mask=None,
        )

        operator = CloudVisionProductSetUpdateOperator(
            task_id='id', location=LOCATION_TEST, product_set=PRODUCTSET_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.update_product_set.assert_called_once_with(**expected_hook_kwargs)
class CloudVisionProductSetGetTest(unittest.TestCase):
    """Tests for ``CloudVisionProductSetGetOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product_set_id and task_id are supplied; the remaining hook arguments
        # are expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.get_product_set.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product_set_id=PRODUCTSET_ID_TEST,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductSetGetOperator(
            task_id='id', location=LOCATION_TEST, product_set_id=PRODUCTSET_ID_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.get_product_set.assert_called_once_with(**expected_hook_kwargs)
class CloudVisionProductSetDeleteTest(unittest.TestCase):
    """Tests for ``CloudVisionProductSetDeleteOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product_set_id and task_id are supplied; the remaining hook arguments
        # are expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.delete_product_set.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product_set_id=PRODUCTSET_ID_TEST,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductSetDeleteOperator(
            task_id='id', location=LOCATION_TEST, product_set_id=PRODUCTSET_ID_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.delete_product_set.assert_called_once_with(**expected_hook_kwargs)
class CloudVisionProductCreateTest(unittest.TestCase):
    """Tests for ``CloudVisionProductCreateOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product and task_id are supplied; the remaining hook arguments are
        # expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.create_product.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product=PRODUCT_TEST,
            product_id=None,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductCreateOperator(
            task_id='id', location=LOCATION_TEST, product=PRODUCT_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.create_product.assert_called_once_with(**expected_hook_kwargs)

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn')
    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request')
    def test_already_exists(self, _handle_request, get_conn):
        # The hook request raises AlreadyExists. execute() is expected to catch it instead of
        # propagating it (idempotence) and to return the explicitly supplied Product id.
        _handle_request.side_effect = AlreadyExists(message='')
        get_conn.return_value = {}

        operator = CloudVisionProductCreateOperator(
            task_id='id',
            project_id='mock-project-id',
            location=LOCATION_TEST,
            product_id=PRODUCT_ID_TEST,
            product=PRODUCT_TEST,
        )
        returned_id = operator.execute(None)

        self.assertEqual(PRODUCT_ID_TEST, returned_id)
class CloudVisionProductGetTest(unittest.TestCase):
    """Tests for ``CloudVisionProductGetOperator``."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only location, product_id and task_id are supplied; the remaining hook arguments are
        # expected to be forwarded as None.
        hook_instance = mock_hook.return_value
        hook_instance.get_product.return_value = {}
        expected_hook_kwargs = dict(
            location=LOCATION_TEST,
            product_id=PRODUCT_ID_TEST,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductGetOperator(
            task_id='id', location=LOCATION_TEST, product_id=PRODUCT_ID_TEST
        )
        operator.execute(context=None)

        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.get_product.assert_called_once_with(**expected_hook_kwargs)
class CloudVisionProductUpdateTest(unittest.TestCase):
    """Unit tests for ``CloudVisionProductUpdateOperator`` with ``CloudVisionHook`` mocked out."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only the required arguments are supplied; every optional argument
        # (including update_mask) is expected to reach the hook as None.
        hook_instance = mock_hook.return_value
        hook_instance.update_product.return_value = {}
        expected_call = dict(
            location=LOCATION_TEST,
            product=PRODUCT_TEST,
            product_id=None,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
            update_mask=None,
        )

        operator = CloudVisionProductUpdateOperator(
            task_id='id',
            location=LOCATION_TEST,
            product=PRODUCT_TEST,
        )
        operator.execute(context=None)

        # The hook class must be instantiated exactly once, with the expected connection id.
        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.update_product.assert_called_once_with(**expected_call)
class CloudVisionProductDeleteTest(unittest.TestCase):
    """Unit tests for ``CloudVisionProductDeleteOperator`` with ``CloudVisionHook`` mocked out."""

    @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
    def test_minimal_green_path(self, mock_hook):
        # Only the required arguments are supplied; every optional argument is
        # expected to reach the hook as None.
        hook_instance = mock_hook.return_value
        hook_instance.delete_product.return_value = {}
        expected_call = dict(
            location=LOCATION_TEST,
            product_id=PRODUCT_ID_TEST,
            project_id=None,
            retry=None,
            timeout=None,
            metadata=None,
        )

        operator = CloudVisionProductDeleteOperator(
            task_id='id',
            location=LOCATION_TEST,
            product_id=PRODUCT_ID_TEST,
        )
        operator.execute(context=None)

        # The hook class must be instantiated exactly once, with the expected connection id.
        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
        hook_instance.delete_product.assert_called_once_with(**expected_call)

Просмотреть файл

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.contrib.utils.base_gcp_system_test_case import DagGcpSystemTestCase, SKIP_TEST_WARNING
from tests.contrib.utils.gcp_authenticator import GCP_AI_KEY
@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudVisionExampleDagsSystemTest(DagGcpSystemTestCase):
    """System test that runs the ``example_gcp_vision`` example DAG.

    The whole class is skipped (with ``SKIP_TEST_WARNING`` as the reason) when
    ``DagGcpSystemTestCase.skip_check(GCP_AI_KEY)`` evaluates truthy.
    """

    def __init__(self, method_name='runTest'):
        # Bind this test case to the Vision example DAG and the AI key file.
        base_kwargs = dict(dag_id='example_gcp_vision', gcp_key=GCP_AI_KEY)
        super(CloudVisionExampleDagsSystemTest, self).__init__(method_name, **base_kwargs)

    def test_run_example_dag_function(self):
        # Delegates entirely to the base class helper.
        self._run_dag()

Просмотреть файл

@ -31,6 +31,7 @@ GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json'
# Key file names (``.json``), one constant per GCP service area.
# NOTE(review): presumably service account key files consumed by the GCP
# system tests - confirm against the code that resolves these names.
GCP_BIGTABLE_KEY = 'gcp_bigtable.json'
GCP_SPANNER_KEY = 'gcp_spanner.json'
GCP_GCS_KEY = 'gcp_gcs.json'
GCP_AI_KEY = 'gcp_ai.json'
# Field names carrying the ``extra__google_cloud_platform__`` prefix.
# NOTE(review): presumably keys in an Airflow connection's "extra" data for
# the key file path and the inline key file dict - confirm against usage.
KEYPATH_EXTRA = 'extra__google_cloud_platform__key_path'
KEYFILE_DICT_EXTRA = 'extra__google_cloud_platform__keyfile_dict'