From 707d6f2a50217abd234c20f5c6e62d5766c8e61e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= Date: Thu, 21 Mar 2019 11:49:57 +0100 Subject: [PATCH] [AIRFLOW-3908] Add more Google Cloud Vision operators (#4791) --- .../example_dags/example_gcp_vision.py | 208 ++++- airflow/contrib/hooks/gcp_api_base_hook.py | 23 + airflow/contrib/hooks/gcp_vision_hook.py | 229 +++-- .../contrib/operators/gcp_vision_operator.py | 354 +++++++- airflow/utils/decorators.py | 16 + docs/code.rst | 4 + docs/howto/operator/gcp/vision.rst | 795 ++++++++++++------ docs/integration.rst | 24 +- tests/contrib/hooks/test_gcp_api_base_hook.py | 66 +- tests/contrib/hooks/test_gcp_vision_hook.py | 184 +++- .../operators/test_gcp_vision_operator.py | 127 ++- .../test_gcp_vision_operator_system.py | 33 +- .../test_gcp_vision_operator_system_helper.py | 90 ++ .../utils/base_gcp_system_test_case.py | 22 +- tests/utils/test_decorators.py | 29 +- 15 files changed, 1771 insertions(+), 433 deletions(-) create mode 100755 tests/contrib/operators/test_gcp_vision_operator_system_helper.py diff --git a/airflow/contrib/example_dags/example_gcp_vision.py b/airflow/contrib/example_dags/example_gcp_vision.py index 9ca96012eb..2a1facffce 100644 --- a/airflow/contrib/example_dags/example_gcp_vision.py +++ b/airflow/contrib/example_dags/example_gcp_vision.py @@ -23,22 +23,36 @@ Vision service in the Google Cloud Platform. This DAG relies on the following OS environment variables -* GCP_VISION_LOCATION - Google Cloud Platform zone where the instance exists. +* GCP_VISION_LOCATION - Zone where the instance exists. +* GCP_VISION_PRODUCT_SET_ID - Product Set ID. +* GCP_VISION_PRODUCT_ID - Product ID. +* GCP_VISION_REFERENCE_IMAGE_ID - Reference Image ID. +* GCP_VISION_REFERENCE_IMAGE_URL - A link to the bucket that contains the reference image. +* GCP_VISION_ANNOTATE_IMAGE_URL - A link to the bucket that contains the file to be annotated. 
+ """ import os # [START howto_operator_vision_retry_import] from google.api_core.retry import Retry + # [END howto_operator_vision_retry_import] -# [START howto_operator_vision_productset_import] -from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet -# [END howto_operator_vision_productset_import] +# [START howto_operator_vision_product_set_import] +from google.cloud.vision_v1.types import ProductSet +# [END howto_operator_vision_product_set_import] # [START howto_operator_vision_product_import] -from google.cloud.vision_v1.proto.product_search_service_pb2 import Product +from google.cloud.vision_v1.types import Product # [END howto_operator_vision_product_import] +# [START howto_operator_vision_reference_image_import] +from google.cloud.vision_v1.types import ReferenceImage +# [END howto_operator_vision_reference_image_import] +# [START howto_operator_vision_enums_import] +from google.cloud.vision import enums +# [END howto_operator_vision_enums_import] import airflow from airflow import models +from airflow.operators.bash_operator import BashOperator from airflow.contrib.operators.gcp_vision_operator import ( CloudVisionProductSetCreateOperator, @@ -49,6 +63,10 @@ from airflow.contrib.operators.gcp_vision_operator import ( CloudVisionProductGetOperator, CloudVisionProductUpdateOperator, CloudVisionProductDeleteOperator, + CloudVisionReferenceImageCreateOperator, + CloudVisionAddProductToProductSetOperator, + CloudVisionRemoveProductFromProductSetOperator, + CloudVisionAnnotateImageOperator, ) default_args = {'start_date': airflow.utils.dates.days_ago(1)} @@ -57,25 +75,45 @@ default_args = {'start_date': airflow.utils.dates.days_ago(1)} GCP_VISION_LOCATION = os.environ.get('GCP_VISION_LOCATION', 'europe-west1') # [END howto_operator_vision_args_common] -# [START howto_operator_vision_productset] -product_set = ProductSet(display_name='My Product Set 1') -# [END howto_operator_vision_productset] - -# [START howto_operator_vision_product] 
-product = Product(display_name='My Product 1', product_category='toys') -# [END howto_operator_vision_product] - -# [START howto_operator_vision_productset_explicit_id] +# [START howto_operator_vision_product_set_explicit_id] GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id') -# [END howto_operator_vision_productset_explicit_id] +# [END howto_operator_vision_product_set_explicit_id] # [START howto_operator_vision_product_explicit_id] GCP_VISION_PRODUCT_ID = os.environ.get('GCP_VISION_PRODUCT_ID', 'product_explicit_id') # [END howto_operator_vision_product_explicit_id] +# [START howto_operator_vision_reference_image_args] +GCP_VISION_REFERENCE_IMAGE_ID = os.environ.get('GCP_VISION_REFERENCE_IMAGE_ID', 'reference_image_explicit_id') +GCP_VISION_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket/image1.jpg') +# [END howto_operator_vision_reference_image_args] + +# [START howto_operator_vision_annotate_image_url] +GCP_VISION_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket/image2.jpg') +# [END howto_operator_vision_annotate_image_url] + +# [START howto_operator_vision_product_set] +product_set = ProductSet(display_name='My Product Set') +# [END howto_operator_vision_product_set] + +# [START howto_operator_vision_product] +product = Product(display_name='My Product 1', product_category='toys') +# [END howto_operator_vision_product] + +# [START howto_operator_vision_reference_image] +reference_image = ReferenceImage(uri=GCP_VISION_REFERENCE_IMAGE_URL) +# [END howto_operator_vision_reference_image] + +# [START howto_operator_vision_annotate_image_request] +annotate_image_request = { + 'image': {'source': {'image_uri': GCP_VISION_ANNOTATE_IMAGE_URL}}, + 'features': [{'type': enums.Feature.Type.LOGO_DETECTION}], +} +# [END howto_operator_vision_annotate_image_request] + with models.DAG( - 'example_gcp_vision', default_args=default_args, schedule_interval=None # 
Override to match your needs -) as dag: + 'example_gcp_vision_autogenerated_id', default_args=default_args, schedule_interval=None +) as dag_autogenerated_id: # ################################## # # ### Autogenerated IDs examples ### # # ################################## # @@ -150,9 +188,59 @@ with models.DAG( ) # [END howto_operator_vision_product_delete] - product_set_create >> product_set_get >> product_set_update >> product_set_delete + # [START howto_operator_vision_reference_image_create] + reference_image_create = CloudVisionReferenceImageCreateOperator( + location=GCP_VISION_LOCATION, + reference_image=reference_image, + product_id="{{ task_instance.xcom_pull('product_create') }}", + reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID, + retry=Retry(maximum=10.0), + timeout=5, + task_id='reference_image_create', + ) + # [END howto_operator_vision_reference_image_create] + + # [START howto_operator_vision_add_product_to_product_set] + add_product_to_product_set = CloudVisionAddProductToProductSetOperator( + location=GCP_VISION_LOCATION, + product_set_id="{{ task_instance.xcom_pull('product_set_create') }}", + product_id="{{ task_instance.xcom_pull('product_create') }}", + retry=Retry(maximum=10.0), + timeout=5, + task_id='add_product_to_product_set', + ) + # [END howto_operator_vision_add_product_to_product_set] + + # [START howto_operator_vision_remove_product_from_product_set] + remove_product_from_product_set = CloudVisionRemoveProductFromProductSetOperator( + location=GCP_VISION_LOCATION, + product_set_id="{{ task_instance.xcom_pull('product_set_create') }}", + product_id="{{ task_instance.xcom_pull('product_create') }}", + retry=Retry(maximum=10.0), + timeout=5, + task_id='remove_product_from_product_set', + ) + # [END howto_operator_vision_remove_product_from_product_set] + + # Product path product_create >> product_get >> product_update >> product_delete + # ProductSet path + product_set_create >> product_set_get >> product_set_update >> 
product_set_delete + + # ReferenceImage path + product_create >> reference_image_create >> product_delete + + # Product/ProductSet path + product_create >> add_product_to_product_set + product_set_create >> add_product_to_product_set + add_product_to_product_set >> remove_product_from_product_set + remove_product_from_product_set >> product_delete + remove_product_from_product_set >> product_set_delete + +with models.DAG( + 'example_gcp_vision_explicit_id', default_args=default_args, schedule_interval=None +) as dag_explicit_id: # ############################# # # ### Explicit IDs examples ### # # ############################# # @@ -241,5 +329,87 @@ with models.DAG( ) # [END howto_operator_vision_product_delete_2] + # [START howto_operator_vision_reference_image_create_2] + reference_image_create_2 = CloudVisionReferenceImageCreateOperator( + location=GCP_VISION_LOCATION, + reference_image=reference_image, + product_id=GCP_VISION_PRODUCT_ID, + reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID, + retry=Retry(maximum=10.0), + timeout=5, + task_id='reference_image_create_2', + ) + # [END howto_operator_vision_reference_image_create_2] + + # Second 'create' task with the same product_id to demonstrate idempotence + reference_image_create_2_idempotence = CloudVisionReferenceImageCreateOperator( + location=GCP_VISION_LOCATION, + reference_image=reference_image, + product_id=GCP_VISION_PRODUCT_ID, + reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID, + retry=Retry(maximum=10.0), + timeout=5, + task_id='reference_image_create_2_idempotence', + ) + + # [START howto_operator_vision_add_product_to_product_set_2] + add_product_to_product_set_2 = CloudVisionAddProductToProductSetOperator( + location=GCP_VISION_LOCATION, + product_set_id=GCP_VISION_PRODUCT_SET_ID, + product_id=GCP_VISION_PRODUCT_ID, + retry=Retry(maximum=10.0), + timeout=5, + task_id='add_product_to_product_set_2', + ) + # [END howto_operator_vision_add_product_to_product_set_2] + + # [START 
howto_operator_vision_remove_product_from_product_set_2] + remove_product_from_product_set_2 = CloudVisionRemoveProductFromProductSetOperator( + location=GCP_VISION_LOCATION, + product_set_id=GCP_VISION_PRODUCT_SET_ID, + product_id=GCP_VISION_PRODUCT_ID, + retry=Retry(maximum=10.0), + timeout=5, + task_id='remove_product_from_product_set_2', + ) + # [END howto_operator_vision_remove_product_from_product_set_2] + + # Product path + product_create_2 >> product_create_2_idempotence >> product_get_2 >> product_update_2 >> product_delete_2 + + # ProductSet path product_set_create_2 >> product_set_get_2 >> product_set_update_2 >> product_set_delete_2 - product_create_2 >> product_get_2 >> product_update_2 >> product_delete_2 + product_set_create_2 >> product_set_create_2_idempotence >> product_set_delete_2 + + # ReferenceImage path + product_create_2 >> reference_image_create_2 >> reference_image_create_2_idempotence >> product_delete_2 + + # Product/ProductSet path + add_product_to_product_set_2 >> remove_product_from_product_set_2 + product_set_create_2 >> add_product_to_product_set_2 + product_create_2 >> add_product_to_product_set_2 + remove_product_from_product_set_2 >> product_set_delete_2 + remove_product_from_product_set_2 >> product_delete_2 + +with models.DAG( + 'example_gcp_vision_annotate_image', default_args=default_args, schedule_interval=None +) as dag_annotate_image: + # ############################## # + # ### Annotate image example ### # + # ############################## # + + # [START howto_operator_vision_annotate_image] + annotate_image = CloudVisionAnnotateImageOperator( + request=annotate_image_request, retry=Retry(maximum=10.0), timeout=5, task_id='annotate_image' + ) + # [END howto_operator_vision_annotate_image] + + # [START howto_operator_vision_annotate_image_result] + annotate_image_result = BashOperator( + bash_command="echo {{ task_instance.xcom_pull('annotate_image')" + "['logoAnnotations'][0]['description'] }}", + 
task_id='annotate_image_result', + ) + # [END howto_operator_vision_annotate_image_result] + + annotate_image >> annotate_image_result diff --git a/airflow/contrib/hooks/gcp_api_base_hook.py b/airflow/contrib/hooks/gcp_api_base_hook.py index 853bf40108..a7d9a8e314 100644 --- a/airflow/contrib/hooks/gcp_api_base_hook.py +++ b/airflow/contrib/hooks/gcp_api_base_hook.py @@ -27,6 +27,8 @@ import google.oauth2.service_account import os import tempfile +from google.api_core.exceptions import GoogleAPICallError, AlreadyExists, RetryError + from airflow.exceptions import AirflowException from airflow.hooks.base_hook import BaseHook @@ -159,6 +161,27 @@ class GoogleCloudBaseHook(BaseHook): def project_id(self): return self._get_field('project') + @staticmethod + def catch_http_exception(func): + @functools.wraps(func) + def wrapper_decorator(self, *args, **kwargs): + try: + return func(self, *args, **kwargs) + except GoogleAPICallError as e: + if isinstance(e, AlreadyExists): + raise e + else: + self.log.error('The request failed:\n%s', str(e)) + raise AirflowException(e) + except RetryError as e: + self.log.error('The request failed due to a retryable error and retry attempts failed.') + raise AirflowException(e) + except ValueError as e: + self.log.error('The request failed, the parameters are invalid.') + raise AirflowException(e) + + return wrapper_decorator + @staticmethod def fallback_to_default_project_id(func): """ diff --git a/airflow/contrib/hooks/gcp_vision_hook.py b/airflow/contrib/hooks/gcp_vision_hook.py index 573c20206a..ede44bf5e5 100644 --- a/airflow/contrib/hooks/gcp_vision_hook.py +++ b/airflow/contrib/hooks/gcp_vision_hook.py @@ -18,12 +18,12 @@ # under the License. 
from copy import deepcopy -from google.api_core.exceptions import AlreadyExists, GoogleAPICallError, RetryError -from google.cloud.vision_v1 import ProductSearchClient +from google.cloud.vision_v1 import ProductSearchClient, ImageAnnotatorClient from google.protobuf.json_format import MessageToDict from airflow import AirflowException from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook +from airflow.utils.decorators import cached_property class NameDeterminer: @@ -114,6 +114,11 @@ class CloudVisionHook(GoogleCloudBaseHook): self._client = ProductSearchClient(credentials=self._get_credentials()) return self._client + @cached_property + def annotator_client(self): + return ImageAnnotatorClient(credentials=self._get_credentials()) + + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def create_product_set( self, @@ -132,8 +137,7 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() parent = ProductSearchClient.location_path(project_id, location) self.log.info('Creating a new ProductSet under the parent: %s', parent) - response = self._handle_request( - lambda **kwargs: client.create_product_set(**kwargs), + response = client.create_product_set( parent=parent, product_set=product_set, product_set_id=product_set_id, @@ -151,6 +155,7 @@ class CloudVisionHook(GoogleCloudBaseHook): return product_set_id + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def get_product_set( self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None @@ -162,17 +167,12 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() name = ProductSearchClient.product_set_path(project_id, location, product_set_id) self.log.info('Retrieving ProductSet: %s', name) - response = self._handle_request( - lambda **kwargs: client.get_product_set(**kwargs), - name=name, - retry=retry, - timeout=timeout, - metadata=metadata, - ) + response 
= client.get_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata) self.log.info('ProductSet retrieved.') self.log.debug('ProductSet retrieved:\n%s', response) return MessageToDict(response) + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def update_product_set( self, @@ -194,18 +194,14 @@ class CloudVisionHook(GoogleCloudBaseHook): product_set, product_set_id, location, project_id ) self.log.info('Updating ProductSet: %s', product_set.name) - response = self._handle_request( - lambda **kwargs: client.update_product_set(**kwargs), - product_set=product_set, - update_mask=update_mask, - retry=retry, - timeout=timeout, - metadata=metadata, + response = client.update_product_set( + product_set=product_set, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata ) self.log.info('ProductSet updated: %s', response.name if response else '') self.log.debug('ProductSet updated:\n%s', response) return MessageToDict(response) + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def delete_product_set( self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None @@ -217,16 +213,10 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() name = ProductSearchClient.product_set_path(project_id, location, product_set_id) self.log.info('Deleting ProductSet: %s', name) - response = self._handle_request( - lambda **kwargs: client.delete_product_set(**kwargs), - name=name, - retry=retry, - timeout=timeout, - metadata=metadata, - ) + client.delete_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata) self.log.info('ProductSet with the name [%s] deleted.', name) - return response + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def create_product( self, location, product, project_id=None, product_id=None, retry=None, timeout=None, metadata=None @@ -238,8 
+228,7 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() parent = ProductSearchClient.location_path(project_id, location) self.log.info('Creating a new Product under the parent: %s', parent) - response = self._handle_request( - lambda **kwargs: client.create_product(**kwargs), + response = client.create_product( parent=parent, product=product, product_id=product_id, @@ -257,6 +246,7 @@ class CloudVisionHook(GoogleCloudBaseHook): return product_id + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def get_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None): """ @@ -266,17 +256,12 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() name = ProductSearchClient.product_path(project_id, location, product_id) self.log.info('Retrieving Product: %s', name) - response = self._handle_request( - lambda **kwargs: client.get_product(**kwargs), - name=name, - retry=retry, - timeout=timeout, - metadata=metadata, - ) + response = client.get_product(name=name, retry=retry, timeout=timeout, metadata=metadata) self.log.info('Product retrieved.') self.log.debug('Product retrieved:\n%s', response) return MessageToDict(response) + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def update_product( self, @@ -296,18 +281,14 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() product = self.product_name_determiner.get_entity_with_name(product, product_id, location, project_id) self.log.info('Updating ProductSet: %s', product.name) - response = self._handle_request( - lambda **kwargs: client.update_product(**kwargs), - product=product, - update_mask=update_mask, - retry=retry, - timeout=timeout, - metadata=metadata, + response = client.update_product( + product=product, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata ) self.log.info('Product updated: %s', response.name if 
response else '') self.log.debug('Product updated:\n%s', response) return MessageToDict(response) + @GoogleCloudBaseHook.catch_http_exception @GoogleCloudBaseHook.fallback_to_default_project_id def delete_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None): """ @@ -317,38 +298,150 @@ class CloudVisionHook(GoogleCloudBaseHook): client = self.get_conn() name = ProductSearchClient.product_path(project_id, location, product_id) self.log.info('Deleting ProductSet: %s', name) - response = self._handle_request( - lambda **kwargs: client.delete_product(**kwargs), - name=name, + client.delete_product(name=name, retry=retry, timeout=timeout, metadata=metadata) + self.log.info('Product with the name [%s] deleted:', name) + + @GoogleCloudBaseHook.catch_http_exception + @GoogleCloudBaseHook.fallback_to_default_project_id + def create_reference_image( + self, + location, + product_id, + reference_image, + reference_image_id=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ): + """ + For the documentation see: + :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator` + """ + client = self.get_conn() + self.log.info('Creating ReferenceImage') + parent = ProductSearchClient.product_path(project=project_id, location=location, product=product_id) + + response = client.create_reference_image( + parent=parent, + reference_image=reference_image, + reference_image_id=reference_image_id, retry=retry, timeout=timeout, metadata=metadata, ) - self.log.info('Product with the name [%s] deleted:', name) - return response - def _handle_request(self, fun, **kwargs): - try: - return fun(**kwargs) - except GoogleAPICallError as e: - if isinstance(e, AlreadyExists): - raise e - else: - self.log.error('The request failed:\n%s', str(e)) - raise AirflowException(e) - except RetryError as e: - self.log.error('The request failed due to a retryable error and retry attempts failed.') - raise 
AirflowException(e) - except ValueError as e: - self.log.error('The request failed, the parameters are invalid.') - raise AirflowException(e) + self.log.info('ReferenceImage created: %s', response.name if response else '') + self.log.debug('ReferenceImage created:\n%s', response) - @staticmethod - def _get_entity_name(is_product, project_id, location, entity_id): - if is_product: - return ProductSearchClient.product_path(project_id, location, entity_id) - else: - return ProductSearchClient.product_set_path(project_id, location, entity_id) + if not reference_image_id: + # Reference image id was generated by the API + reference_image_id = self._get_autogenerated_id(response) + self.log.info( + 'Extracted autogenerated ReferenceImage ID from the response: %s', reference_image_id + ) + + return reference_image_id + + @GoogleCloudBaseHook.catch_http_exception + @GoogleCloudBaseHook.fallback_to_default_project_id + def delete_reference_image( + self, + location, + product_id, + reference_image_id, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ): + """ + For the documentation see: + :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator` + """ + client = self.get_conn() + self.log.info('Deleting ReferenceImage') + name = ProductSearchClient.reference_image_path( + project=project_id, location=location, product=product_id, reference_image=reference_image_id + ) + response = client.delete_reference_image(name=name, retry=retry, timeout=timeout, metadata=metadata) + self.log.info('ReferenceImage with the name [%s] deleted.', name) + + return MessageToDict(response) + + @GoogleCloudBaseHook.catch_http_exception + @GoogleCloudBaseHook.fallback_to_default_project_id + def add_product_to_product_set( + self, + product_set_id, + product_id, + location=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ): + """ + For the documentation see: +
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator` + """ + client = self.get_conn() + + product_name = ProductSearchClient.product_path(project_id, location, product_id) + product_set_name = ProductSearchClient.product_set_path(project_id, location, product_set_id) + + self.log.info('Add Product[name=%s] to Product Set[name=%s]', product_name, product_set_name) + + client.add_product_to_product_set( + name=product_set_name, product=product_name, retry=retry, timeout=timeout, metadata=metadata + ) + + self.log.info('Product added to Product Set') + + @GoogleCloudBaseHook.catch_http_exception + @GoogleCloudBaseHook.fallback_to_default_project_id + def remove_product_from_product_set( + self, + product_set_id, + product_id, + location=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ): + """ + For the documentation see: + :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator` + """ + client = self.get_conn() + + product_name = ProductSearchClient.product_path(project_id, location, product_id) + product_set_name = ProductSearchClient.product_set_path(project_id, location, product_set_id) + + self.log.info('Remove Product[name=%s] from Product Set[name=%s]', product_name, product_set_name) + + client.remove_product_from_product_set( + name=product_set_name, product=product_name, retry=retry, timeout=timeout, metadata=metadata + ) + + self.log.info('Product removed from Product Set') + + @GoogleCloudBaseHook.catch_http_exception + def annotate_image(self, request, retry=None, timeout=None): + """ + For the documentation see: + :py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator` + """ + client = self.annotator_client + + self.log.info('Annotating image') + + response = client.annotate_image(request=request, retry=retry, timeout=timeout) + + self.log.info('Image annotated') + + return
MessageToDict(response) @staticmethod def _get_autogenerated_id(response): diff --git a/airflow/contrib/operators/gcp_vision_operator.py b/airflow/contrib/operators/gcp_vision_operator.py index 96ed11dc2b..ba1c1b5058 100644 --- a/airflow/contrib/operators/gcp_vision_operator.py +++ b/airflow/contrib/operators/gcp_vision_operator.py @@ -53,13 +53,13 @@ class CloudVisionProductSetCreateOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ # [START vision_productset_create_template_fields] - template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id') + template_fields = ("location", "project_id", "product_set_id", "gcp_conn_id") # [END vision_productset_create_template_fields] @apply_defaults @@ -72,7 +72,7 @@ class CloudVisionProductSetCreateOperator(BaseOperator): retry=None, timeout=None, metadata=None, - gcp_conn_id='google_cloud_default', + gcp_conn_id="google_cloud_default", *args, **kwargs ): @@ -100,7 +100,7 @@ class CloudVisionProductSetCreateOperator(BaseOperator): ) except AlreadyExists: self.log.info( - 'Product set with id %s already exists. Exiting from the create operation.', + "Product set with id %s already exists. Exiting from the create operation.", self.product_set_id, ) return self.product_set_id @@ -130,8 +130,8 @@ class CloudVisionProductSetGetOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. 
+ :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -216,8 +216,8 @@ class CloudVisionProductSetUpdateOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -292,8 +292,8 @@ class CloudVisionProductSetDeleteOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -326,7 +326,7 @@ class CloudVisionProductSetDeleteOperator(BaseOperator): self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) def execute(self, context): - return self._hook.delete_product_set( + self._hook.delete_product_set( location=self.location, product_set_id=self.product_set_id, project_id=self.project_id, @@ -342,9 +342,9 @@ class CloudVisionProductCreateOperator(BaseOperator): Possible errors regarding the `Product` object provided: - - Returns INVALID_ARGUMENT if `display_name` is missing or longer than 4096 characters. - - Returns INVALID_ARGUMENT if `description` is longer than 4096 characters. - - Returns INVALID_ARGUMENT if `product_category` is missing or invalid. + - Returns `INVALID_ARGUMENT` if `display_name` is missing or longer than 4096 characters. + - Returns `INVALID_ARGUMENT` if `description` is longer than 4096 characters. 
+ - Returns `INVALID_ARGUMENT` if `product_category` is missing or invalid. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -372,8 +372,8 @@ class CloudVisionProductCreateOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -431,7 +431,7 @@ class CloudVisionProductGetOperator(BaseOperator): Possible errors: - - Returns NOT_FOUND if the `Product` does not exist. + - Returns `NOT_FOUND` if the `Product` does not exist. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -453,8 +453,8 @@ class CloudVisionProductGetOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -472,7 +472,7 @@ class CloudVisionProductGetOperator(BaseOperator): retry=None, timeout=None, metadata=None, - gcp_conn_id='google_cloud_default', + gcp_conn_id="google_cloud_default", *args, **kwargs ): @@ -516,11 +516,12 @@ class CloudVisionProductUpdateOperator(BaseOperator): Possible errors related to the provided `Product`: - - Returns NOT_FOUND if the Product does not exist. - - Returns INVALID_ARGUMENT if display_name is present in update_mask but is missing from the request or - longer than 4096 characters. 
- - Returns INVALID_ARGUMENT if description is present in update_mask but is longer than 4096 characters. - - Returns INVALID_ARGUMENT if product_category is present in update_mask. + - Returns `NOT_FOUND` if the Product does not exist. + - Returns `INVALID_ARGUMENT` if `display_name` is present in update_mask but is missing from the request + or longer than 4096 characters. + - Returns `INVALID_ARGUMENT` if `description` is present in update_mask but is longer than 4096 + characters. + - Returns `INVALID_ARGUMENT` if `product_category` is present in update_mask. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -550,8 +551,8 @@ class CloudVisionProductUpdateOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. :type gcp_conn_id: str """ @@ -608,7 +609,7 @@ class CloudVisionProductDeleteOperator(BaseOperator): Possible errors: - - Returns NOT_FOUND if the product does not exist. + - Returns `NOT_FOUND` if the product does not exist. .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -630,8 +631,8 @@ class CloudVisionProductDeleteOperator(BaseOperator): attempt. :type timeout: float :param metadata: (Optional) Additional metadata that is provided to the method. - :type metadata: Sequence[Tuple[str, str]] - :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. 
:type gcp_conn_id: str """ @@ -663,7 +664,7 @@ class CloudVisionProductDeleteOperator(BaseOperator): self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) def execute(self, context): - return self._hook.delete_product( + self._hook.delete_product( location=self.location, product_id=self.product_id, project_id=self.project_id, @@ -671,3 +672,292 @@ class CloudVisionProductDeleteOperator(BaseOperator): timeout=self.timeout, metadata=self.metadata, ) + + +class CloudVisionAnnotateImageOperator(BaseOperator): + """ + Run image detection and annotation for an image. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVisionAnnotateImageOperator` + + :param request: (Required) Individual file annotation requests. + If a dict is provided, it must be of the same form as the protobuf + message class:`google.cloud.vision_v1.types.AnnotateImageRequest` + :type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest + :param retry: (Optional) A retry object used to retry requests. If `None` is + specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: (Optional) The amount of time, in seconds, to wait for the request to + complete. Note that if retry is specified, the timeout applies to each individual + attempt. + :type timeout: float + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. 
+ :type gcp_conn_id: str + """ + + # [START vision_annotate_image_template_fields] + template_fields = ('request', 'gcp_conn_id') + # [END vision_annotate_image_template_fields] + + @apply_defaults + def __init__( + self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs + ): + super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs) + self.request = request + self.retry = retry + self.timeout = timeout + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) + return hook.annotate_image(request=self.request, retry=self.retry, timeout=self.timeout) + + +class CloudVisionReferenceImageCreateOperator(BaseOperator): + """ + Creates and returns a new ReferenceImage ID resource. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVisionReferenceImageCreateOperator` + + :param location: (Required) The region where the Product is located. Valid regions (as of 2019-02-05) are: + us-east1, us-west1, europe-west1, asia-east1 + :type location: str + :param reference_image: (Required) The reference image to create. If an image ID is specified, it is + ignored. + If a dict is provided, it must be of the same form as the protobuf message + :class:`google.cloud.vision_v1.types.ReferenceImage` + :type reference_image: dict or google.cloud.vision_v1.types.ReferenceImage + :param reference_image_id: (Optional) A user-supplied resource id for the ReferenceImage to be added. + If set, the server will attempt to use this value as the resource id. If it is already in use, an + error is returned with code ALREADY_EXISTS. Must be at most 128 characters long. It cannot contain + the character `/`. + :type reference_image_id: str + :param product_id: (Optional) The resource id of this Product. + :type product_id: str + :param project_id: (Optional) The project in which the Product is located. 
If set to None or + missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: (Optional) A retry object used to retry requests. If `None` is + specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: (Optional) The amount of time, in seconds, to wait for the request to + complete. Note that if retry is specified, the timeout applies to each individual + attempt. + :type timeout: float + :param metadata: (Optional) Additional metadata that is provided to the method. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + """ + + # [START vision_reference_image_create_template_fields] + template_fields = ( + "location", + "reference_image", + "product_id", + "reference_image_id", + "project_id", + "gcp_conn_id", + ) + # [END vision_reference_image_create_template_fields] + + @apply_defaults + def __init__( + self, + location, + reference_image, + product_id, + reference_image_id=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + gcp_conn_id='google_cloud_default', + *args, + **kwargs + ): + super(CloudVisionReferenceImageCreateOperator, self).__init__(*args, **kwargs) + self.location = location + self.product_id = product_id + self.reference_image = reference_image + self.reference_image_id = reference_image_id + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + try: + hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) + return hook.create_reference_image( + location=self.location, + product_id=self.product_id, + reference_image=self.reference_image, + reference_image_id=self.reference_image_id, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except AlreadyExists: + 
self.log.info( + "ReferenceImage with id %s already exists. Exiting from the create operation.", + self.product_id, + ) + return self.reference_image_id + + +class CloudVisionAddProductToProductSetOperator(BaseOperator): + """ + Adds a Product to the specified ProductSet. If the Product is already present, no change is made. + + One Product can be added to at most 100 ProductSets. + + Possible errors: + + - Returns `NOT_FOUND` if the Product or the ProductSet doesn’t exist. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVisionAddProductToProductSetOperator` + + :param product_set_id: (Required) The resource id for the ProductSet to modify. + :type product_set_id: str + :param product_id: (Required) The resource id of this Product. + :type product_id: str + :param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05) + are: us-east1, us-west1, europe-west1, asia-east1 + :type location: str + :param project_id: (Optional) The project in which the Product is located. If set to None or + missing, the default project_id from the GCP connection is used. + :type project_id: str + :param retry: (Optional) A retry object used to retry requests. If `None` is + specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: (Optional) The amount of time, in seconds, to wait for the request to + complete. Note that if retry is specified, the timeout applies to each individual + attempt. + :type timeout: float + :param metadata: (Optional) Additional metadata that is provided to the method. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. 
+ :type gcp_conn_id: str + """ + + # [START vision_add_product_to_product_set_template_fields] + template_fields = ("location", "product_set_id", "product_id", "project_id", "gcp_conn_id") + # [END vision_add_product_to_product_set_template_fields] + + @apply_defaults + def __init__( + self, + product_set_id, + product_id, + location, + project_id=None, + retry=None, + timeout=None, + metadata=None, + gcp_conn_id="google_cloud_default", + *args, + **kwargs + ): + super(CloudVisionAddProductToProductSetOperator, self).__init__(*args, **kwargs) + self.product_set_id = product_set_id + self.product_id = product_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) + return hook.add_product_to_product_set( + product_set_id=self.product_set_id, + product_id=self.product_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + +class CloudVisionRemoveProductFromProductSetOperator(BaseOperator): + """ + Removes a Product from the specified ProductSet. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVisionRemoveProductFromProductSetOperator` + + :param product_set_id: (Required) The resource id for the ProductSet to modify. + :type product_set_id: str + :param product_id: (Required) The resource id of this Product. + :type product_id: str + :param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05) + are: us-east1, us-west1, europe-west1, asia-east1 + :type location: str + :param project_id: (Optional) The project in which the Product is located. If set to None or + missing, the default project_id from the GCP connection is used. 
+ :type project_id: str + :param retry: (Optional) A retry object used to retry requests. If `None` is + specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: (Optional) The amount of time, in seconds, to wait for the request to + complete. Note that if retry is specified, the timeout applies to each individual + attempt. + :type timeout: float + :param metadata: (Optional) Additional metadata that is provided to the method. + :type metadata: sequence[tuple[str, str]] + :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform. + :type gcp_conn_id: str + """ + + # [START vision_remove_product_from_product_set_template_fields] + template_fields = ("location", "product_set_id", "product_id", "project_id", "gcp_conn_id") + # [END vision_remove_product_from_product_set_template_fields] + + @apply_defaults + def __init__( + self, + product_set_id, + product_id, + location, + project_id=None, + retry=None, + timeout=None, + metadata=None, + gcp_conn_id="google_cloud_default", + *args, + **kwargs + ): + super(CloudVisionRemoveProductFromProductSetOperator, self).__init__(*args, **kwargs) + self.product_set_id = product_set_id + self.product_id = product_id + self.location = location + self.project_id = project_id + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id) + return hook.remove_product_from_product_set( + product_set_id=self.product_set_id, + product_id=self.product_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) diff --git a/airflow/utils/decorators.py b/airflow/utils/decorators.py index 4bc8e0debd..15847cf050 100644 --- a/airflow/utils/decorators.py +++ b/airflow/utils/decorators.py @@ -103,3 +103,19 @@ if 'BUILDING_AIRFLOW_DOCS' in os.environ: # flake8: noqa: 
F811 # Monkey patch hook to get good function headers while building docs apply_defaults = lambda x: x + + +class cached_property: + """ + A decorator creating a property, the value of which is calculated only once and cached for later use. + """ + def __init__(self, func): + self.func = func + self.__doc__ = getattr(func, '__doc__') + + def __get__(self, instance, cls=None): + if instance is None: + return self + result = self.func(instance) + instance.__dict__[self.func.__name__] = result + return result diff --git a/docs/code.rst b/docs/code.rst index 9792c13521..39118f6a06 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -209,6 +209,8 @@ Operators .. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator .. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator .. autoclass:: airflow.contrib.operators.gcp_translate_operator.CloudTranslateTextOperator +.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator +.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator @@ -217,6 +219,8 @@ Operators .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator .. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator +.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator +.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator .. 
autoclass:: airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator .. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator .. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator diff --git a/docs/howto/operator/gcp/vision.rst b/docs/howto/operator/gcp/vision.rst index ee280cfa8c..165eca13e7 100644 --- a/docs/howto/operator/gcp/vision.rst +++ b/docs/howto/operator/gcp/vision.rst @@ -22,15 +22,15 @@ Google Cloud Vision Operators :depth: 1 :local: -.. _howto/operator:CloudVisionProductSetCreateOperator: +.. _howto/operator:CloudVisionAddProductToProductSetOperator: -CloudVisionProductSetCreateOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +CloudVisionAddProductToProductSetOperator +----------------------------------------- -Creates a new :code:`ProductSet` resource. +Adds a :code:`Product` to the specified :code:`ProductSet`. If the :code:`Product` is already present, no change is made. For parameter definition, take a look at -:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator` +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator` Arguments """"""""" @@ -38,110 +38,59 @@ Arguments Some arguments in the example DAG are taken from the OS environment variables: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_args_common] - :end-before: [END howto_operator_vision_args_common] + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] .. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_explicit_id] - :end-before: [END howto_operator_vision_productset_explicit_id] + :language: python + :start-after: [START howto_operator_vision_product_explicit_id] + :end-before: [END howto_operator_vision_product_explicit_id] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] Using the operator """""""""""""""""" -We are using the ``ProductSet`` and ``Retry`` objects from Google libraries: +We are using the :class:`~google.cloud.vision_v1.types.Product`, +:class:`~google.cloud.vision_v1.types.ProductSet` and ``Retry`` objects from +Google libraries: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_import] - :end-before: [END howto_operator_vision_productset_import] + :language: python + :start-after: [START howto_operator_vision_retry_import] + :end-before: [END howto_operator_vision_retry_import] .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_retry_import] - :end-before: [END howto_operator_vision_retry_import] + :language: python + :start-after: [START howto_operator_vision_product_set_import] + :end-before: [END howto_operator_vision_product_set_import] .. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset] - :end-before: [END howto_operator_vision_productset] + :language: python + :start-after: [START howto_operator_vision_product_import] + :end-before: [END howto_operator_vision_product_import] -The ``product_set_id`` argument can be omitted (it will be generated by the API): + +If ``product_set_id`` and ``product_id`` were generated by the API, they can be extracted from XCOM: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_create] - :end-before: [END howto_operator_vision_product_set_create] - -Or it can be specified explicitly: - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_create_2] - :end-before: [END howto_operator_vision_product_set_create_2] - - -Templating -"""""""""" - -.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py - :language: python - :dedent: 4 - :start-after: [START vision_productset_create_template_fields] - :end-before: [END vision_productset_create_template_fields] - -More information -"""""""""""""""" - -See `Google Cloud Vision ProductSet create documentation -`_. - -.. _howto/operator:CloudVisionProductSetGetOperator: - -CloudVisionProductSetGetOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Gets information associated with a :code:`ProductSet`. - -For parameter definition, take a look at -:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator` - -Arguments -""""""""" - -Some arguments in the example DAG are taken from the OS environment variables: - -.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_args_common] - :end-before: [END howto_operator_vision_args_common] - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_explicit_id] - :end-before: [END howto_operator_vision_productset_explicit_id] - -Using the operator -"""""""""""""""""" - -If ``product_set_id`` was generated by the API it can be extracted from XCOM: - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_get] - :end-before: [END howto_operator_vision_product_set_get] + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_add_product_to_product_set] + :end-before: [END howto_operator_vision_add_product_to_product_set] Otherwise it can be specified explicitly: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_get_2] - :end-before: [END howto_operator_vision_product_set_get_2] + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_add_product_to_product_set_2] + :end-before: [END howto_operator_vision_add_product_to_product_set_2] + Templating """""""""" @@ -149,36 +98,25 @@ Templating .. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py :language: python :dedent: 4 - :start-after: [START vision_productset_get_template_fields] - :end-before: [END vision_productset_get_template_fields] + :start-after: [START vision_add_product_to_product_set_template_fields] + :end-before: [END vision_add_product_to_product_set_template_fields] More information """""""""""""""" -See `Google Cloud Vision ProductSet get documentation -`_. 
+See `Google Cloud Vision Add Product To Product Set documentation +`_. -.. _howto/operator:CloudVisionProductSetUpdateOperator: -CloudVisionProductSetUpdateOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +.. _howto/operator:CloudVisionAnnotateImageOperator: -Makes changes to a :code:`ProductSet` resource. Only :code:`display_name` can be updated -currently. +CloudVisionAnnotateImageOperator +-------------------------------- -.. note:: To locate the `ProductSet` resource, its `name` in the form - ``projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID`` is necessary. - -You can provide the `name` directly as an attribute of the `product_set` object. -However, you can leave it blank and provide `location` and `product_set_id` instead (and -optionally `project_id` - if not present, the connection default will be used) and the -`name` will be created by the operator itself. - -This mechanism exists for your convenience, to allow leaving the `project_id` empty and -having Airflow use the connection default `project_id`. +Run image detection and annotation for an image. For parameter definition, take a look at -:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator` +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator` Arguments """"""""" @@ -186,47 +124,47 @@ Arguments Some arguments in the example DAG are taken from the OS environment variables: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_args_common] - :end-before: [END howto_operator_vision_args_common] + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] .. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_explicit_id] - :end-before: [END howto_operator_vision_productset_explicit_id] + :language: python + :start-after: [START howto_operator_vision_annotate_image_url] + :end-before: [END howto_operator_vision_annotate_image_url] + Using the operator """""""""""""""""" -We are using the ``ProductSet`` object from the Google Cloud Vision library: +We are using the :mod:`~google.cloud.vision.enums` module and the ``Retry`` object from +Google libraries: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_import] - :end-before: [END howto_operator_vision_productset_import] + :language: python + :start-after: [START howto_operator_vision_retry_import] + :end-before: [END howto_operator_vision_retry_import] .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset] - :end-before: [END howto_operator_vision_productset] + :language: python + :start-after: [START howto_operator_vision_enums_import] + :end-before: [END howto_operator_vision_enums_import] -Initialization of the task: - -If ``product_set_id`` was generated by the API it can be extracted from XCOM: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_update] - :end-before: [END howto_operator_vision_product_set_update] + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_annotate_image] + :end-before: [END howto_operator_vision_annotate_image] -Otherwise it can be specified explicitly: +The result can be extracted from XCOM: .. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_update_2] - :end-before: [END howto_operator_vision_product_set_update_2] + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_annotate_image_result] + :end-before: [END howto_operator_vision_annotate_image_result] + Templating """""""""" @@ -234,80 +172,19 @@ Templating .. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py :language: python :dedent: 4 - :start-after: [START vision_productset_update_template_fields] - :end-before: [END vision_productset_update_template_fields] + :start-after: [START vision_annotate_image_template_fields] + :end-before: [END vision_annotate_image_template_fields] More information """""""""""""""" -See `Google Cloud Vision ProductSet update documentation -`_. - -.. _howto/operator:CloudVisionProductSetDeleteOperator: - -CloudVisionProductSetDeleteOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Permanently deletes a :code:`ProductSet`. :code:`Products` and :code:`ReferenceImages` in -the :code:`ProductSet` are not deleted. The actual image files are not deleted from -Google Cloud Storage. - -For parameter definition, take a look at -:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator` - -Arguments -""""""""" - -Some arguments in the example DAG are taken from the OS environment variables: - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_args_common] - :end-before: [END howto_operator_vision_args_common] - -.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_productset_explicit_id] - :end-before: [END howto_operator_vision_productset_explicit_id] - -Using the operator -"""""""""""""""""" - -If ``product_set_id`` was generated by the API it can be extracted from XCOM: - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_delete] - :end-before: [END howto_operator_vision_product_set_delete] - -Otherwise it can be specified explicitly: - -.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_set_delete_2] - :end-before: [END howto_operator_vision_product_set_delete_2] - -Templating -"""""""""" - -.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py - :language: python - :dedent: 4 - :start-after: [START vision_productset_delete_template_fields] - :end-before: [END vision_productset_delete_template_fields] - -More information -"""""""""""""""" - -See `Google Cloud Vision ProductSet delete documentation -`_. +See `Google Cloud Vision Annotate Image documentation +`_. .. _howto/operator:CloudVisionProductCreateOperator: CloudVisionProductCreateOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +-------------------------------- Creates and returns a new product resource. @@ -387,10 +264,77 @@ More information See `Google Cloud Vision Product create documentation `_. +.. _howto/operator:CloudVisionProductDeleteOperator: + +CloudVisionProductDeleteOperator +-------------------------------- + +Permanently deletes a product and its reference images. 
+ +Metadata of the product and all its images will be deleted right away, but search queries +against :code:`ProductSets` containing the product may still work until all related +caches are refreshed. + +Possible errors: + +- Returns NOT_FOUND if the product does not exist. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_explicit_id] + :end-before: [END howto_operator_vision_product_explicit_id] + +Using the operator +"""""""""""""""""" + +If ``product_id`` was generated by the API it can be extracted from XCOM: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_delete] + :end-before: [END howto_operator_vision_product_delete] + +Otherwise it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_delete_2] + :end-before: [END howto_operator_vision_product_delete_2] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_product_delete_template_fields] + :end-before: [END vision_product_delete_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision Product delete documentation +`_. + .. 
_howto/operator:CloudVisionProductGetOperator: CloudVisionProductGetOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +----------------------------- Gets information associated with a :code:`Product`. @@ -450,10 +394,292 @@ More information See `Google Cloud Vision Product get documentation `_. +.. _howto/operator:CloudVisionProductSetCreateOperator: + +CloudVisionProductSetCreateOperator +----------------------------------- + +Creates a new :code:`ProductSet` resource. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] + +Using the operator +"""""""""""""""""" + +We are using the ``ProductSet`` and ``Retry`` objects from Google libraries: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_import] + :end-before: [END howto_operator_vision_product_set_import] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_retry_import] + :end-before: [END howto_operator_vision_retry_import] + +.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set] + :end-before: [END howto_operator_vision_product_set] + +The ``product_set_id`` argument can be omitted (it will be generated by the API): + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_create] + :end-before: [END howto_operator_vision_product_set_create] + +Or it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_create_2] + :end-before: [END howto_operator_vision_product_set_create_2] + + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_productset_create_template_fields] + :end-before: [END vision_productset_create_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision ProductSet create documentation +`_. + +.. _howto/operator:CloudVisionProductSetDeleteOperator: + +CloudVisionProductSetDeleteOperator +----------------------------------- + +Permanently deletes a :code:`ProductSet`. :code:`Products` and :code:`ReferenceImages` in +the :code:`ProductSet` are not deleted. The actual image files are not deleted from +Google Cloud Storage. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] + +Using the operator +"""""""""""""""""" + +If ``product_set_id`` was generated by the API it can be extracted from XCOM: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_delete] + :end-before: [END howto_operator_vision_product_set_delete] + +Otherwise it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_delete_2] + :end-before: [END howto_operator_vision_product_set_delete_2] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_productset_delete_template_fields] + :end-before: [END vision_productset_delete_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision ProductSet delete documentation +`_. + +.. _howto/operator:CloudVisionProductSetGetOperator: + +CloudVisionProductSetGetOperator +-------------------------------- + +Gets information associated with a :code:`ProductSet`. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] + +Using the operator +"""""""""""""""""" + +If ``product_set_id`` was generated by the API it can be extracted from XCOM: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_get] + :end-before: [END howto_operator_vision_product_set_get] + +Otherwise it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_get_2] + :end-before: [END howto_operator_vision_product_set_get_2] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_productset_get_template_fields] + :end-before: [END vision_productset_get_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision ProductSet get documentation +`_. + +.. _howto/operator:CloudVisionProductSetUpdateOperator: + +CloudVisionProductSetUpdateOperator +----------------------------------- + +Makes changes to a :code:`ProductSet` resource. Only :code:`display_name` can be updated +currently. + +.. note:: To locate the `ProductSet` resource, its `name` in the form + ``projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID`` is necessary. + +You can provide the `name` directly as an attribute of the `product_set` object. 
+However, you can leave it blank and provide `location` and `product_set_id` instead (and +optionally `project_id` - if not present, the connection default will be used) and the +`name` will be created by the operator itself. + +This mechanism exists for your convenience, to allow leaving the `project_id` empty and +having Airflow use the connection default `project_id`. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] + +Using the operator +"""""""""""""""""" + +We are using the ``ProductSet`` object from the Google Cloud Vision library: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_import] + :end-before: [END howto_operator_vision_product_set_import] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set] + :end-before: [END howto_operator_vision_product_set] + +Initialization of the task: + +If ``product_set_id`` was generated by the API it can be extracted from XCOM: + +.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_update] + :end-before: [END howto_operator_vision_product_set_update] + +Otherwise it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_product_set_update_2] + :end-before: [END howto_operator_vision_product_set_update_2] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_productset_update_template_fields] + :end-before: [END vision_productset_update_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision ProductSet update documentation +`_. + .. _howto/operator:CloudVisionProductUpdateOperator: CloudVisionProductUpdateOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +-------------------------------- Makes changes to a :code:`Product` resource. Only the :code:`display_name`, :code:`description`, and :code:`labels` fields can be updated right now. @@ -544,23 +770,15 @@ More information See `Google Cloud Vision Product update documentation `_. -.. _howto/operator:CloudVisionProductDeleteOperator: +.. _howto/operator:CloudVisionReferenceImageCreateOperator: -CloudVisionProductDeleteOperator -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +CloudVisionReferenceImageCreateOperator +--------------------------------------- -Permanently deletes a product and its reference images. - -Metadata of the product and all its images will be deleted right away, but search queries -against :code:`ProductSets` containing the product may still work until all related -caches are refreshed. - -Possible errors: - -- Returns NOT_FOUND if the product does not exist. +Creates a new :code:`ReferenceImage` resource. 
For parameter definition, take a look at -:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator` +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator` Arguments """"""""" @@ -568,33 +786,51 @@ Arguments Some arguments in the example DAG are taken from the OS environment variables: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_args_common] - :end-before: [END howto_operator_vision_args_common] + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :start-after: [START howto_operator_vision_product_explicit_id] - :end-before: [END howto_operator_vision_product_explicit_id] + :language: python + :start-after: [START howto_operator_vision_reference_image_args] + :end-before: [END howto_operator_vision_reference_image_args] Using the operator """""""""""""""""" -If ``product_id`` was generated by the API it can be extracted from XCOM: +We are using the :class:`~google.cloud.vision_v1.types.ReferenceImage` and ``Retry`` objects from Google libraries: .. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_delete] - :end-before: [END howto_operator_vision_product_delete] - -Otherwise it can be specified explicitly: + :language: python + :start-after: [START howto_operator_vision_reference_image_import] + :end-before: [END howto_operator_vision_reference_image_import] .. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py - :language: python - :dedent: 4 - :start-after: [START howto_operator_vision_product_delete_2] - :end-before: [END howto_operator_vision_product_delete_2] + :language: python + :start-after: [START howto_operator_vision_retry_import] + :end-before: [END howto_operator_vision_retry_import] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_reference_image] + :end-before: [END howto_operator_vision_reference_image] + +The ``reference_image_id`` argument can be omitted (it will be generated by the API): + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_reference_image_create] + :end-before: [END howto_operator_vision_reference_image_create] + +Or it can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_reference_image_create_2] + :end-before: [END howto_operator_vision_reference_image_create_2] + + Templating """""""""" @@ -602,11 +838,96 @@ Templating .. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py :language: python :dedent: 4 - :start-after: [START vision_product_delete_template_fields] - :end-before: [END vision_product_delete_template_fields] + :start-after: [START vision_reference_image_create_template_fields] + :end-before: [END vision_reference_image_create_template_fields] More information """""""""""""""" -See `Google Cloud Vision Product delete documentation -`_. +See `Google Cloud Vision ReferenceImage create documentation +`_. + +.. 
_howto/operator:CloudVisionRemoveProductFromProductSetOperator: + +CloudVisionRemoveProductFromProductSetOperator +---------------------------------------------- + +Removes a :code:`Product` from the specified :code:`ProductSet`. + +For parameter definition, take a look at +:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_args_common] + :end-before: [END howto_operator_vision_args_common] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_explicit_id] + :end-before: [END howto_operator_vision_product_explicit_id] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_explicit_id] + :end-before: [END howto_operator_vision_product_set_explicit_id] + +Using the operator +"""""""""""""""""" + +We are using the :class:`~google.cloud.vision_v1.types.Product`, +:class:`~google.cloud.vision_v1.types.ProductSet` and ``Retry`` objects from +Google libraries: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_retry_import] + :end-before: [END howto_operator_vision_retry_import] + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_set_import] + :end-before: [END howto_operator_vision_product_set_import] + +.. 
literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :start-after: [START howto_operator_vision_product_import] + :end-before: [END howto_operator_vision_product_import] + + +If ``product_set_id`` and ``product_id`` were generated by the API they can be extracted from XCOM: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_remove_product_from_product_set] + :end-before: [END howto_operator_vision_remove_product_from_product_set] + +Otherwise they can be specified explicitly: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_vision_remove_product_from_product_set_2] + :end-before: [END howto_operator_vision_remove_product_from_product_set_2] + + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py + :language: python + :dedent: 4 + :start-after: [START vision_remove_product_from_product_set_template_fields] + :end-before: [END vision_remove_product_from_product_set_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Vision Remove Product From Product Set documentation +`_. diff --git a/docs/integration.rst b/docs/integration.rst index ecf8be5fb3..8ebbb86fdf 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -599,22 +599,30 @@ Cloud Vision Cloud Vision Product Search Operators """"""""""""""""""""""""""""""""""""" +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator` + Adds a Product to the specified ProductSet. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator` + Run image detection and annotation for an image. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator` + Creates a new Product resource. 
+:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator` + Permanently deletes a product and its reference images. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator` + Gets information associated with a Product. :class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator` Creates a new ProductSet resource. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator` + Permanently deletes a ProductSet. :class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator` Gets information associated with a ProductSet. :class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator` Makes changes to a ProductSet resource. -:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator` - Permanently deletes a ProductSet. -:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator` - Creates a new Product resource. -:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator` - Gets information associated with a Product. :class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator` Makes changes to a Product resource. -:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator` - Permanently deletes a product and its reference images. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator` + Creates a new ReferenceImage resource. +:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator` + Removes a Product from the specified ProductSet. They also use :class:`airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook` to communicate with Google Cloud Platform. 
diff --git a/tests/contrib/hooks/test_gcp_api_base_hook.py b/tests/contrib/hooks/test_gcp_api_base_hook.py index 116f684da9..b22e8c0881 100644 --- a/tests/contrib/hooks/test_gcp_api_base_hook.py +++ b/tests/contrib/hooks/test_gcp_api_base_hook.py @@ -21,10 +21,16 @@ import os import unittest +from parameterized import parameterized +from google.api_core.exceptions import RetryError, AlreadyExists +from google.cloud.exceptions import MovedPermanently + +from airflow import AirflowException, LoggingMixin from airflow.contrib.hooks import gcp_api_base_hook as hook import google.auth from google.auth.exceptions import GoogleAuthError + try: from StringIO import StringIO except ImportError: @@ -46,6 +52,55 @@ except GoogleAuthError: default_creds_available = False +class TestCatchHttpException(unittest.TestCase): + def test_no_exception(self): + self.called = False + + class FixtureClass(LoggingMixin): + @hook.GoogleCloudBaseHook.catch_http_exception + def text_fixture(*args, **kwargs): + self.called = True + + FixtureClass().text_fixture() + + self.assertTrue(self.called) + + @parameterized.expand( + [ + (MovedPermanently("MESSAGE"),), + (RetryError("MESSAGE", cause=Exception("MESSAGE")),), + (ValueError("MESSAGE"),), + ] + ) + def test_raise_airflowexception(self, ex_obj): + self.called = False + + class FixtureClass(LoggingMixin): + @hook.GoogleCloudBaseHook.catch_http_exception + def test_fixutre(*args, **kwargs): + self.called = True + raise ex_obj + + with self.assertRaises(AirflowException): + FixtureClass().test_fixutre() + + self.assertTrue(self.called) + + def test_raise_alreadyexists(self): + self.called = False + + class FixtureClass(LoggingMixin): + @hook.GoogleCloudBaseHook.catch_http_exception + def test_fixutre(*args, **kwargs): + self.called = True + raise AlreadyExists("MESSAGE") + + with self.assertRaises(AlreadyExists): + FixtureClass().test_fixutre() + + self.assertTrue(self.called) + + class TestGoogleCloudBaseHook(unittest.TestCase): def 
setUp(self): self.instance = hook.GoogleCloudBaseHook() @@ -81,7 +136,7 @@ class TestGoogleCloudBaseHook(unittest.TestCase): 'Default GCP credentials not available to run tests') def test_default_creds_no_scopes(self): self.instance.extras = { - 'extra__google_cloud_platform__project': default_project, + 'extra__google_cloud_platform__project': default_project } credentials = self.instance._get_credentials() @@ -96,19 +151,17 @@ class TestGoogleCloudBaseHook(unittest.TestCase): def test_provide_gcp_credential_file_decorator_key_path(self): key_path = '/test/key-path' - self.instance.extras = { - 'extra__google_cloud_platform__key_path': key_path - } + self.instance.extras = {'extra__google_cloud_platform__key_path': key_path} @hook.GoogleCloudBaseHook._Decorators.provide_gcp_credential_file def assert_gcp_credential_file_in_env(hook_instance): self.assertEqual(os.environ[hook._G_APP_CRED_ENV_VAR], key_path) + assert_gcp_credential_file_in_env(self.instance) @mock.patch('tempfile.NamedTemporaryFile') - def test_provide_gcp_credential_file_decorator_key_content(self, - mock_file): + def test_provide_gcp_credential_file_decorator_key_content(self, mock_file): string_file = StringIO() file_content = '{"foo": "bar"}' file_name = '/test/mock-file' @@ -124,4 +177,5 @@ class TestGoogleCloudBaseHook(unittest.TestCase): self.assertEqual(os.environ[hook._G_APP_CRED_ENV_VAR], file_name) self.assertEqual(file_content, string_file.getvalue()) + assert_gcp_credential_file_in_env(self.instance) diff --git a/tests/contrib/hooks/test_gcp_vision_hook.py b/tests/contrib/hooks/test_gcp_vision_hook.py index ddc7114d8a..ef8f2f0ff8 100644 --- a/tests/contrib/hooks/test_gcp_vision_hook.py +++ b/tests/contrib/hooks/test_gcp_vision_hook.py @@ -19,8 +19,9 @@ import unittest +from google.cloud.vision import enums from google.cloud.vision_v1 import ProductSearchClient -from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product +from 
google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product, ReferenceImage from google.protobuf.json_format import MessageToDict from parameterized import parameterized @@ -43,8 +44,26 @@ LOC_ID_TEST = 'loc-id' LOC_ID_TEST_2 = 'loc-id-2' PRODUCTSET_ID_TEST = 'ps-id' PRODUCTSET_ID_TEST_2 = 'ps-id-2' +PRODUCTSET_NAME_TEST = 'projects/{}/locations/{}/productSets/{}'.format( + PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST +) PRODUCT_ID_TEST = 'p-id' PRODUCT_ID_TEST_2 = 'p-id-2' +PRODUCT_NAME_TEST = "projects/{}/locations/{}/products/{}".format( + PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST +) +PRODUCT_NAME = "projects/{}/locations/{}/products/{}".format(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST) +REFERENCE_IMAGE_ID_TEST = 'ri-id' +REFERENCE_IMAGE_GEN_ID_TEST = 'ri-id' +ANNOTATE_IMAGE_REQUEST = { + 'image': {'source': {'image_uri': "gs://bucket-name/object-name"}}, + 'features': [{'type': enums.Feature.Type.LOGO_DETECTION}], +} +REFERENCE_IMAGE_NAME_TEST = "projects/{}/locations/{}/products/{}/referenceImages/{}".format( + PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST, REFERENCE_IMAGE_ID_TEST +) +REFERENCE_IMAGE_TEST = ReferenceImage(name=REFERENCE_IMAGE_GEN_ID_TEST) +REFERENCE_IMAGE_WITHOUT_ID_NAME = ReferenceImage() class TestGcpVisionHook(unittest.TestCase): @@ -53,7 +72,7 @@ class TestGcpVisionHook(unittest.TestCase): 'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.__init__', new=mock_base_gcp_hook_default_project_id, ): - self.vision_hook_default_project_id = CloudVisionHook(gcp_conn_id='test') + self.hook = CloudVisionHook(gcp_conn_id='test') @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn') def test_create_productset_explicit_id(self, get_conn): @@ -61,10 +80,9 @@ class TestGcpVisionHook(unittest.TestCase): create_product_set_method = get_conn.return_value.create_product_set create_product_set_method.return_value = None parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - 
hook = self.vision_hook_default_project_id product_set = ProductSet() # When - result = hook.create_product_set( + result = self.hook.create_product_set( location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, product_set=product_set, @@ -73,6 +91,7 @@ class TestGcpVisionHook(unittest.TestCase): timeout=None, metadata=None, ) + # Then # ProductSet ID was provided explicitly in the method call above, should be returned from the method self.assertEqual(result, PRODUCTSET_ID_TEST) @@ -95,10 +114,9 @@ class TestGcpVisionHook(unittest.TestCase): create_product_set_method = get_conn.return_value.create_product_set create_product_set_method.return_value = response_product_set parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = self.vision_hook_default_project_id product_set = ProductSet() # When - result = hook.create_product_set( + result = self.hook.create_product_set( location=LOC_ID_TEST, product_set_id=None, product_set=product_set, project_id=PROJECT_ID_TEST ) # Then @@ -121,11 +139,10 @@ class TestGcpVisionHook(unittest.TestCase): create_product_set_method = get_conn.return_value.create_product_set create_product_set_method.return_value = response_product_set parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = self.vision_hook_default_project_id product_set = ProductSet() # When with self.assertRaises(AirflowException) as cm: - hook.create_product_set( + self.hook.create_product_set( location=LOC_ID_TEST, product_set_id=None, product_set=product_set, @@ -154,9 +171,8 @@ class TestGcpVisionHook(unittest.TestCase): response_product_set = ProductSet(name=name) get_product_set_method = get_conn.return_value.get_product_set get_product_set_method.return_value = response_product_set - hook = self.vision_hook_default_project_id # When - response = hook.get_product_set( + response = self.hook.get_product_set( location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST ) # Then @@ -170,12 
+186,11 @@ class TestGcpVisionHook(unittest.TestCase): product_set = ProductSet() update_product_set_method = get_conn.return_value.update_product_set update_product_set_method.return_value = product_set - hook = self.vision_hook_default_project_id productset_name = ProductSearchClient.product_set_path( PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST ) # When - result = hook.update_product_set( + result = self.hook.update_product_set( location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, product_set=product_set, @@ -203,11 +218,10 @@ class TestGcpVisionHook(unittest.TestCase): # Given update_product_set_method = get_conn.return_value.update_product_set update_product_set_method.return_value = None - hook = self.vision_hook_default_project_id product_set = ProductSet() # When with self.assertRaises(AirflowException) as cm: - hook.update_product_set( + self.hook.update_product_set( location=location, product_set_id=product_set_id, product_set=product_set, @@ -238,9 +252,8 @@ class TestGcpVisionHook(unittest.TestCase): product_set = ProductSet(name=explicit_ps_name) update_product_set_method = get_conn.return_value.update_product_set update_product_set_method.return_value = product_set - hook = self.vision_hook_default_project_id # When - result = hook.update_product_set( + result = self.hook.update_product_set( location=location, product_set_id=product_set_id, product_set=product_set, @@ -265,7 +278,6 @@ class TestGcpVisionHook(unittest.TestCase): # Given update_product_set_method = get_conn.return_value.update_product_set update_product_set_method.return_value = None - hook = self.vision_hook_default_project_id explicit_ps_name = ProductSearchClient.product_set_path( PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCTSET_ID_TEST_2 ) @@ -278,7 +290,7 @@ class TestGcpVisionHook(unittest.TestCase): # but both names differ (constructed != explicit). # Should throw AirflowException in this case. 
with self.assertRaises(AirflowException) as cm: - hook.update_product_set( + self.hook.update_product_set( location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, product_set=product_set, @@ -306,25 +318,129 @@ class TestGcpVisionHook(unittest.TestCase): delete_product_set_method = get_conn.return_value.delete_product_set delete_product_set_method.return_value = None name = ProductSearchClient.product_set_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST) - hook = self.vision_hook_default_project_id # When - response = hook.delete_product_set( + response = self.hook.delete_product_set( location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST ) # Then self.assertIsNone(response) delete_product_set_method.assert_called_once_with(name=name, retry=None, timeout=None, metadata=None) + @mock.patch( + 'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn', + **{'return_value.create_reference_image.return_value': REFERENCE_IMAGE_TEST} + ) + def test_create_reference_image_explicit_id(self, get_conn): + # Given + create_reference_image_method = get_conn.return_value.create_reference_image + + # When + result = self.hook.create_reference_image( + project_id=PROJECT_ID_TEST, + location=LOC_ID_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_WITHOUT_ID_NAME, + reference_image_id=REFERENCE_IMAGE_ID_TEST, + ) + # Then + # Product ID was provided explicitly in the method call above, should be returned from the method + self.assertEqual(result, REFERENCE_IMAGE_ID_TEST) + create_reference_image_method.assert_called_once_with( + parent=PRODUCT_NAME, + reference_image=REFERENCE_IMAGE_WITHOUT_ID_NAME, + reference_image_id=REFERENCE_IMAGE_ID_TEST, + retry=None, + timeout=None, + metadata=None, + ) + + @mock.patch( + 'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn', + **{'return_value.create_reference_image.return_value': REFERENCE_IMAGE_TEST} + ) + def test_create_reference_image_autogenerated_id(self, 
get_conn): + # Given + create_reference_image_method = get_conn.return_value.create_reference_image + + # When + result = self.hook.create_reference_image( + project_id=PROJECT_ID_TEST, + location=LOC_ID_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_TEST, + reference_image_id=REFERENCE_IMAGE_ID_TEST, + ) + # Then + # Product ID was provided explicitly in the method call above, should be returned from the method + self.assertEqual(result, REFERENCE_IMAGE_GEN_ID_TEST) + create_reference_image_method.assert_called_once_with( + parent=PRODUCT_NAME, + reference_image=REFERENCE_IMAGE_TEST, + reference_image_id=REFERENCE_IMAGE_ID_TEST, + retry=None, + timeout=None, + metadata=None, + ) + + @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn') + def test_add_product_to_product_set(self, get_conn): + # Given + add_product_to_product_set_method = get_conn.return_value.add_product_to_product_set + + # When + self.hook.add_product_to_product_set( + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + location=LOC_ID_TEST, + project_id=PROJECT_ID_TEST, + ) + # Then + # Product ID was provided explicitly in the method call above, should be returned from the method + add_product_to_product_set_method.assert_called_once_with( + name=PRODUCTSET_NAME_TEST, product=PRODUCT_NAME_TEST, retry=None, timeout=None, metadata=None + ) + + # remove_product_from_product_set + @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn') + def test_remove_product_from_product_set(self, get_conn): + # Given + remove_product_from_product_set_method = get_conn.return_value.remove_product_from_product_set + + # When + self.hook.remove_product_from_product_set( + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + location=LOC_ID_TEST, + project_id=PROJECT_ID_TEST, + ) + # Then + # Product ID was provided explicitly in the method call above, should be returned from the method + 
remove_product_from_product_set_method.assert_called_once_with( + name=PRODUCTSET_NAME_TEST, product=PRODUCT_NAME_TEST, retry=None, timeout=None, metadata=None + ) + + @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.annotator_client') + def test_annotate_image(self, annotator_client_mock): + # Given + annotate_image_method = annotator_client_mock.annotate_image + + # When + self.hook.annotate_image(request=ANNOTATE_IMAGE_REQUEST) + # Then + # Product ID was provided explicitly in the method call above, should be returned from the method + annotate_image_method.assert_called_once_with( + request=ANNOTATE_IMAGE_REQUEST, retry=None, timeout=None + ) + @mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn') def test_create_product_explicit_id(self, get_conn): # Given create_product_method = get_conn.return_value.create_product create_product_method.return_value = None parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = self.vision_hook_default_project_id product = Product() # When - result = hook.create_product( + result = self.hook.create_product( location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, product=product, project_id=PROJECT_ID_TEST ) # Then @@ -349,10 +465,9 @@ class TestGcpVisionHook(unittest.TestCase): create_product_method = get_conn.return_value.create_product create_product_method.return_value = response_product parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = self.vision_hook_default_project_id product = Product() # When - result = hook.create_product( + result = self.hook.create_product( location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST ) # Then @@ -371,11 +486,10 @@ class TestGcpVisionHook(unittest.TestCase): create_product_method = get_conn.return_value.create_product create_product_method.return_value = response_product parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = 
self.vision_hook_default_project_id product = Product() # When with self.assertRaises(AirflowException) as cm: - hook.create_product( + self.hook.create_product( location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST ) # Then @@ -393,11 +507,10 @@ class TestGcpVisionHook(unittest.TestCase): create_product_method = get_conn.return_value.create_product create_product_method.return_value = response_product parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST) - hook = self.vision_hook_default_project_id product = Product() # When with self.assertRaises(AirflowException) as cm: - hook.create_product( + self.hook.create_product( location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST ) # Then @@ -414,10 +527,9 @@ class TestGcpVisionHook(unittest.TestCase): product = Product() update_product_method = get_conn.return_value.update_product update_product_method.return_value = product - hook = self.vision_hook_default_project_id product_name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST) # When - result = hook.update_product( + result = self.hook.update_product( location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, product=product, @@ -441,11 +553,10 @@ class TestGcpVisionHook(unittest.TestCase): # Given update_product_method = get_conn.return_value.update_product update_product_method.return_value = None - hook = self.vision_hook_default_project_id product = Product() # When with self.assertRaises(AirflowException) as cm: - hook.update_product( + self.hook.update_product( location=location, product_id=product_id, product=product, @@ -476,9 +587,8 @@ class TestGcpVisionHook(unittest.TestCase): product = Product(name=explicit_p_name) update_product_method = get_conn.return_value.update_product update_product_method.return_value = product - hook = self.vision_hook_default_project_id # When - result = hook.update_product( + result = self.hook.update_product( 
location=location, product_id=product_id, product=product, @@ -499,7 +609,6 @@ class TestGcpVisionHook(unittest.TestCase): # Given update_product_method = get_conn.return_value.update_product update_product_method.return_value = None - hook = self.vision_hook_default_project_id explicit_p_name = ProductSearchClient.product_path( PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCT_ID_TEST_2 ) @@ -510,7 +619,7 @@ class TestGcpVisionHook(unittest.TestCase): # but both names differ (constructed != explicit). # Should throw AirflowException in this case. with self.assertRaises(AirflowException) as cm: - hook.update_product( + self.hook.update_product( location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, product=product, @@ -538,9 +647,8 @@ class TestGcpVisionHook(unittest.TestCase): delete_product_method = get_conn.return_value.delete_product delete_product_method.return_value = None name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST) - hook = self.vision_hook_default_project_id # When - response = hook.delete_product( + response = self.hook.delete_product( location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, project_id=PROJECT_ID_TEST ) # Then diff --git a/tests/contrib/operators/test_gcp_vision_operator.py b/tests/contrib/operators/test_gcp_vision_operator.py index a806f85021..3448c1a95d 100644 --- a/tests/contrib/operators/test_gcp_vision_operator.py +++ b/tests/contrib/operators/test_gcp_vision_operator.py @@ -20,7 +20,7 @@ import unittest from google.api_core.exceptions import AlreadyExists -from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product +from google.cloud.vision_v1.types import ProductSet, Product, ReferenceImage from airflow.contrib.operators.gcp_vision_operator import ( CloudVisionProductSetCreateOperator, @@ -31,6 +31,10 @@ from airflow.contrib.operators.gcp_vision_operator import ( CloudVisionProductGetOperator, CloudVisionProductUpdateOperator, CloudVisionProductDeleteOperator, + 
CloudVisionReferenceImageCreateOperator, + CloudVisionAddProductToProductSetOperator, + CloudVisionRemoveProductFromProductSetOperator, + CloudVisionAnnotateImageOperator, ) try: @@ -46,6 +50,9 @@ PRODUCTSET_TEST = ProductSet(display_name='Test Product Set') PRODUCTSET_ID_TEST = 'my-productset' PRODUCT_TEST = Product(display_name='My Product 1', product_category='toys') PRODUCT_ID_TEST = 'my-product' +REFERENCE_IMAGE_TEST = ReferenceImage(uri='gs://bucket_name/file.txt') +REFERENCE_IMAGE_ID_TEST = 'my-reference-image' +ANNOTATE_REQUEST_TEST = {'image': {'source': {'image_uri': 'https://foo.com/image.jpg'}}} LOCATION_TEST = 'europe-west1' GCP_CONN_ID = 'google_cloud_default' @@ -70,10 +77,10 @@ class CloudVisionProductSetCreateTest(unittest.TestCase): ) @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn') - @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request') - def test_already_exists(self, _handle_request, get_conn): + @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.create_product_set') + def test_already_exists(self, create_product_set_mock, get_conn): get_conn.return_value = {} - _handle_request.side_effect = AlreadyExists(message='') + create_product_set_mock.side_effect = AlreadyExists(message='') # Exception AlreadyExists not raised, caught in the operator's execute() - idempotence op = CloudVisionProductSetCreateOperator( location=LOCATION_TEST, @@ -163,10 +170,10 @@ class CloudVisionProductCreateTest(unittest.TestCase): ) @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn') - @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request') - def test_already_exists(self, _handle_request, get_conn): + @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.create_product') + def test_already_exists(self, create_product_mock, get_conn): get_conn.return_value = {} - 
_handle_request.side_effect = AlreadyExists(message='') + create_product_mock.side_effect = AlreadyExists(message='') # Exception AlreadyExists not raised, caught in the operator's execute() - idempotence op = CloudVisionProductCreateOperator( location=LOCATION_TEST, @@ -232,3 +239,109 @@ class CloudVisionProductDeleteTest(unittest.TestCase): timeout=None, metadata=None, ) + + +class CloudVisionReferenceImageCreateTest(unittest.TestCase): + @mock.patch( + 'airflow.contrib.operators.gcp_vision_operator.CloudVisionHook', + **{'return_value.create_reference_image.return_value': {}} + ) + def test_minimal_green_path(self, mock_hook): + op = CloudVisionReferenceImageCreateOperator( + location=LOCATION_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_TEST, + task_id='id', + ) + op.execute(context=None) + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.create_reference_image.assert_called_once_with( + location=LOCATION_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_TEST, + reference_image_id=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ) + + @mock.patch( + 'airflow.contrib.operators.gcp_vision_operator.CloudVisionHook', + **{'return_value.create_reference_image.side_effect': AlreadyExists("MESSAGe")} + ) + def test_already_exists(self, mock_hook): + # Exception AlreadyExists not raised, caught in the operator's execute() - idempotence + op = CloudVisionReferenceImageCreateOperator( + location=LOCATION_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_TEST, + task_id='id', + ) + op.execute(context=None) + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.create_reference_image.assert_called_once_with( + location=LOCATION_TEST, + product_id=PRODUCT_ID_TEST, + reference_image=REFERENCE_IMAGE_TEST, + reference_image_id=None, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ) + + +class 
CloudVisionAddProductToProductSetOperatorTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook') + def test_minimal_green_path(self, mock_hook): + op = CloudVisionAddProductToProductSetOperator( + location=LOCATION_TEST, + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + task_id='id', + ) + op.execute(context=None) + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.add_product_to_product_set.assert_called_once_with( + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + location=LOCATION_TEST, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ) + + +class CloudVisionRemoveProductFromProductSetOperatorTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook') + def test_minimal_green_path(self, mock_hook): + op = CloudVisionRemoveProductFromProductSetOperator( + location=LOCATION_TEST, + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + task_id='id', + ) + op.execute(context=None) + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.remove_product_from_product_set.assert_called_once_with( + product_set_id=PRODUCTSET_ID_TEST, + product_id=PRODUCT_ID_TEST, + location=LOCATION_TEST, + project_id=None, + retry=None, + timeout=None, + metadata=None, + ) + + +class CloudVisionAnnotateImageOperatorTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook') + def test_minimal_green_path(self, mock_hook): + op = CloudVisionAnnotateImageOperator(request=ANNOTATE_REQUEST_TEST, task_id='id') + op.execute(context=None) + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.annotate_image.assert_called_once_with( + request=ANNOTATE_REQUEST_TEST, retry=None, timeout=None + ) diff --git a/tests/contrib/operators/test_gcp_vision_operator_system.py 
b/tests/contrib/operators/test_gcp_vision_operator_system.py index 2b75642d6f..8618e61bcf 100644 --- a/tests/contrib/operators/test_gcp_vision_operator_system.py +++ b/tests/contrib/operators/test_gcp_vision_operator_system.py @@ -19,16 +19,41 @@ import unittest -from tests.contrib.utils.base_gcp_system_test_case import DagGcpSystemTestCase, SKIP_TEST_WARNING from tests.contrib.utils.gcp_authenticator import GCP_AI_KEY +from tests.contrib.operators.test_gcp_vision_operator_system_helper import GCPVisionTestHelper +from tests.contrib.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, DagGcpSystemTestCase + +VISION_HELPER = GCPVisionTestHelper() @unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING) class CloudVisionExampleDagsSystemTest(DagGcpSystemTestCase): def __init__(self, method_name='runTest'): super(CloudVisionExampleDagsSystemTest, self).__init__( - method_name, dag_id='example_gcp_vision', gcp_key=GCP_AI_KEY + method_name, dag_name='example_gcp_vision.py', gcp_key=GCP_AI_KEY ) - def test_run_example_dag_function(self): - self._run_dag() + def setUp(self): + super(CloudVisionExampleDagsSystemTest, self).setUp() + self.gcp_authenticator.gcp_authenticate() + try: + VISION_HELPER.create_bucket() + finally: + self.gcp_authenticator.gcp_revoke_authentication() + + def tearDown(self): + self.gcp_authenticator.gcp_authenticate() + try: + VISION_HELPER.delete_bucket() + finally: + self.gcp_authenticator.gcp_revoke_authentication() + super(CloudVisionExampleDagsSystemTest, self).tearDown() + + def test_run_example_gcp_vision_autogenerated_id_dag(self): + self._run_dag('example_gcp_vision_autogenerated_id') + + def test_run_example_gcp_vision_explicit_id_dag(self): + self._run_dag('example_gcp_vision_explicit_id') + + def test_run_example_gcp_vision_annotate_image_dag(self): + self._run_dag('example_gcp_vision_annotate_image') diff --git a/tests/contrib/operators/test_gcp_vision_operator_system_helper.py 
b/tests/contrib/operators/test_gcp_vision_operator_system_helper.py new file mode 100755 index 0000000000..3a88286ca2 --- /dev/null +++ b/tests/contrib/operators/test_gcp_vision_operator_system_helper.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import os + +import argparse +from tempfile import NamedTemporaryFile + +from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_AI_KEY +from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor + +GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project') +GCP_BUCKET_NAME = os.environ.get('GCP_VISION_BUCKET_NAME', 'vision-bucket-system-test') +GCP_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket-name/image.png') +GCP_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket-name/image.png') +GCP_VIDEO_SOURCE_URL = os.environ.get('GCP_VISION_SOURCE_IMAGE_URL', "http://google.com/image.jpg") + + +class GCPVisionTestHelper(LoggingCommandExecutor): + def create_bucket(self): + self.execute_cmd( + [ + 'gsutil', + 'mb', + "-p", + GCP_PROJECT_ID, + "-c", + "regional", + "-l", + "europe-north1", + "gs://%s/" % GCP_BUCKET_NAME, + ] + ) + + with NamedTemporaryFile(suffix=".png") as file: + self.execute_cmd(["curl", "-s", GCP_VIDEO_SOURCE_URL, "-o", file.name]) + self.execute_cmd(['gsutil', 'cp', file.name, GCP_REFERENCE_IMAGE_URL]) + self.execute_cmd(['gsutil', 'cp', file.name, GCP_ANNOTATE_IMAGE_URL]) + + def delete_bucket(self): + self.execute_cmd(['gsutil', 'rm', '-r', "gs://%s/" % GCP_BUCKET_NAME]) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Create and remove a bucket for system tests.') + parser.add_argument( + '--action', + dest='action', + required=True, + choices=('create-bucket', 'delete-bucket', 'before-tests', 'after-tests'), + ) + action = parser.parse_args().action + + helper = GCPVisionTestHelper() + gcp_authenticator = GcpAuthenticator(GCP_AI_KEY) + helper.log.info('Starting action: {}'.format(action)) + + gcp_authenticator.gcp_store_authentication() + try: + gcp_authenticator.gcp_authenticate() + if action == 'before-tests': + pass + elif action == 'after-tests': + pass + elif action == 'create-bucket': + 
helper.create_bucket() + elif action == 'delete-bucket': + helper.delete_bucket() + else: + raise Exception("Unknown action: {}".format(action)) + finally: + gcp_authenticator.gcp_restore_authentication() + + helper.log.info('Finishing action: {}'.format(action)) diff --git a/tests/contrib/utils/base_gcp_system_test_case.py b/tests/contrib/utils/base_gcp_system_test_case.py index d4b0359d94..1fdf2fdaac 100644 --- a/tests/contrib/utils/base_gcp_system_test_case.py +++ b/tests/contrib/utils/base_gcp_system_test_case.py @@ -119,8 +119,8 @@ class BaseGcpSystemTestCase(unittest.TestCase, LoggingMixin): class DagGcpSystemTestCase(BaseGcpSystemTestCase): def __init__(self, method_name, - dag_id, gcp_key, + dag_id=None, dag_name=None, require_local_executor=False, example_dags_folder=CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER, @@ -161,24 +161,21 @@ class DagGcpSystemTestCase(BaseGcpSystemTestCase): target_path = os.path.join(target_folder, file_name) if remove: try: - self.log.info("Remove symlink: {} -> {} ".format( - target_path, source_path)) + self.log.info("Remove symlink: %s -> %s", target_path, source_path) os.remove(target_path) except OSError: pass else: if not os.path.exists(target_path): - self.log.info("Symlink: {} -> {} ".format(target_path, source_path)) + self.log.info("Symlink: %s -> %s ", target_path, source_path) os.symlink(source_path, target_path) else: - self.log.info("Symlink {} already exists. Not symlinking it.". - format(target_path)) + self.log.info("Symlink %s already exists. Not symlinking it.", target_path) def _store_dags_to_temporary_directory(self): dag_folder = self._get_dag_folder() self.temp_dir = mkdtemp() - self.log.info("Storing DAGS from {} to temporary directory {}". 
- format(dag_folder, self.temp_dir)) + self.log.info("Storing DAGS from %s to temporary directory %s", dag_folder, self.temp_dir) try: os.mkdir(dag_folder) except OSError: @@ -188,20 +185,19 @@ class DagGcpSystemTestCase(BaseGcpSystemTestCase): def _restore_dags_from_temporary_directory(self): dag_folder = self._get_dag_folder() - self.log.info("Restoring DAGS to {} from temporary directory {}" - .format(dag_folder, self.temp_dir)) + self.log.info("Restoring DAGS to %s from temporary directory %s", dag_folder, self.temp_dir) for file in os.listdir(self.temp_dir): move(os.path.join(self.temp_dir, file), os.path.join(dag_folder, file)) - def _run_dag(self): - self.log.info("Attempting to run DAG: {}".format(self.dag_id)) + def _run_dag(self, dag_id=None): + self.log.info("Attempting to run DAG: %s", self.dag_id) if not self.setup_called: raise AirflowException("Please make sure to call super.setUp() in your " "test class!") dag_folder = self._get_dag_folder() dag_bag = models.DagBag(dag_folder=dag_folder, include_examples=False) self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE} - dag = dag_bag.get_dag(self.dag_id) + dag = dag_bag.get_dag(self.dag_id or dag_id) if dag is None: raise AirflowException( "The Dag {} could not be found. It's either an import problem or " diff --git a/tests/utils/test_decorators.py b/tests/utils/test_decorators.py index 3a196dd8a1..5e6ff5f935 100644 --- a/tests/utils/test_decorators.py +++ b/tests/utils/test_decorators.py @@ -18,7 +18,8 @@ # under the License. 
import unittest -from airflow.utils.decorators import apply_defaults + +from airflow.utils.decorators import apply_defaults, cached_property from airflow.exceptions import AirflowException @@ -72,3 +73,29 @@ class ApplyDefaultTest(unittest.TestCase): default_args = {'random_params': True} with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'): DummyClass(default_args=default_args) + + +class FixtureClass: + @cached_property + def value(self): + """Fixture docstring""" + return 1, object() + + +class FixtureSubClass(FixtureClass): + pass + + +class CachedPropertyTest(unittest.TestCase): + + def setUp(self): + self.test_obj = FixtureClass() + self.test_sub_obj = FixtureSubClass() + + def test_cache_works(self): + self.assertIs(self.test_obj.value, self.test_obj.value) + self.assertIs(self.test_sub_obj.value, self.test_sub_obj.value) + + def test_docstring(self): + self.assertEqual(FixtureClass.value.__doc__, "Fixture docstring") + self.assertEqual(FixtureSubClass.value.__doc__, "Fixture docstring")