[AIRFLOW-3908] Add more Google Cloud Vision operators (#4791)

This commit is contained in:
Kamil Breguła 2019-03-21 11:49:57 +01:00 коммит произвёл Kaxil Naik
Родитель ae295382a0
Коммит 707d6f2a50
15 изменённых файлов: 1771 добавлений и 433 удалений

Просмотреть файл

@ -23,22 +23,36 @@ Vision service in the Google Cloud Platform.
This DAG relies on the following OS environment variables
* GCP_VISION_LOCATION - Google Cloud Platform zone where the instance exists.
* GCP_VISION_LOCATION - Zone where the instance exists.
* GCP_VISION_PRODUCT_SET_ID - Product Set ID.
* GCP_VISION_PRODUCT_ID - Product ID.
* GCP_VISION_REFERENCE_IMAGE_ID - Reference Image ID.
* GCP_VISION_REFERENCE_IMAGE_URL - A link to the bucket that contains the reference image.
* GCP_VISION_ANNOTATE_IMAGE_URL - A link to the bucket that contains the file to be annotated.
"""
import os
# [START howto_operator_vision_retry_import]
from google.api_core.retry import Retry
# [END howto_operator_vision_retry_import]
# [START howto_operator_vision_productset_import]
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet
# [END howto_operator_vision_productset_import]
# [START howto_operator_vision_product_set_import]
from google.cloud.vision_v1.types import ProductSet
# [END howto_operator_vision_product_set_import]
# [START howto_operator_vision_product_import]
from google.cloud.vision_v1.proto.product_search_service_pb2 import Product
from google.cloud.vision_v1.types import Product
# [END howto_operator_vision_product_import]
# [START howto_operator_vision_reference_image_import]
from google.cloud.vision_v1.types import ReferenceImage
# [END howto_operator_vision_reference_image_import]
# [START howto_operator_vision_enums_import]
from google.cloud.vision import enums
# [END howto_operator_vision_enums_import]
import airflow
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductSetCreateOperator,
@ -49,6 +63,10 @@ from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductGetOperator,
CloudVisionProductUpdateOperator,
CloudVisionProductDeleteOperator,
CloudVisionReferenceImageCreateOperator,
CloudVisionAddProductToProductSetOperator,
CloudVisionRemoveProductFromProductSetOperator,
CloudVisionAnnotateImageOperator,
)
default_args = {'start_date': airflow.utils.dates.days_ago(1)}
@ -57,25 +75,45 @@ default_args = {'start_date': airflow.utils.dates.days_ago(1)}
GCP_VISION_LOCATION = os.environ.get('GCP_VISION_LOCATION', 'europe-west1')
# [END howto_operator_vision_args_common]
# [START howto_operator_vision_productset]
product_set = ProductSet(display_name='My Product Set 1')
# [END howto_operator_vision_productset]
# [START howto_operator_vision_product]
product = Product(display_name='My Product 1', product_category='toys')
# [END howto_operator_vision_product]
# [START howto_operator_vision_productset_explicit_id]
# [START howto_operator_vision_product_set_explicit_id]
GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
# [END howto_operator_vision_productset_explicit_id]
# [END howto_operator_vision_product_set_explicit_id]
# [START howto_operator_vision_product_explicit_id]
GCP_VISION_PRODUCT_ID = os.environ.get('GCP_VISION_PRODUCT_ID', 'product_explicit_id')
# [END howto_operator_vision_product_explicit_id]
# [START howto_operator_vision_reference_image_args]
GCP_VISION_REFERENCE_IMAGE_ID = os.environ.get('GCP_VISION_REFERENCE_IMAGE_ID', 'reference_image_explicit_id')
GCP_VISION_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket/image1.jpg')
# [END howto_operator_vision_reference_image_args]
# [START howto_operator_vision_annotate_image_url]
GCP_VISION_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket/image2.jpg')
# [END howto_operator_vision_annotate_image_url]
# [START howto_operator_vision_product_set]
product_set = ProductSet(display_name='My Product Set')
# [END howto_operator_vision_product_set]
# [START howto_operator_vision_product]
product = Product(display_name='My Product 1', product_category='toys')
# [END howto_operator_vision_product]
# [START howto_operator_vision_reference_image]
reference_image = ReferenceImage(uri=GCP_VISION_REFERENCE_IMAGE_URL)
# [END howto_operator_vision_reference_image]
# [START howto_operator_vision_annotate_image_request]
annotate_image_request = {
'image': {'source': {'image_uri': GCP_VISION_ANNOTATE_IMAGE_URL}},
'features': [{'type': enums.Feature.Type.LOGO_DETECTION}],
}
# [END howto_operator_vision_annotate_image_request]
with models.DAG(
'example_gcp_vision', default_args=default_args, schedule_interval=None # Override to match your needs
) as dag:
'example_gcp_vision_autogenerated_id', default_args=default_args, schedule_interval=None
) as dag_autogenerated_id:
# ################################## #
# ### Autogenerated IDs examples ### #
# ################################## #
@ -150,9 +188,59 @@ with models.DAG(
)
# [END howto_operator_vision_product_delete]
product_set_create >> product_set_get >> product_set_update >> product_set_delete
# [START howto_operator_vision_reference_image_create]
reference_image_create = CloudVisionReferenceImageCreateOperator(
location=GCP_VISION_LOCATION,
reference_image=reference_image,
product_id="{{ task_instance.xcom_pull('product_create') }}",
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id='reference_image_create',
)
# [END howto_operator_vision_reference_image_create]
# [START howto_operator_vision_add_product_to_product_set]
add_product_to_product_set = CloudVisionAddProductToProductSetOperator(
location=GCP_VISION_LOCATION,
product_set_id="{{ task_instance.xcom_pull('product_set_create') }}",
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id='add_product_to_product_set',
)
# [END howto_operator_vision_add_product_to_product_set]
# [START howto_operator_vision_remove_product_from_product_set]
remove_product_from_product_set = CloudVisionRemoveProductFromProductSetOperator(
location=GCP_VISION_LOCATION,
product_set_id="{{ task_instance.xcom_pull('product_set_create') }}",
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id='remove_product_from_product_set',
)
# [END howto_operator_vision_remove_product_from_product_set]
# Product path
product_create >> product_get >> product_update >> product_delete
# ProductSet path
product_set_create >> product_set_get >> product_set_update >> product_set_delete
# ReferenceImage path
product_create >> reference_image_create >> product_delete
# Product/ProductSet path
product_create >> add_product_to_product_set
product_set_create >> add_product_to_product_set
add_product_to_product_set >> remove_product_from_product_set
remove_product_from_product_set >> product_delete
remove_product_from_product_set >> product_set_delete
with models.DAG(
'example_gcp_vision_explicit_id', default_args=default_args, schedule_interval=None
) as dag_explicit_id:
# ############################# #
# ### Explicit IDs examples ### #
# ############################# #
@ -241,5 +329,87 @@ with models.DAG(
)
# [END howto_operator_vision_product_delete_2]
# [START howto_operator_vision_reference_image_create_2]
reference_image_create_2 = CloudVisionReferenceImageCreateOperator(
location=GCP_VISION_LOCATION,
reference_image=reference_image,
product_id=GCP_VISION_PRODUCT_ID,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id='reference_image_create_2',
)
# [END howto_operator_vision_reference_image_create_2]
# Second 'create' task with the same product_id to demonstrate idempotence
reference_image_create_2_idempotence = CloudVisionReferenceImageCreateOperator(
location=GCP_VISION_LOCATION,
reference_image=reference_image,
product_id=GCP_VISION_PRODUCT_ID,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id='reference_image_create_2_idempotence',
)
# [START howto_operator_vision_add_product_to_product_set_2]
add_product_to_product_set_2 = CloudVisionAddProductToProductSetOperator(
location=GCP_VISION_LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id='add_product_to_product_set_2',
)
# [END howto_operator_vision_add_product_to_product_set_2]
# [START howto_operator_vision_remove_product_from_product_set_2]
remove_product_from_product_set_2 = CloudVisionRemoveProductFromProductSetOperator(
location=GCP_VISION_LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id='remove_product_from_product_set_2',
)
# [END howto_operator_vision_remove_product_from_product_set_2]
# Product path
product_create_2 >> product_create_2_idempotence >> product_get_2 >> product_update_2 >> product_delete_2
# ProductSet path
product_set_create_2 >> product_set_get_2 >> product_set_update_2 >> product_set_delete_2
product_create_2 >> product_get_2 >> product_update_2 >> product_delete_2
product_set_create_2 >> product_set_create_2_idempotence >> product_set_delete_2
# ReferenceImage path
product_create_2 >> reference_image_create_2 >> reference_image_create_2_idempotence >> product_delete_2
# Product/ProductSet path
add_product_to_product_set_2 >> remove_product_from_product_set_2
product_set_create_2 >> add_product_to_product_set_2
product_create_2 >> add_product_to_product_set_2
remove_product_from_product_set_2 >> product_set_delete_2
remove_product_from_product_set_2 >> product_delete_2
with models.DAG(
'example_gcp_vision_annotate_image', default_args=default_args, schedule_interval=None
) as dag_annotate_image:
# ############################## #
# ### Annotate image example ### #
# ############################## #
# [START howto_operator_vision_annotate_image]
annotate_image = CloudVisionAnnotateImageOperator(
request=annotate_image_request, retry=Retry(maximum=10.0), timeout=5, task_id='annotate_image'
)
# [END howto_operator_vision_annotate_image]
# [START howto_operator_vision_annotate_image_result]
annotate_image_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('annotate_image')"
"['logoAnnotations'][0]['description'] }}",
task_id='annotate_image_result',
)
# [END howto_operator_vision_annotate_image_result]
annotate_image >> annotate_image_result

Просмотреть файл

@ -27,6 +27,8 @@ import google.oauth2.service_account
import os
import tempfile
from google.api_core.exceptions import GoogleAPICallError, AlreadyExists, RetryError
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@ -159,6 +161,27 @@ class GoogleCloudBaseHook(BaseHook):
def project_id(self):
return self._get_field('project')
@staticmethod
def catch_http_exception(func):
@functools.wraps(func)
def wrapper_decorator(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except GoogleAPICallError as e:
if isinstance(e, AlreadyExists):
raise e
else:
self.log.error('The request failed:\n%s', str(e))
raise AirflowException(e)
except RetryError as e:
self.log.error('The request failed due to a retryable error and retry attempts failed.')
raise AirflowException(e)
except ValueError as e:
self.log.error('The request failed, the parameters are invalid.')
raise AirflowException(e)
return wrapper_decorator
@staticmethod
def fallback_to_default_project_id(func):
"""

Просмотреть файл

@ -18,12 +18,12 @@
# under the License.
from copy import deepcopy
from google.api_core.exceptions import AlreadyExists, GoogleAPICallError, RetryError
from google.cloud.vision_v1 import ProductSearchClient
from google.cloud.vision_v1 import ProductSearchClient, ImageAnnotatorClient
from google.protobuf.json_format import MessageToDict
from airflow import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.utils.decorators import cached_property
class NameDeterminer:
@ -114,6 +114,11 @@ class CloudVisionHook(GoogleCloudBaseHook):
self._client = ProductSearchClient(credentials=self._get_credentials())
return self._client
@cached_property
def annotator_client(self):
return ImageAnnotatorClient(credentials=self._get_credentials())
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def create_product_set(
self,
@ -132,8 +137,7 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
parent = ProductSearchClient.location_path(project_id, location)
self.log.info('Creating a new ProductSet under the parent: %s', parent)
response = self._handle_request(
lambda **kwargs: client.create_product_set(**kwargs),
response = client.create_product_set(
parent=parent,
product_set=product_set,
product_set_id=product_set_id,
@ -151,6 +155,7 @@ class CloudVisionHook(GoogleCloudBaseHook):
return product_set_id
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def get_product_set(
self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None
@ -162,17 +167,12 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
self.log.info('Retrieving ProductSet: %s', name)
response = self._handle_request(
lambda **kwargs: client.get_product_set(**kwargs),
name=name,
retry=retry,
timeout=timeout,
metadata=metadata,
)
response = client.get_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('ProductSet retrieved.')
self.log.debug('ProductSet retrieved:\n%s', response)
return MessageToDict(response)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def update_product_set(
self,
@ -194,18 +194,14 @@ class CloudVisionHook(GoogleCloudBaseHook):
product_set, product_set_id, location, project_id
)
self.log.info('Updating ProductSet: %s', product_set.name)
response = self._handle_request(
lambda **kwargs: client.update_product_set(**kwargs),
product_set=product_set,
update_mask=update_mask,
retry=retry,
timeout=timeout,
metadata=metadata,
response = client.update_product_set(
product_set=product_set, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata
)
self.log.info('ProductSet updated: %s', response.name if response else '')
self.log.debug('ProductSet updated:\n%s', response)
return MessageToDict(response)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def delete_product_set(
self, location, product_set_id, project_id=None, retry=None, timeout=None, metadata=None
@ -217,16 +213,10 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
self.log.info('Deleting ProductSet: %s', name)
response = self._handle_request(
lambda **kwargs: client.delete_product_set(**kwargs),
name=name,
retry=retry,
timeout=timeout,
metadata=metadata,
)
client.delete_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('ProductSet with the name [%s] deleted.', name)
return response
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def create_product(
self, location, product, project_id=None, product_id=None, retry=None, timeout=None, metadata=None
@ -238,8 +228,7 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
parent = ProductSearchClient.location_path(project_id, location)
self.log.info('Creating a new Product under the parent: %s', parent)
response = self._handle_request(
lambda **kwargs: client.create_product(**kwargs),
response = client.create_product(
parent=parent,
product=product,
product_id=product_id,
@ -257,6 +246,7 @@ class CloudVisionHook(GoogleCloudBaseHook):
return product_id
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def get_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
"""
@ -266,17 +256,12 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
name = ProductSearchClient.product_path(project_id, location, product_id)
self.log.info('Retrieving Product: %s', name)
response = self._handle_request(
lambda **kwargs: client.get_product(**kwargs),
name=name,
retry=retry,
timeout=timeout,
metadata=metadata,
)
response = client.get_product(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('Product retrieved.')
self.log.debug('Product retrieved:\n%s', response)
return MessageToDict(response)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def update_product(
self,
@ -296,18 +281,14 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
product = self.product_name_determiner.get_entity_with_name(product, product_id, location, project_id)
self.log.info('Updating ProductSet: %s', product.name)
response = self._handle_request(
lambda **kwargs: client.update_product(**kwargs),
product=product,
update_mask=update_mask,
retry=retry,
timeout=timeout,
metadata=metadata,
response = client.update_product(
product=product, update_mask=update_mask, retry=retry, timeout=timeout, metadata=metadata
)
self.log.info('Product updated: %s', response.name if response else '')
self.log.debug('Product updated:\n%s', response)
return MessageToDict(response)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def delete_product(self, location, product_id, project_id=None, retry=None, timeout=None, metadata=None):
"""
@ -317,38 +298,150 @@ class CloudVisionHook(GoogleCloudBaseHook):
client = self.get_conn()
name = ProductSearchClient.product_path(project_id, location, product_id)
self.log.info('Deleting ProductSet: %s', name)
response = self._handle_request(
lambda **kwargs: client.delete_product(**kwargs),
name=name,
client.delete_product(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('Product with the name [%s] deleted:', name)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def create_reference_image(
self,
location,
product_id,
reference_image,
reference_image_id=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
"""
client = self.get_conn()
self.log.info('Creating ReferenceImage')
parent = ProductSearchClient.product_path(project=project_id, location=location, product=product_id)
response = client.create_reference_image(
parent=parent,
reference_image=reference_image,
reference_image_id=reference_image_id,
retry=retry,
timeout=timeout,
metadata=metadata,
)
self.log.info('Product with the name [%s] deleted:', name)
return response
def _handle_request(self, fun, **kwargs):
try:
return fun(**kwargs)
except GoogleAPICallError as e:
if isinstance(e, AlreadyExists):
raise e
else:
self.log.error('The request failed:\n%s', str(e))
raise AirflowException(e)
except RetryError as e:
self.log.error('The request failed due to a retryable error and retry attempts failed.')
raise AirflowException(e)
except ValueError as e:
self.log.error('The request failed, the parameters are invalid.')
raise AirflowException(e)
self.log.info('ReferenceImage created: %s', response.name if response else '')
self.log.debug('ReferenceImage created:\n%s', response)
@staticmethod
def _get_entity_name(is_product, project_id, location, entity_id):
if is_product:
return ProductSearchClient.product_path(project_id, location, entity_id)
else:
return ProductSearchClient.product_set_path(project_id, location, entity_id)
if not reference_image_id:
# Refernece image id was generated by the API
reference_image_id = self._get_autogenerated_id(response)
self.log.info(
'Extracted autogenerated ReferenceImage ID from the response: %s', reference_image_id
)
return reference_image_id
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def delete_reference_image(
self,
location,
product_id,
reference_image_id,
project_id=None,
retry=None,
timeout=None,
metadata=None,
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
"""
client = self.get_conn()
self.log.info('Deleting ReferenceImage')
name = ProductSearchClient.reference_image_path(
project=project_id, location=location, product=product_id, reference_image=reference_image_id
)
response = client.delete_reference_image(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('ReferenceImage with the name [%s] deleted.', name)
return MessageToDict(response)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def add_product_to_product_set(
self,
product_set_id,
product_id,
location=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator`
"""
client = self.get_conn()
product_name = ProductSearchClient.product_path(project_id, location, product_id)
product_set_name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
self.log.info('Add Product[name=%s] to Product Set[name=%s]', product_name, product_set_name)
client.add_product_to_product_set(
name=product_set_name, product=product_name, retry=retry, timeout=timeout, metadata=metadata
)
self.log.info('Product added to Product Set')
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def remove_product_from_product_set(
self,
product_set_id,
product_id,
location=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator`
"""
client = self.get_conn()
product_name = ProductSearchClient.product_path(project_id, location, product_id)
product_set_name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
self.log.info('Remove Product[name=%s] from Product Set[name=%s]', product_name, product_set_name)
client.remove_product_from_product_set(
name=product_set_name, product=product_name, retry=retry, timeout=timeout, metadata=metadata
)
self.log.info('Product removed from Product Set')
@GoogleCloudBaseHook.catch_http_exception
def annotate_image(self, request, retry=None, timeout=None):
"""
For the documentation see:
:py:class:`~airflow.contrib.operators.gcp_vision_image_annotator_operator.CloudVisionAnnotateImage`
"""
client = self.annotator_client
self.log.info('Annotating image')
response = client.annotate_image(request=request, retry=retry, timeout=timeout)
self.log.info('Image annotated')
return MessageToDict(response)
@staticmethod
def _get_autogenerated_id(response):

Просмотреть файл

@ -53,13 +53,13 @@ class CloudVisionProductSetCreateOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_productset_create_template_fields]
template_fields = ('location', 'project_id', 'product_set_id', 'gcp_conn_id')
template_fields = ("location", "project_id", "product_set_id", "gcp_conn_id")
# [END vision_productset_create_template_fields]
@apply_defaults
@ -72,7 +72,7 @@ class CloudVisionProductSetCreateOperator(BaseOperator):
retry=None,
timeout=None,
metadata=None,
gcp_conn_id='google_cloud_default',
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
@ -100,7 +100,7 @@ class CloudVisionProductSetCreateOperator(BaseOperator):
)
except AlreadyExists:
self.log.info(
'Product set with id %s already exists. Exiting from the create operation.',
"Product set with id %s already exists. Exiting from the create operation.",
self.product_set_id,
)
return self.product_set_id
@ -130,8 +130,8 @@ class CloudVisionProductSetGetOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -216,8 +216,8 @@ class CloudVisionProductSetUpdateOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -292,8 +292,8 @@ class CloudVisionProductSetDeleteOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -326,7 +326,7 @@ class CloudVisionProductSetDeleteOperator(BaseOperator):
self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
def execute(self, context):
return self._hook.delete_product_set(
self._hook.delete_product_set(
location=self.location,
product_set_id=self.product_set_id,
project_id=self.project_id,
@ -342,9 +342,9 @@ class CloudVisionProductCreateOperator(BaseOperator):
Possible errors regarding the `Product` object provided:
- Returns INVALID_ARGUMENT if `display_name` is missing or longer than 4096 characters.
- Returns INVALID_ARGUMENT if `description` is longer than 4096 characters.
- Returns INVALID_ARGUMENT if `product_category` is missing or invalid.
- Returns `INVALID_ARGUMENT` if `display_name` is missing or longer than 4096 characters.
- Returns `INVALID_ARGUMENT` if `description` is longer than 4096 characters.
- Returns `INVALID_ARGUMENT` if `product_category` is missing or invalid.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@ -372,8 +372,8 @@ class CloudVisionProductCreateOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -431,7 +431,7 @@ class CloudVisionProductGetOperator(BaseOperator):
Possible errors:
- Returns NOT_FOUND if the `Product` does not exist.
- Returns `NOT_FOUND` if the `Product` does not exist.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@ -453,8 +453,8 @@ class CloudVisionProductGetOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -472,7 +472,7 @@ class CloudVisionProductGetOperator(BaseOperator):
retry=None,
timeout=None,
metadata=None,
gcp_conn_id='google_cloud_default',
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
@ -516,11 +516,12 @@ class CloudVisionProductUpdateOperator(BaseOperator):
Possible errors related to the provided `Product`:
- Returns NOT_FOUND if the Product does not exist.
- Returns INVALID_ARGUMENT if display_name is present in update_mask but is missing from the request or
longer than 4096 characters.
- Returns INVALID_ARGUMENT if description is present in update_mask but is longer than 4096 characters.
- Returns INVALID_ARGUMENT if product_category is present in update_mask.
- Returns `NOT_FOUND` if the Product does not exist.
- Returns `INVALID_ARGUMENT` if `display_name` is present in update_mask but is missing from the request
or longer than 4096 characters.
- Returns `INVALID_ARGUMENT` if `description` is present in update_mask but is longer than 4096
characters.
- Returns `INVALID_ARGUMENT` if `product_category` is present in update_mask.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@ -550,8 +551,8 @@ class CloudVisionProductUpdateOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -608,7 +609,7 @@ class CloudVisionProductDeleteOperator(BaseOperator):
Possible errors:
- Returns NOT_FOUND if the product does not exist.
- Returns `NOT_FOUND` if the product does not exist.
.. seealso::
For more information on how to use this operator, take a look at the guide:
@ -630,8 +631,8 @@ class CloudVisionProductDeleteOperator(BaseOperator):
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
@ -663,7 +664,7 @@ class CloudVisionProductDeleteOperator(BaseOperator):
self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
def execute(self, context):
return self._hook.delete_product(
self._hook.delete_product(
location=self.location,
product_id=self.product_id,
project_id=self.project_id,
@ -671,3 +672,292 @@ class CloudVisionProductDeleteOperator(BaseOperator):
timeout=self.timeout,
metadata=self.metadata,
)
class CloudVisionAnnotateImageOperator(BaseOperator):
"""
Run image detection and annotation for an image.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVisionAnnotateImageOperator`
:param request: (Required) Individual file annotation requests.
If a dict is provided, it must be of the same form as the protobuf
message class:`google.cloud.vision_v1.types.AnnotateImageRequest`
:type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest
:param retry: (Optional) A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: (Optional) The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: float
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_annotate_image_template_fields]
template_fields = ('request', 'gcp_conn_id')
# [END vision_annotate_image_template_fields]
@apply_defaults
def __init__(
self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs
):
super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs)
self.request = request
self.retry = retry
self.timeout = timeout
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
return hook.annotate_image(request=self.request, retry=self.retry, timeout=self.timeout)
class CloudVisionReferenceImageCreateOperator(BaseOperator):
"""
Creates and returns a new ReferenceImage ID resource.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVisionReferenceImageCreateOperator`
:param location: (Required) The region where the Product is located. Valid regions (as of 2019-02-05) are:
us-east1, us-west1, europe-west1, asia-east1
:type location: str
:param reference_image: (Required) The reference image to create. If an image ID is specified, it is
ignored.
If a dict is provided, it must be of the same form as the protobuf message
:class:`google.cloud.vision_v1.types.ReferenceImage`
:type reference_image: dict or google.cloud.vision_v1.types.ReferenceImage
:param reference_image_id: (Optional) A user-supplied resource id for the ReferenceImage to be added.
If set, the server will attempt to use this value as the resource id. If it is already in use, an
error is returned with code ALREADY_EXISTS. Must be at most 128 characters long. It cannot contain
the character `/`.
:type reference_image_id: str
:param product_id: (Optional) The resource id of this Product.
:type product_id: str
:param project_id: (Optional) The project in which the Product is located. If set to None or
missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: (Optional) A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: (Optional) The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_reference_image_create_template_fields]
template_fields = (
"location",
"reference_image",
"product_id",
"reference_image_id",
"project_id",
"gcp_conn_id",
)
# [END vision_reference_image_create_template_fields]
@apply_defaults
def __init__(
self,
location,
reference_image,
product_id,
reference_image_id=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id='google_cloud_default',
*args,
**kwargs
):
super(CloudVisionReferenceImageCreateOperator, self).__init__(*args, **kwargs)
self.location = location
self.product_id = product_id
self.reference_image = reference_image
self.reference_image_id = reference_image_id
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
try:
hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
return hook.create_reference_image(
location=self.location,
product_id=self.product_id,
reference_image=self.reference_image,
reference_image_id=self.reference_image_id,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
except AlreadyExists:
self.log.info(
"ReferenceImage with id %s already exists. Exiting from the create operation.",
self.product_id,
)
return self.reference_image_id
class CloudVisionAddProductToProductSetOperator(BaseOperator):
"""
Adds a Product to the specified ProductSet. If the Product is already present, no change is made.
One Product can be added to at most 100 ProductSets.
Possible errors:
- Returns `NOT_FOUND` if the Product or the ProductSet doesnt exist.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVisionAddProductToProductSetOperator`
:param product_set_id: (Required) The resource id for the ProductSet to modify.
:type product_set_id: str
:param product_id: (Required) The resource id of this Product.
:type product_id: str
:param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05)
are: us-east1, us-west1, europe-west1, asia-east1
:type: str
:param project_id: (Optional) The project in which the Product is located. If set to None or
missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: (Optional) A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: (Optional) The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_add_product_to_product_set_template_fields]
template_fields = ("location", "product_set_id", "product_id", "project_id", "gcp_conn_id")
# [END vision_add_product_to_product_set_template_fields]
@apply_defaults
def __init__(
self,
product_set_id,
product_id,
location,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super(CloudVisionAddProductToProductSetOperator, self).__init__(*args, **kwargs)
self.product_set_id = product_set_id
self.product_id = product_id
self.location = location
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
return hook.add_product_to_product_set(
product_set_id=self.product_set_id,
product_id=self.product_id,
location=self.location,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
class CloudVisionRemoveProductFromProductSetOperator(BaseOperator):
"""
Removes a Product from the specified ProductSet.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVisionRemoveProductFromProductSetOperator`
:param product_set_id: (Required) The resource id for the ProductSet to modify.
:type product_set_id: str
:param product_id: (Required) The resource id of this Product.
:type product_id: str
:param location: (Required) The region where the ProductSet is located. Valid regions (as of 2019-02-05)
are: us-east1, us-west1, europe-west1, asia-east1
:type: str
:param project_id: (Optional) The project in which the Product is located. If set to None or
missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: (Optional) A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: (Optional) The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: float
:param metadata: (Optional) Additional metadata that is provided to the method.
:type metadata: sequence[tuple[str, str]]
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
# [START vision_remove_product_from_product_set_template_fields]
template_fields = ("location", "product_set_id", "product_id", "project_id", "gcp_conn_id")
# [END vision_remove_product_from_product_set_template_fields]
@apply_defaults
def __init__(
self,
product_set_id,
product_id,
location,
project_id=None,
retry=None,
timeout=None,
metadata=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super(CloudVisionRemoveProductFromProductSetOperator, self).__init__(*args, **kwargs)
self.product_set_id = product_set_id
self.product_id = product_id
self.location = location
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
return hook.remove_product_from_product_set(
product_set_id=self.product_set_id,
product_id=self.product_id,
location=self.location,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)

Просмотреть файл

@ -103,3 +103,19 @@ if 'BUILDING_AIRFLOW_DOCS' in os.environ:
# flake8: noqa: F811
# Monkey patch hook to get good function headers while building docs
apply_defaults = lambda x: x
class cached_property:
"""
A decorator creating a property, the value of which is calculated only once and cached for later use.
"""
def __init__(self, func):
self.func = func
self.__doc__ = getattr(func, '__doc__')
def __get__(self, instance, cls=None):
if instance is None:
return self
result = self.func(instance)
instance.__dict__[self.func.__name__] = result
return result

Просмотреть файл

@ -209,6 +209,8 @@ Operators
.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator
.. autoclass:: airflow.contrib.operators.gcp_translate_operator.CloudTranslateTextOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator
@ -217,6 +219,8 @@ Operators
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator
.. autoclass:: airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator
.. autoclass:: airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator

Просмотреть файл

@ -22,15 +22,15 @@ Google Cloud Vision Operators
:depth: 1
:local:
.. _howto/operator:CloudVisionProductSetCreateOperator:
.. _howto/operator:CloudVisionAddProductToProductSetOperator:
CloudVisionProductSetCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
CloudVisionAddProductToProductSetOperator
-----------------------------------------
Creates a new :code:`ProductSet` resource.
Creates a new :code:`ReferenceImage` resource.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator`
Arguments
"""""""""
@ -38,110 +38,59 @@ Arguments
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id]
:end-before: [END howto_operator_vision_product_set_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` and ``Retry`` objects from Google libraries:
We are using the :class:`~google.cloud.vision_v1.types.Product`,
:class:`~google.cloud.vision_v1.types.ProductSet` and ``Retry`` objects from
Google libraries:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_import]
:end-before: [END howto_operator_vision_productset_import]
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
:language: python
:start-after: [START howto_operator_vision_product_set_import]
:end-before: [END howto_operator_vision_product_set_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset]
:end-before: [END howto_operator_vision_productset]
:language: python
:start-after: [START howto_operator_vision_product_import]
:end-before: [END howto_operator_vision_product_import]
The ``product_set_id`` argument can be omitted (it will be generated by the API):
If ``product_set_id`` and ``product_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create]
:end-before: [END howto_operator_vision_product_set_create]
Or it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create_2]
:end-before: [END howto_operator_vision_product_set_create_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_create_template_fields]
:end-before: [END vision_productset_create_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_product_set>`_.
.. _howto/operator:CloudVisionProductSetGetOperator:
CloudVisionProductSetGetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Gets information associated with a :code:`ProductSet`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get]
:end-before: [END howto_operator_vision_product_set_get]
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_add_product_to_product_set]
:end-before: [END howto_operator_vision_add_product_to_product_set]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get_2]
:end-before: [END howto_operator_vision_product_set_get_2]
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_add_product_to_product_set_2]
:end-before: [END howto_operator_vision_add_product_to_product_set_2]
Templating
""""""""""
@ -149,36 +98,25 @@ Templating
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_get_template_fields]
:end-before: [END vision_productset_get_template_fields]
:start-after: [START vision_add_product_to_product_set_template_fields]
:end-before: [END vision_add_product_to_product_set_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet get documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.get_product_set>`_.
See `Google Cloud Vision Add Product To Product Set documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.add_product_to_product_set>`_.
.. _howto/operator:CloudVisionProductSetUpdateOperator:
CloudVisionProductSetUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. _howto/operator:CloudVisionAnnotateImageOperator:
Makes changes to a :code:`ProductSet` resource. Only :code:`display_name` can be updated
currently.
CloudVisionAnnotateImageOperator
--------------------------------
.. note:: To locate the `ProductSet` resource, its `name` in the form
``projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID`` is necessary.
You can provide the `name` directly as an attribute of the `product_set` object.
However, you can leave it blank and provide `location` and `product_set_id` instead (and
optionally `project_id` - if not present, the connection default will be used) and the
`name` will be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the `project_id` empty and
having Airflow use the connection default `project_id`.
Run image detection and annotation for an image.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator`
Arguments
"""""""""
@ -186,47 +124,47 @@ Arguments
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
:language: python
:start-after: [START howto_operator_vision_annotate_image_url]
:end-before: [END howto_operator_vision_annotate_image_url]
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` object from the Google Cloud Vision library:
We are using the :class:`~google.cloud.enums` and ``Retry`` objects from
Google libraries:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_import]
:end-before: [END howto_operator_vision_productset_import]
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset]
:end-before: [END howto_operator_vision_productset]
:language: python
:start-after: [START howto_operator_vision_enums_import]
:end-before: [END howto_operator_vision_enums_import]
Initialization of the task:
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update]
:end-before: [END howto_operator_vision_product_set_update]
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_annotate_image]
:end-before: [END howto_operator_vision_annotate_image]
Otherwise it can be specified explicitly:
The result can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update_2]
:end-before: [END howto_operator_vision_product_set_update_2]
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_annotate_image_result]
:end-before: [END howto_operator_vision_annotate_image_result]
Templating
""""""""""
@ -234,80 +172,19 @@ Templating
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_update_template_fields]
:end-before: [END vision_productset_update_template_fields]
:start-after: [START vision_annotate_image_template_fields]
:end-before: [END vision_annotate_image_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet update documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.update_product_set>`_.
.. _howto/operator:CloudVisionProductSetDeleteOperator:
CloudVisionProductSetDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Permanently deletes a :code:`ProductSet`. :code:`Products` and :code:`ReferenceImages` in
the :code:`ProductSet` are not deleted. The actual image files are not deleted from
Google Cloud Storage.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_productset_explicit_id]
:end-before: [END howto_operator_vision_productset_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete]
:end-before: [END howto_operator_vision_product_set_delete]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete_2]
:end-before: [END howto_operator_vision_product_set_delete_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_delete_template_fields]
:end-before: [END vision_productset_delete_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product_set>`_.
See `Google Cloud Vision Annotate Image documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ImageAnnotatorClient.annotate_image>`_.
.. _howto/operator:CloudVisionProductCreateOperator:
CloudVisionProductCreateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
--------------------------------
Creates and returns a new product resource.
@ -387,10 +264,77 @@ More information
See `Google Cloud Vision Product create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_product>`_.
.. _howto/operator:CloudVisionProductDeleteOperator:
CloudVisionProductDeleteOperator
--------------------------------
Permanently deletes a product and its reference images.
Metadata of the product and all its images will be deleted right away, but search queries
against :code:`ProductSets` containing the product may still work until all related
caches are refreshed.
Possible errors:
- Returns NOT_FOUND if the product does not exist.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
Using the operator
""""""""""""""""""
If ``product_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete]
:end-before: [END howto_operator_vision_product_delete]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete_2]
:end-before: [END howto_operator_vision_product_delete_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_delete_template_fields]
:end-before: [END vision_product_delete_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product>`_.
.. _howto/operator:CloudVisionProductGetOperator:
CloudVisionProductGetOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-----------------------------
Gets information associated with a :code:`Product`.
@ -450,10 +394,292 @@ More information
See `Google Cloud Vision Product get documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.get_product>`_.
.. _howto/operator:CloudVisionProductSetCreateOperator:
CloudVisionProductSetCreateOperator
-----------------------------------
Creates a new :code:`ProductSet` resource.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id
:end-before: [END howto_operator_vision_product_set_explicit_id
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` and ``Retry`` objects from Google libraries:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_import]
:end-before: [END howto_operator_vision_product_set_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set]
:end-before: [END howto_operator_vision_product_set]
The ``product_set_id`` argument can be omitted (it will be generated by the API):
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create]
:end-before: [END howto_operator_vision_product_set_create]
Or it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_create_2]
:end-before: [END howto_operator_vision_product_set_create_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_create_template_fields]
:end-before: [END vision_productset_create_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_product_set>`_.
.. _howto/operator:CloudVisionProductSetDeleteOperator:
CloudVisionProductSetDeleteOperator
-----------------------------------
Permanently deletes a :code:`ProductSet`. :code:`Products` and :code:`ReferenceImages` in
the :code:`ProductSet` are not deleted. The actual image files are not deleted from
Google Cloud Storage.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id]
:end-before: [END howto_operator_vision_product_set_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete]
:end-before: [END howto_operator_vision_product_set_delete]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_delete_2]
:end-before: [END howto_operator_vision_product_set_delete_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_delete_template_fields]
:end-before: [END vision_productset_delete_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product_set>`_.
.. _howto/operator:CloudVisionProductSetGetOperator:
CloudVisionProductSetGetOperator
--------------------------------
Gets information associated with a :code:`ProductSet`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id]
:end-before: [END howto_operator_vision_product_set_explicit_id]
Using the operator
""""""""""""""""""
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get]
:end-before: [END howto_operator_vision_product_set_get]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_get_2]
:end-before: [END howto_operator_vision_product_set_get_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_get_template_fields]
:end-before: [END vision_productset_get_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet get documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.get_product_set>`_.
.. _howto/operator:CloudVisionProductSetUpdateOperator:
CloudVisionProductSetUpdateOperator
-----------------------------------
Makes changes to a :code:`ProductSet` resource. Only :code:`display_name` can be updated
currently.
.. note:: To locate the `ProductSet` resource, its `name` in the form
``projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID`` is necessary.
You can provide the `name` directly as an attribute of the `product_set` object.
However, you can leave it blank and provide `location` and `product_set_id` instead (and
optionally `project_id` - if not present, the connection default will be used) and the
`name` will be created by the operator itself.
This mechanism exists for your convenience, to allow leaving the `project_id` empty and
having Airflow use the connection default `project_id`.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id]
:end-before: [END howto_operator_vision_product_set_explicit_id]
Using the operator
""""""""""""""""""
We are using the ``ProductSet`` object from the Google Cloud Vision library:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_import]
:end-before: [END howto_operator_vision_product_set_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set]
:end-before: [END howto_operator_vision_product_set]
Initialization of the task:
If ``product_set_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update]
:end-before: [END howto_operator_vision_product_set_update]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_set_update_2]
:end-before: [END howto_operator_vision_product_set_update_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_productset_update_template_fields]
:end-before: [END vision_productset_update_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision ProductSet update documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.update_product_set>`_.
.. _howto/operator:CloudVisionProductUpdateOperator:
CloudVisionProductUpdateOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
--------------------------------
Makes changes to a :code:`Product` resource. Only the :code:`display_name`,
:code:`description`, and :code:`labels` fields can be updated right now.
@ -544,23 +770,15 @@ More information
See `Google Cloud Vision Product update documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.update_product>`_.
.. _howto/operator:CloudVisionProductDeleteOperator:
.. _howto/operator:CloudVisionReferenceImageCreateOperator:
CloudVisionProductDeleteOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
CloudVisionReferenceImageCreateOperator
---------------------------------------
Permanently deletes a product and its reference images.
Metadata of the product and all its images will be deleted right away, but search queries
against :code:`ProductSets` containing the product may still work until all related
caches are refreshed.
Possible errors:
- Returns NOT_FOUND if the product does not exist.
Creates a new :code:`ReferenceImage` resource.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
Arguments
"""""""""
@ -568,33 +786,51 @@ Arguments
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
:language: python
:start-after: [START howto_operator_vision_reference_image_args]
:end-before: [END howto_operator_vision_reference_image_args]
Using the operator
""""""""""""""""""
If ``product_id`` was generated by the API it can be extracted from XCOM:
We are using the :class:`~google.cloud.vision_v1.types.ReferenceImage` and ``Retry`` objects from Google libraries:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete]
:end-before: [END howto_operator_vision_product_delete]
Otherwise it can be specified explicitly:
:language: python
:start-after: [START howto_operator_vision_reference_image_import]
:end-before: [END howto_operator_vision_reference_image_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_product_delete_2]
:end-before: [END howto_operator_vision_product_delete_2]
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_reference_image]
:end-before: [END howto_operator_vision_reference_image]
The ``product_set_id`` argument can be omitted (it will be generated by the API):
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_reference_image_create]
:end-before: [END howto_operator_vision_reference_image_create]
Or it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_reference_image_create_2]
:end-before: [END howto_operator_vision_reference_image_create_2]
Templating
""""""""""
@ -602,11 +838,96 @@ Templating
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_product_delete_template_fields]
:end-before: [END vision_product_delete_template_fields]
:start-after: [START vision_reference_image_create_template_fields]
:end-before: [END vision_reference_image_create_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Product delete documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.delete_product>`_.
See `Google Cloud Vision ReferenceImage create documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.create_reference_image>`_.
.. _howto/operator:CloudVisionRemoveProductFromProductSetOperator:
CloudVisionRemoveProductFromProductSetOperator
----------------------------------------------
Creates a new :code:`ReferenceImage` resource.
For parameter definition, take a look at
:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_args_common]
:end-before: [END howto_operator_vision_args_common]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_explicit_id]
:end-before: [END howto_operator_vision_product_explicit_id]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_explicit_id]
:end-before: [END howto_operator_vision_product_set_explicit_id]
Using the operator
""""""""""""""""""
We are using the :class:`~google.cloud.vision_v1.types.Product`,
:class:`~google.cloud.vision_v1.types.ProductSet` and ``Retry`` objects from
Google libraries:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_retry_import]
:end-before: [END howto_operator_vision_retry_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_set_import]
:end-before: [END howto_operator_vision_product_set_import]
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:start-after: [START howto_operator_vision_product_import]
:end-before: [END howto_operator_vision_product_import]
If ``product_set_id`` and ``product_id`` was generated by the API it can be extracted from XCOM:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_remove_product_from_product_set]
:end-before: [END howto_operator_vision_remove_product_from_product_set]
Otherwise it can be specified explicitly:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_vision.py
:language: python
:dedent: 4
:start-after: [START howto_operator_vision_remove_product_from_product_set_2]
:end-before: [END howto_operator_vision_remove_product_from_product_set_2]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_vision_operator.py
:language: python
:dedent: 4
:start-after: [START vision_remove_product_from_product_set_template_fields]
:end-before: [END vision_remove_product_from_product_set_template_fields]
More information
""""""""""""""""
See `Google Cloud Vision Remove Product From Product Set documentation
<https://googleapis.github.io/google-cloud-python/latest/vision/gapic/v1/api.html#google.cloud.vision_v1.ProductSearchClient.remove_product_from_product_set>`_.

Просмотреть файл

@ -599,22 +599,30 @@ Cloud Vision
Cloud Vision Product Search Operators
"""""""""""""""""""""""""""""""""""""
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator`
Adds a Product to the specified ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator`
Run image detection and annotation for an image.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
Creates a new Product resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
Permanently deletes a product and its reference images.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
Gets information associated with a Product.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator`
Creates a new ProductSet resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Permanently deletes a ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator`
Gets information associated with a ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator`
Makes changes to a ProductSet resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator`
Permanently deletes a ProductSet.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator`
Creates a new Product resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator`
Gets information associated with a Product.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator`
Makes changes to a Product resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator`
Permanently deletes a product and its reference images.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
Creates a new ReferenceImage resource.
:class:`airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator`
Removes a Product from the specified ProductSet.
They also use :class:`airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook` to communicate with Google Cloud Platform.

Просмотреть файл

@ -21,10 +21,16 @@
import os
import unittest
from parameterized import parameterized
from google.api_core.exceptions import RetryError, AlreadyExists
from google.cloud.exceptions import MovedPermanently
from airflow import AirflowException, LoggingMixin
from airflow.contrib.hooks import gcp_api_base_hook as hook
import google.auth
from google.auth.exceptions import GoogleAuthError
try:
from StringIO import StringIO
except ImportError:
@ -46,6 +52,55 @@ except GoogleAuthError:
default_creds_available = False
class TestCatchHttpException(unittest.TestCase):
def test_no_exception(self):
self.called = False
class FixtureClass(LoggingMixin):
@hook.GoogleCloudBaseHook.catch_http_exception
def text_fixture(*args, **kwargs):
self.called = True
FixtureClass().text_fixture()
self.assertTrue(self.called)
@parameterized.expand(
[
(MovedPermanently("MESSAGE"),),
(RetryError("MESSAGE", cause=Exception("MESSAGE")),),
(ValueError("MESSAGE"),),
]
)
def test_raise_airflowexception(self, ex_obj):
self.called = False
class FixtureClass(LoggingMixin):
@hook.GoogleCloudBaseHook.catch_http_exception
def test_fixutre(*args, **kwargs):
self.called = True
raise ex_obj
with self.assertRaises(AirflowException):
FixtureClass().test_fixutre()
self.assertTrue(self.called)
def test_raise_alreadyexists(self):
self.called = False
class FixtureClass(LoggingMixin):
@hook.GoogleCloudBaseHook.catch_http_exception
def test_fixutre(*args, **kwargs):
self.called = True
raise AlreadyExists("MESSAGE")
with self.assertRaises(AlreadyExists):
FixtureClass().test_fixutre()
self.assertTrue(self.called)
class TestGoogleCloudBaseHook(unittest.TestCase):
def setUp(self):
self.instance = hook.GoogleCloudBaseHook()
@ -81,7 +136,7 @@ class TestGoogleCloudBaseHook(unittest.TestCase):
'Default GCP credentials not available to run tests')
def test_default_creds_no_scopes(self):
self.instance.extras = {
'extra__google_cloud_platform__project': default_project,
'extra__google_cloud_platform__project': default_project
}
credentials = self.instance._get_credentials()
@ -96,19 +151,17 @@ class TestGoogleCloudBaseHook(unittest.TestCase):
def test_provide_gcp_credential_file_decorator_key_path(self):
key_path = '/test/key-path'
self.instance.extras = {
'extra__google_cloud_platform__key_path': key_path
}
self.instance.extras = {'extra__google_cloud_platform__key_path': key_path}
@hook.GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
def assert_gcp_credential_file_in_env(hook_instance):
self.assertEqual(os.environ[hook._G_APP_CRED_ENV_VAR],
key_path)
assert_gcp_credential_file_in_env(self.instance)
@mock.patch('tempfile.NamedTemporaryFile')
def test_provide_gcp_credential_file_decorator_key_content(self,
mock_file):
def test_provide_gcp_credential_file_decorator_key_content(self, mock_file):
string_file = StringIO()
file_content = '{"foo": "bar"}'
file_name = '/test/mock-file'
@ -124,4 +177,5 @@ class TestGoogleCloudBaseHook(unittest.TestCase):
self.assertEqual(os.environ[hook._G_APP_CRED_ENV_VAR],
file_name)
self.assertEqual(file_content, string_file.getvalue())
assert_gcp_credential_file_in_env(self.instance)

Просмотреть файл

@ -19,8 +19,9 @@
import unittest
from google.cloud.vision import enums
from google.cloud.vision_v1 import ProductSearchClient
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product, ReferenceImage
from google.protobuf.json_format import MessageToDict
from parameterized import parameterized
@ -43,8 +44,26 @@ LOC_ID_TEST = 'loc-id'
LOC_ID_TEST_2 = 'loc-id-2'
PRODUCTSET_ID_TEST = 'ps-id'
PRODUCTSET_ID_TEST_2 = 'ps-id-2'
PRODUCTSET_NAME_TEST = 'projects/{}/locations/{}/productSets/{}'.format(
PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST
)
PRODUCT_ID_TEST = 'p-id'
PRODUCT_ID_TEST_2 = 'p-id-2'
PRODUCT_NAME_TEST = "projects/{}/locations/{}/products/{}".format(
PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST
)
PRODUCT_NAME = "projects/{}/locations/{}/products/{}".format(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
REFERENCE_IMAGE_ID_TEST = 'ri-id'
REFERENCE_IMAGE_GEN_ID_TEST = 'ri-id'
ANNOTATE_IMAGE_REQUEST = {
'image': {'source': {'image_uri': "gs://bucket-name/object-name"}},
'features': [{'type': enums.Feature.Type.LOGO_DETECTION}],
}
REFERENCE_IMAGE_NAME_TEST = "projects/{}/locations/{}/products/{}/referenceImages/{}".format(
PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST, REFERENCE_IMAGE_ID_TEST
)
REFERENCE_IMAGE_TEST = ReferenceImage(name=REFERENCE_IMAGE_GEN_ID_TEST)
REFERENCE_IMAGE_WITHOUT_ID_NAME = ReferenceImage()
class TestGcpVisionHook(unittest.TestCase):
@ -53,7 +72,7 @@ class TestGcpVisionHook(unittest.TestCase):
'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.__init__',
new=mock_base_gcp_hook_default_project_id,
):
self.vision_hook_default_project_id = CloudVisionHook(gcp_conn_id='test')
self.hook = CloudVisionHook(gcp_conn_id='test')
@mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
def test_create_productset_explicit_id(self, get_conn):
@ -61,10 +80,9 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_set_method = get_conn.return_value.create_product_set
create_product_set_method.return_value = None
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product_set = ProductSet()
# When
result = hook.create_product_set(
result = self.hook.create_product_set(
location=LOC_ID_TEST,
product_set_id=PRODUCTSET_ID_TEST,
product_set=product_set,
@ -73,6 +91,7 @@ class TestGcpVisionHook(unittest.TestCase):
timeout=None,
metadata=None,
)
# Then
# ProductSet ID was provided explicitly in the method call above, should be returned from the method
self.assertEqual(result, PRODUCTSET_ID_TEST)
@ -95,10 +114,9 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_set_method = get_conn.return_value.create_product_set
create_product_set_method.return_value = response_product_set
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product_set = ProductSet()
# When
result = hook.create_product_set(
result = self.hook.create_product_set(
location=LOC_ID_TEST, product_set_id=None, product_set=product_set, project_id=PROJECT_ID_TEST
)
# Then
@ -121,11 +139,10 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_set_method = get_conn.return_value.create_product_set
create_product_set_method.return_value = response_product_set
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product_set = ProductSet()
# When
with self.assertRaises(AirflowException) as cm:
hook.create_product_set(
self.hook.create_product_set(
location=LOC_ID_TEST,
product_set_id=None,
product_set=product_set,
@ -154,9 +171,8 @@ class TestGcpVisionHook(unittest.TestCase):
response_product_set = ProductSet(name=name)
get_product_set_method = get_conn.return_value.get_product_set
get_product_set_method.return_value = response_product_set
hook = self.vision_hook_default_project_id
# When
response = hook.get_product_set(
response = self.hook.get_product_set(
location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST
)
# Then
@ -170,12 +186,11 @@ class TestGcpVisionHook(unittest.TestCase):
product_set = ProductSet()
update_product_set_method = get_conn.return_value.update_product_set
update_product_set_method.return_value = product_set
hook = self.vision_hook_default_project_id
productset_name = ProductSearchClient.product_set_path(
PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST
)
# When
result = hook.update_product_set(
result = self.hook.update_product_set(
location=LOC_ID_TEST,
product_set_id=PRODUCTSET_ID_TEST,
product_set=product_set,
@ -203,11 +218,10 @@ class TestGcpVisionHook(unittest.TestCase):
# Given
update_product_set_method = get_conn.return_value.update_product_set
update_product_set_method.return_value = None
hook = self.vision_hook_default_project_id
product_set = ProductSet()
# When
with self.assertRaises(AirflowException) as cm:
hook.update_product_set(
self.hook.update_product_set(
location=location,
product_set_id=product_set_id,
product_set=product_set,
@ -238,9 +252,8 @@ class TestGcpVisionHook(unittest.TestCase):
product_set = ProductSet(name=explicit_ps_name)
update_product_set_method = get_conn.return_value.update_product_set
update_product_set_method.return_value = product_set
hook = self.vision_hook_default_project_id
# When
result = hook.update_product_set(
result = self.hook.update_product_set(
location=location,
product_set_id=product_set_id,
product_set=product_set,
@ -265,7 +278,6 @@ class TestGcpVisionHook(unittest.TestCase):
# Given
update_product_set_method = get_conn.return_value.update_product_set
update_product_set_method.return_value = None
hook = self.vision_hook_default_project_id
explicit_ps_name = ProductSearchClient.product_set_path(
PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCTSET_ID_TEST_2
)
@ -278,7 +290,7 @@ class TestGcpVisionHook(unittest.TestCase):
# but both names differ (constructed != explicit).
# Should throw AirflowException in this case.
with self.assertRaises(AirflowException) as cm:
hook.update_product_set(
self.hook.update_product_set(
location=LOC_ID_TEST,
product_set_id=PRODUCTSET_ID_TEST,
product_set=product_set,
@ -306,25 +318,129 @@ class TestGcpVisionHook(unittest.TestCase):
delete_product_set_method = get_conn.return_value.delete_product_set
delete_product_set_method.return_value = None
name = ProductSearchClient.product_set_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCTSET_ID_TEST)
hook = self.vision_hook_default_project_id
# When
response = hook.delete_product_set(
response = self.hook.delete_product_set(
location=LOC_ID_TEST, product_set_id=PRODUCTSET_ID_TEST, project_id=PROJECT_ID_TEST
)
# Then
self.assertIsNone(response)
delete_product_set_method.assert_called_once_with(name=name, retry=None, timeout=None, metadata=None)
@mock.patch(
'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn',
**{'return_value.create_reference_image.return_value': REFERENCE_IMAGE_TEST}
)
def test_create_reference_image_explicit_id(self, get_conn):
# Given
create_reference_image_method = get_conn.return_value.create_reference_image
# When
result = self.hook.create_reference_image(
project_id=PROJECT_ID_TEST,
location=LOC_ID_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_WITHOUT_ID_NAME,
reference_image_id=REFERENCE_IMAGE_ID_TEST,
)
# Then
# Product ID was provided explicitly in the method call above, should be returned from the method
self.assertEqual(result, REFERENCE_IMAGE_ID_TEST)
create_reference_image_method.assert_called_once_with(
parent=PRODUCT_NAME,
reference_image=REFERENCE_IMAGE_WITHOUT_ID_NAME,
reference_image_id=REFERENCE_IMAGE_ID_TEST,
retry=None,
timeout=None,
metadata=None,
)
@mock.patch(
'airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn',
**{'return_value.create_reference_image.return_value': REFERENCE_IMAGE_TEST}
)
def test_create_reference_image_autogenerated_id(self, get_conn):
# Given
create_reference_image_method = get_conn.return_value.create_reference_image
# When
result = self.hook.create_reference_image(
project_id=PROJECT_ID_TEST,
location=LOC_ID_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_TEST,
reference_image_id=REFERENCE_IMAGE_ID_TEST,
)
# Then
# Product ID was provided explicitly in the method call above, should be returned from the method
self.assertEqual(result, REFERENCE_IMAGE_GEN_ID_TEST)
create_reference_image_method.assert_called_once_with(
parent=PRODUCT_NAME,
reference_image=REFERENCE_IMAGE_TEST,
reference_image_id=REFERENCE_IMAGE_ID_TEST,
retry=None,
timeout=None,
metadata=None,
)
@mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
def test_add_product_to_product_set(self, get_conn):
# Given
add_product_to_product_set_method = get_conn.return_value.add_product_to_product_set
# When
self.hook.add_product_to_product_set(
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
location=LOC_ID_TEST,
project_id=PROJECT_ID_TEST,
)
# Then
# Product ID was provided explicitly in the method call above, should be returned from the method
add_product_to_product_set_method.assert_called_once_with(
name=PRODUCTSET_NAME_TEST, product=PRODUCT_NAME_TEST, retry=None, timeout=None, metadata=None
)
# remove_product_from_product_set
@mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
def test_remove_product_from_product_set(self, get_conn):
# Given
remove_product_from_product_set_method = get_conn.return_value.remove_product_from_product_set
# When
self.hook.remove_product_from_product_set(
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
location=LOC_ID_TEST,
project_id=PROJECT_ID_TEST,
)
# Then
# Product ID was provided explicitly in the method call above, should be returned from the method
remove_product_from_product_set_method.assert_called_once_with(
name=PRODUCTSET_NAME_TEST, product=PRODUCT_NAME_TEST, retry=None, timeout=None, metadata=None
)
@mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.annotator_client')
def test_annotate_image(self, annotator_client_mock):
# Given
annotate_image_method = annotator_client_mock.annotate_image
# When
self.hook.annotate_image(request=ANNOTATE_IMAGE_REQUEST)
# Then
# Product ID was provided explicitly in the method call above, should be returned from the method
annotate_image_method.assert_called_once_with(
request=ANNOTATE_IMAGE_REQUEST, retry=None, timeout=None
)
@mock.patch('airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook.get_conn')
def test_create_product_explicit_id(self, get_conn):
# Given
create_product_method = get_conn.return_value.create_product
create_product_method.return_value = None
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product = Product()
# When
result = hook.create_product(
result = self.hook.create_product(
location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, product=product, project_id=PROJECT_ID_TEST
)
# Then
@ -349,10 +465,9 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_method = get_conn.return_value.create_product
create_product_method.return_value = response_product
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product = Product()
# When
result = hook.create_product(
result = self.hook.create_product(
location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
)
# Then
@ -371,11 +486,10 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_method = get_conn.return_value.create_product
create_product_method.return_value = response_product
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product = Product()
# When
with self.assertRaises(AirflowException) as cm:
hook.create_product(
self.hook.create_product(
location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
)
# Then
@ -393,11 +507,10 @@ class TestGcpVisionHook(unittest.TestCase):
create_product_method = get_conn.return_value.create_product
create_product_method.return_value = response_product
parent = ProductSearchClient.location_path(PROJECT_ID_TEST, LOC_ID_TEST)
hook = self.vision_hook_default_project_id
product = Product()
# When
with self.assertRaises(AirflowException) as cm:
hook.create_product(
self.hook.create_product(
location=LOC_ID_TEST, product_id=None, product=product, project_id=PROJECT_ID_TEST
)
# Then
@ -414,10 +527,9 @@ class TestGcpVisionHook(unittest.TestCase):
product = Product()
update_product_method = get_conn.return_value.update_product
update_product_method.return_value = product
hook = self.vision_hook_default_project_id
product_name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
# When
result = hook.update_product(
result = self.hook.update_product(
location=LOC_ID_TEST,
product_id=PRODUCT_ID_TEST,
product=product,
@ -441,11 +553,10 @@ class TestGcpVisionHook(unittest.TestCase):
# Given
update_product_method = get_conn.return_value.update_product
update_product_method.return_value = None
hook = self.vision_hook_default_project_id
product = Product()
# When
with self.assertRaises(AirflowException) as cm:
hook.update_product(
self.hook.update_product(
location=location,
product_id=product_id,
product=product,
@ -476,9 +587,8 @@ class TestGcpVisionHook(unittest.TestCase):
product = Product(name=explicit_p_name)
update_product_method = get_conn.return_value.update_product
update_product_method.return_value = product
hook = self.vision_hook_default_project_id
# When
result = hook.update_product(
result = self.hook.update_product(
location=location,
product_id=product_id,
product=product,
@ -499,7 +609,6 @@ class TestGcpVisionHook(unittest.TestCase):
# Given
update_product_method = get_conn.return_value.update_product
update_product_method.return_value = None
hook = self.vision_hook_default_project_id
explicit_p_name = ProductSearchClient.product_path(
PROJECT_ID_TEST_2, LOC_ID_TEST_2, PRODUCT_ID_TEST_2
)
@ -510,7 +619,7 @@ class TestGcpVisionHook(unittest.TestCase):
# but both names differ (constructed != explicit).
# Should throw AirflowException in this case.
with self.assertRaises(AirflowException) as cm:
hook.update_product(
self.hook.update_product(
location=LOC_ID_TEST,
product_id=PRODUCT_ID_TEST,
product=product,
@ -538,9 +647,8 @@ class TestGcpVisionHook(unittest.TestCase):
delete_product_method = get_conn.return_value.delete_product
delete_product_method.return_value = None
name = ProductSearchClient.product_path(PROJECT_ID_TEST, LOC_ID_TEST, PRODUCT_ID_TEST)
hook = self.vision_hook_default_project_id
# When
response = hook.delete_product(
response = self.hook.delete_product(
location=LOC_ID_TEST, product_id=PRODUCT_ID_TEST, project_id=PROJECT_ID_TEST
)
# Then

Просмотреть файл

@ -20,7 +20,7 @@
import unittest
from google.api_core.exceptions import AlreadyExists
from google.cloud.vision_v1.proto.product_search_service_pb2 import ProductSet, Product
from google.cloud.vision_v1.types import ProductSet, Product, ReferenceImage
from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductSetCreateOperator,
@ -31,6 +31,10 @@ from airflow.contrib.operators.gcp_vision_operator import (
CloudVisionProductGetOperator,
CloudVisionProductUpdateOperator,
CloudVisionProductDeleteOperator,
CloudVisionReferenceImageCreateOperator,
CloudVisionAddProductToProductSetOperator,
CloudVisionRemoveProductFromProductSetOperator,
CloudVisionAnnotateImageOperator,
)
try:
@ -46,6 +50,9 @@ PRODUCTSET_TEST = ProductSet(display_name='Test Product Set')
PRODUCTSET_ID_TEST = 'my-productset'
PRODUCT_TEST = Product(display_name='My Product 1', product_category='toys')
PRODUCT_ID_TEST = 'my-product'
REFERENCE_IMAGE_TEST = ReferenceImage(uri='gs://bucket_name/file.txt')
REFERENCE_IMAGE_ID_TEST = 'my-reference-image'
ANNOTATE_REQUEST_TEST = {'image': {'source': {'image_uri': 'https://foo.com/image.jpg'}}}
LOCATION_TEST = 'europe-west1'
GCP_CONN_ID = 'google_cloud_default'
@ -70,10 +77,10 @@ class CloudVisionProductSetCreateTest(unittest.TestCase):
)
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn')
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request')
def test_already_exists(self, _handle_request, get_conn):
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.create_product_set')
def test_already_exists(self, create_product_set_mock, get_conn):
get_conn.return_value = {}
_handle_request.side_effect = AlreadyExists(message='')
create_product_set_mock.side_effect = AlreadyExists(message='')
# Exception AlreadyExists not raised, caught in the operator's execute() - idempotence
op = CloudVisionProductSetCreateOperator(
location=LOCATION_TEST,
@ -163,10 +170,10 @@ class CloudVisionProductCreateTest(unittest.TestCase):
)
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.get_conn')
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook._handle_request')
def test_already_exists(self, _handle_request, get_conn):
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook.create_product')
def test_already_exists(self, create_product_mock, get_conn):
get_conn.return_value = {}
_handle_request.side_effect = AlreadyExists(message='')
create_product_mock.side_effect = AlreadyExists(message='')
# Exception AlreadyExists not raised, caught in the operator's execute() - idempotence
op = CloudVisionProductCreateOperator(
location=LOCATION_TEST,
@ -232,3 +239,109 @@ class CloudVisionProductDeleteTest(unittest.TestCase):
timeout=None,
metadata=None,
)
class CloudVisionReferenceImageCreateTest(unittest.TestCase):
@mock.patch(
'airflow.contrib.operators.gcp_vision_operator.CloudVisionHook',
**{'return_value.create_reference_image.return_value': {}}
)
def test_minimal_green_path(self, mock_hook):
op = CloudVisionReferenceImageCreateOperator(
location=LOCATION_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_TEST,
task_id='id',
)
op.execute(context=None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.create_reference_image.assert_called_once_with(
location=LOCATION_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_TEST,
reference_image_id=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
)
@mock.patch(
'airflow.contrib.operators.gcp_vision_operator.CloudVisionHook',
**{'return_value.create_reference_image.side_effect': AlreadyExists("MESSAGe")}
)
def test_already_exists(self, mock_hook):
# Exception AlreadyExists not raised, caught in the operator's execute() - idempotence
op = CloudVisionReferenceImageCreateOperator(
location=LOCATION_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_TEST,
task_id='id',
)
op.execute(context=None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.create_reference_image.assert_called_once_with(
location=LOCATION_TEST,
product_id=PRODUCT_ID_TEST,
reference_image=REFERENCE_IMAGE_TEST,
reference_image_id=None,
project_id=None,
retry=None,
timeout=None,
metadata=None,
)
class CloudVisionAddProductToProductSetOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
def test_minimal_green_path(self, mock_hook):
op = CloudVisionAddProductToProductSetOperator(
location=LOCATION_TEST,
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
task_id='id',
)
op.execute(context=None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.add_product_to_product_set.assert_called_once_with(
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
location=LOCATION_TEST,
project_id=None,
retry=None,
timeout=None,
metadata=None,
)
class CloudVisionRemoveProductFromProductSetOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
def test_minimal_green_path(self, mock_hook):
op = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION_TEST,
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
task_id='id',
)
op.execute(context=None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.remove_product_from_product_set.assert_called_once_with(
product_set_id=PRODUCTSET_ID_TEST,
product_id=PRODUCT_ID_TEST,
location=LOCATION_TEST,
project_id=None,
retry=None,
timeout=None,
metadata=None,
)
class CloudVisionAnnotateImageOperatorTest(unittest.TestCase):
@mock.patch('airflow.contrib.operators.gcp_vision_operator.CloudVisionHook')
def test_minimal_green_path(self, mock_hook):
op = CloudVisionAnnotateImageOperator(request=ANNOTATE_REQUEST_TEST, task_id='id')
op.execute(context=None)
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.annotate_image.assert_called_once_with(
request=ANNOTATE_REQUEST_TEST, retry=None, timeout=None
)

Просмотреть файл

@ -19,16 +19,41 @@
import unittest
from tests.contrib.utils.base_gcp_system_test_case import DagGcpSystemTestCase, SKIP_TEST_WARNING
from tests.contrib.utils.gcp_authenticator import GCP_AI_KEY
from tests.contrib.operators.test_gcp_vision_operator_system_helper import GCPVisionTestHelper
from tests.contrib.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, DagGcpSystemTestCase
VISION_HELPER = GCPVisionTestHelper()
@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudVisionExampleDagsSystemTest(DagGcpSystemTestCase):
def __init__(self, method_name='runTest'):
super(CloudVisionExampleDagsSystemTest, self).__init__(
method_name, dag_id='example_gcp_vision', gcp_key=GCP_AI_KEY
method_name, dag_name='example_gcp_vision.py', gcp_key=GCP_AI_KEY
)
def test_run_example_dag_function(self):
self._run_dag()
def setUp(self):
super(CloudVisionExampleDagsSystemTest, self).setUp()
self.gcp_authenticator.gcp_authenticate()
try:
VISION_HELPER.create_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
VISION_HELPER.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
super(CloudVisionExampleDagsSystemTest, self).tearDown()
def test_run_example_gcp_vision_autogenerated_id_dag(self):
self._run_dag('example_gcp_vision_autogenerated_id')
def test_run_example_gcp_vision_explicit_id_dag(self):
self._run_dag('example_gcp_vision_explicit_id')
def test_run_example_gcp_vision_annotate_image_dag(self):
self._run_dag('example_gcp_vision_annotate_image')

Просмотреть файл

@ -0,0 +1,90 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import argparse
from tempfile import NamedTemporaryFile
from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_AI_KEY
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
GCP_BUCKET_NAME = os.environ.get('GCP_VISION_BUCKET_NAME', 'vision-bucket-system-test')
GCP_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket-name/image.png')
GCP_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket-name/image.png')
GCP_VIDEO_SOURCE_URL = os.environ.get('GCP_VISION_SOURCE_IMAGE_URL', "http://google.com/image.jpg")
class GCPVisionTestHelper(LoggingCommandExecutor):
def create_bucket(self):
self.execute_cmd(
[
'gsutil',
'mb',
"-p",
GCP_PROJECT_ID,
"-c",
"regional",
"-l",
"europe-north1",
"gs://%s/" % GCP_BUCKET_NAME,
]
)
with NamedTemporaryFile(suffix=".png") as file:
self.execute_cmd(["curl", "-s", GCP_VIDEO_SOURCE_URL, "-o", file.name])
self.execute_cmd(['gsutil', 'cp', file.name, GCP_REFERENCE_IMAGE_URL])
self.execute_cmd(['gsutil', 'cp', file.name, GCP_ANNOTATE_IMAGE_URL])
def delete_bucket(self):
self.execute_cmd(['gsutil', 'rm', '-r', "gs://%s/" % GCP_BUCKET_NAME])
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create and remove a bucket for system tests.')
parser.add_argument(
'--action',
dest='action',
required=True,
choices=('create-bucket', 'delete-bucket', 'before-tests', 'after-tests'),
)
action = parser.parse_args().action
helper = GCPVisionTestHelper()
gcp_authenticator = GcpAuthenticator(GCP_AI_KEY)
helper.log.info('Starting action: {}'.format(action))
gcp_authenticator.gcp_store_authentication()
try:
gcp_authenticator.gcp_authenticate()
if action == 'before-tests':
pass
elif action == 'after-tests':
pass
elif action == 'create-bucket':
helper.create_bucket()
elif action == 'delete-bucket':
helper.delete_bucket()
else:
raise Exception("Unknown action: {}".format(action))
finally:
gcp_authenticator.gcp_restore_authentication()
helper.log.info('Finishing action: {}'.format(action))

Просмотреть файл

@ -119,8 +119,8 @@ class BaseGcpSystemTestCase(unittest.TestCase, LoggingMixin):
class DagGcpSystemTestCase(BaseGcpSystemTestCase):
def __init__(self,
method_name,
dag_id,
gcp_key,
dag_id=None,
dag_name=None,
require_local_executor=False,
example_dags_folder=CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER,
@ -161,24 +161,21 @@ class DagGcpSystemTestCase(BaseGcpSystemTestCase):
target_path = os.path.join(target_folder, file_name)
if remove:
try:
self.log.info("Remove symlink: {} -> {} ".format(
target_path, source_path))
self.log.info("Remove symlink: %s -> %s", target_path, source_path)
os.remove(target_path)
except OSError:
pass
else:
if not os.path.exists(target_path):
self.log.info("Symlink: {} -> {} ".format(target_path, source_path))
self.log.info("Symlink: %s -> %s ", target_path, source_path)
os.symlink(source_path, target_path)
else:
self.log.info("Symlink {} already exists. Not symlinking it.".
format(target_path))
self.log.info("Symlink %s already exists. Not symlinking it.", target_path)
def _store_dags_to_temporary_directory(self):
dag_folder = self._get_dag_folder()
self.temp_dir = mkdtemp()
self.log.info("Storing DAGS from {} to temporary directory {}".
format(dag_folder, self.temp_dir))
self.log.info("Storing DAGS from %s to temporary directory %s", dag_folder, self.temp_dir)
try:
os.mkdir(dag_folder)
except OSError:
@ -188,20 +185,19 @@ class DagGcpSystemTestCase(BaseGcpSystemTestCase):
def _restore_dags_from_temporary_directory(self):
dag_folder = self._get_dag_folder()
self.log.info("Restoring DAGS to {} from temporary directory {}"
.format(dag_folder, self.temp_dir))
self.log.info("Restoring DAGS to %s from temporary directory %s", dag_folder, self.temp_dir)
for file in os.listdir(self.temp_dir):
move(os.path.join(self.temp_dir, file), os.path.join(dag_folder, file))
def _run_dag(self):
self.log.info("Attempting to run DAG: {}".format(self.dag_id))
def _run_dag(self, dag_id=None):
self.log.info("Attempting to run DAG: %s", self.dag_id)
if not self.setup_called:
raise AirflowException("Please make sure to call super.setUp() in your "
"test class!")
dag_folder = self._get_dag_folder()
dag_bag = models.DagBag(dag_folder=dag_folder, include_examples=False)
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = dag_bag.get_dag(self.dag_id)
dag = dag_bag.get_dag(self.dag_id or dag_id)
if dag is None:
raise AirflowException(
"The Dag {} could not be found. It's either an import problem or "

Просмотреть файл

@ -18,7 +18,8 @@
# under the License.
import unittest
from airflow.utils.decorators import apply_defaults
from airflow.utils.decorators import apply_defaults, cached_property
from airflow.exceptions import AirflowException
@ -72,3 +73,29 @@ class ApplyDefaultTest(unittest.TestCase):
default_args = {'random_params': True}
with self.assertRaisesRegexp(AirflowException, 'Argument.*test_param.*required'):
DummyClass(default_args=default_args)
class FixtureClass:
@cached_property
def value(self):
"""Fixture docstring"""
return 1, object()
class FixtureSubClass(FixtureClass):
pass
class CachedPropertyTest(unittest.TestCase):
def setUp(self):
self.test_obj = FixtureClass()
self.test_sub_obj = FixtureSubClass()
def test_cache_works(self):
self.assertIs(self.test_obj.value, self.test_obj.value)
self.assertIs(self.test_sub_obj.value, self.test_sub_obj.value)
def test_docstring(self):
self.assertEqual(FixtureClass.value.__doc__, "Fixture docstring")
self.assertEqual(FixtureSubClass.value.__doc__, "Fixture docstring")