diff --git a/airflow/contrib/example_dags/example_gcp_video_intelligence.py b/airflow/contrib/example_dags/example_gcp_video_intelligence.py new file mode 100644 index 0000000000..7be878d4f4 --- /dev/null +++ b/airflow/contrib/example_dags/example_gcp_video_intelligence.py @@ -0,0 +1,122 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that demonstrates operators for the Google Cloud Video Intelligence service in the Google +Cloud Platform. + +This DAG relies on the following OS environment variables: + +* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists. +""" +import os + +# [START howto_operator_vision_retry_import] +from google.api_core.retry import Retry + +# [END howto_operator_vision_retry_import] + +import airflow +from airflow import models +from airflow.contrib.operators.gcp_video_intelligence_operator import ( + CloudVideoIntelligenceDetectVideoLabelsOperator, + CloudVideoIntelligenceDetectVideoExplicitContentOperator, + CloudVideoIntelligenceDetectVideoShotsOperator, +) +from airflow.operators.bash_operator import BashOperator + +default_args = {"start_date": airflow.utils.dates.days_ago(1)} + +# [START howto_operator_video_intelligence_os_args] +GCP_BUCKET_NAME = os.environ.get( + "GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "test-bucket-name" +) +# [END howto_operator_video_intelligence_os_args] + + +# [START howto_operator_video_intelligence_other_args] +INPUT_URI = "gs://{}/video.mp4".format(GCP_BUCKET_NAME) +# [END howto_operator_video_intelligence_other_args] + + +with models.DAG( + "example_gcp_video_intelligence", + default_args=default_args, + schedule_interval=None, # Override to match your needs +) as dag: + + # [START howto_operator_video_intelligence_detect_labels] + detect_video_label = CloudVideoIntelligenceDetectVideoLabelsOperator( + input_uri=INPUT_URI, + output_uri=None, + video_context=None, + timeout=5, + task_id="detect_video_label", + ) + # [END howto_operator_video_intelligence_detect_labels] + + # [START howto_operator_video_intelligence_detect_labels_result] + detect_video_label_result = BashOperator( + bash_command="echo {{ task_instance.xcom_pull('detect_video_label')" + "['annotationResults'][0]['shotLabelAnnotations'][0]['entity']}}", + task_id="detect_video_label_result", + ) + # [END howto_operator_video_intelligence_detect_labels_result] + + # [START howto_operator_video_intelligence_detect_explicit_content] + detect_video_explicit_content = CloudVideoIntelligenceDetectVideoExplicitContentOperator( + input_uri=INPUT_URI, + output_uri=None, + video_context=None, + retry=Retry(maximum=10.0), + timeout=5, + task_id="detect_video_explicit_content", + ) + # [END howto_operator_video_intelligence_detect_explicit_content] + + # [START howto_operator_video_intelligence_detect_explicit_content_result] + detect_video_explicit_content_result = BashOperator( + bash_command="echo {{ task_instance.xcom_pull('detect_video_explicit_content')" + "['annotationResults'][0]['explicitAnnotation']['frames'][0]}}", + task_id="detect_video_explicit_content_result", + ) + # [END howto_operator_video_intelligence_detect_explicit_content_result] + + # [START howto_operator_video_intelligence_detect_video_shots] + detect_video_shots = CloudVideoIntelligenceDetectVideoShotsOperator( + input_uri=INPUT_URI, + output_uri=None, + video_context=None, + retry=Retry(maximum=10.0), + timeout=5, + task_id="detect_video_shots", + ) + # [END howto_operator_video_intelligence_detect_video_shots] + + # [START howto_operator_video_intelligence_detect_video_shots_result] + detect_video_shots_result = BashOperator( + bash_command="echo {{ task_instance.xcom_pull('detect_video_shots')" + "['annotationResults'][0]['shotAnnotations'][0]}}", + task_id="detect_video_shots_result", + ) + # [END howto_operator_video_intelligence_detect_video_shots_result] + + detect_video_label >> detect_video_label_result + detect_video_explicit_content >> detect_video_explicit_content_result + detect_video_shots >> detect_video_shots_result diff --git a/airflow/contrib/hooks/gcp_video_intelligence_hook.py b/airflow/contrib/hooks/gcp_video_intelligence_hook.py new file mode 100644 index 0000000000..f666ea4418 --- /dev/null +++ b/airflow/contrib/hooks/gcp_video_intelligence_hook.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from google.cloud.videointelligence_v1 import VideoIntelligenceServiceClient + +from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook + + +class CloudVideoIntelligenceHook(GoogleCloudBaseHook): + """ + Hook for Google Cloud Video Intelligence APIs. + + :param gcp_conn_id: The connection ID to use when fetching connection info. + :type gcp_conn_id: str + :param delegate_to: The account to impersonate, if any. + For this to work, the service account making the request must have + domain-wide delegation enabled. + :type delegate_to: str + """ + + _conn = None + + def __init__(self, gcp_conn_id="google_cloud_default", delegate_to=None): + super(CloudVideoIntelligenceHook, self).__init__(gcp_conn_id, delegate_to) + + def get_conn(self): + """ + Returns Gcp Video Intelligence Service client + + :rtype: google.cloud.videointelligence_v1.VideoIntelligenceServiceClient + """ + if not self._conn: + self._conn = VideoIntelligenceServiceClient(credentials=self._get_credentials()) + return self._conn + + def annotate_video( + self, + input_uri=None, + input_content=None, + features=None, + video_context=None, + output_uri=None, + location=None, + retry=None, + timeout=None, + metadata=None, + ): + """ + Performs video annotation. + + :param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported, + which must be specified in the following format: ``gs://bucket-id/object-id``. + :type input_uri: str + :param input_content: The video data bytes. + If unset, the input video(s) should be specified via ``input_uri``. + If set, ``input_uri`` should be unset. + :type input_content: bytes + :param features: Requested video annotation features. + :type features: list[google.cloud.videointelligence_v1.VideoIntelligenceServiceClient.enums.Feature] + :param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, + only Google Cloud Storage URIs are supported, which must be specified in the following format: + ``gs://bucket-id/object-id``. + :type output_uri: str + :param video_context: Optional, Additional video context and/or feature-specific parameters. + :type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext + :param location: Optional, cloud region where annotation should take place. Supported cloud regions: + us-east1, us-west1, europe-west1, asia-east1. + If no region is specified, a region will be determined based on video file location. + :type location: str + :param retry: Retry object used to determine when/if to retry requests. + If None is specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param timeout: Optional, The amount of time, in seconds, to wait for the request to complete. + Note that if retry is specified, the timeout applies to each individual attempt. + :type timeout: float + :param metadata: Optional, Additional metadata that is provided to the method. + :type metadata: seq[tuple[str, str]] + """ + client = self.get_conn() + return client.annotate_video( + input_uri=input_uri, + input_content=input_content, + features=features, + video_context=video_context, + output_uri=output_uri, + location_id=location, + retry=retry, + timeout=timeout, + metadata=metadata, + ) diff --git a/airflow/contrib/operators/gcp_video_intelligence_operator.py b/airflow/contrib/operators/gcp_video_intelligence_operator.py new file mode 100644 index 0000000000..6a8d085a17 --- /dev/null +++ b/airflow/contrib/operators/gcp_video_intelligence_operator.py @@ -0,0 +1,245 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from google.protobuf.json_format import MessageToDict + +from airflow.contrib.hooks.gcp_video_intelligence_hook import CloudVideoIntelligenceHook +from airflow.models import BaseOperator +from google.cloud.videointelligence_v1 import enums + + +class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator): + """ + Performs video annotation, annotating video labels. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVideoIntelligenceDetectVideoLabelsOperator`. + + :param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported, + which must be specified in the following format: ``gs://bucket-id/object-id``. + :type input_uri: str + :param input_content: The video data bytes. + If unset, the input video(s) should be specified via ``input_uri``. + If set, ``input_uri`` should be unset. + :type input_content: bytes + :param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only + Google Cloud Storage URIs are supported, which must be specified in the following format: + ``gs://bucket-id/object-id``. + :type output_uri: str + :param video_context: Optional, Additional video context and/or feature-specific parameters. + :type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext + :param location: Optional, cloud region where annotation should take place. Supported cloud regions: + us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined + based on video file location. + :type location: str + :param retry: Retry object used to determine when/if to retry requests. + If None is specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud + Platform. Defaults to ``google_cloud_default``. + :type gcp_conn_id: str + """ + + # [START gcp_video_intelligence_detect_labels_template_fields] + template_fields = ("input_uri", "output_uri", "gcp_conn_id") + # [END gcp_video_intelligence_detect_labels_template_fields] + + def __init__( + self, + input_uri, + input_content=None, + output_uri=None, + video_context=None, + location=None, + retry=None, + gcp_conn_id="google_cloud_default", + *args, + **kwargs + ): + super(CloudVideoIntelligenceDetectVideoLabelsOperator, self).__init__(*args, **kwargs) + self.input_uri = input_uri + self.input_content = input_content + self.output_uri = output_uri + self.video_context = video_context + self.location = location + self.retry = retry + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) + operation = hook.annotate_video( + input_uri=self.input_uri, + input_content=self.input_content, + video_context=self.video_context, + location=self.location, + retry=self.retry, + features=[enums.Feature.LABEL_DETECTION], + ) + self.log.info("Processing video for label annotations") + result = MessageToDict(operation.result()) + self.log.info("Finished processing.") + return result + + +class CloudVideoIntelligenceDetectVideoExplicitContentOperator(BaseOperator): + """ + Performs video annotation, annotating explicit content. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVideoIntelligenceDetectVideoExplicitContentOperator` + + :param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported, + which must be specified in the following format: ``gs://bucket-id/object-id``. + :type input_uri: str + :param input_content: The video data bytes. + If unset, the input video(s) should be specified via ``input_uri``. + If set, ``input_uri`` should be unset. + :type input_content: bytes + :param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only + Google Cloud Storage URIs are supported, which must be specified in the following format: + ``gs://bucket-id/object-id``. + :type output_uri: str + :param video_context: Optional, Additional video context and/or feature-specific parameters. + :type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext + :param location: Optional, cloud region where annotation should take place. Supported cloud regions: + us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined + based on video file location. + :type location: str + :param retry: Retry object used to determine when/if to retry requests. + If None is specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud + Platform. Defaults to ``google_cloud_default``. + :type gcp_conn_id: str + """ + + # [START gcp_video_intelligence_detect_explicit_content_template_fields] + template_fields = ("input_uri", "output_uri", "gcp_conn_id") + # [END gcp_video_intelligence_detect_explicit_content_template_fields] + + def __init__( + self, + input_uri, + output_uri=None, + input_content=None, + video_context=None, + location=None, + retry=None, + gcp_conn_id="google_cloud_default", + *args, + **kwargs + ): + super(CloudVideoIntelligenceDetectVideoExplicitContentOperator, self).__init__(*args, **kwargs) + self.input_uri = input_uri + self.output_uri = output_uri + self.input_content = input_content + self.video_context = video_context + self.location = location + self.retry = retry + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) + operation = hook.annotate_video( + input_uri=self.input_uri, + input_content=self.input_content, + video_context=self.video_context, + location=self.location, + retry=self.retry, + features=[enums.Feature.EXPLICIT_CONTENT_DETECTION], + ) + self.log.info("Processing video for explicit content annotations") + result = MessageToDict(operation.result()) + self.log.info("Finished processing.") + return result + + +class CloudVideoIntelligenceDetectVideoShotsOperator(BaseOperator): + """ + Performs video annotation, annotating video shots. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CloudVideoIntelligenceDetectVideoShotsOperator` + + :param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported, + which must be specified in the following format: ``gs://bucket-id/object-id``. + :type input_uri: str + :param input_content: The video data bytes. + If unset, the input video(s) should be specified via ``input_uri``. + If set, ``input_uri`` should be unset. + :type input_content: bytes + :param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only + Google Cloud Storage URIs are supported, which must be specified in the following format: + ``gs://bucket-id/object-id``. + :type output_uri: str + :param video_context: Optional, Additional video context and/or feature-specific parameters. + :type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext + :param location: Optional, cloud region where annotation should take place. Supported cloud regions: + us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined + based on video file location. + :type location: str + :param retry: Retry object used to determine when/if to retry requests. + If None is specified, requests will not be retried. + :type retry: google.api_core.retry.Retry + :param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud + Platform. Defaults to ``google_cloud_default``. + :type gcp_conn_id: str + """ + + # [START gcp_video_intelligence_detect_video_shots_template_fields] + template_fields = ("input_uri", "output_uri", "gcp_conn_id") + # [END gcp_video_intelligence_detect_video_shots_template_fields] + + def __init__( + self, + input_uri, + output_uri=None, + input_content=None, + video_context=None, + location=None, + retry=None, + gcp_conn_id="google_cloud_default", + *args, + **kwargs + ): + super(CloudVideoIntelligenceDetectVideoShotsOperator, self).__init__(*args, **kwargs) + self.input_uri = input_uri + self.output_uri = output_uri + self.input_content = input_content + self.video_context = video_context + self.location = location + self.retry = retry + self.gcp_conn_id = gcp_conn_id + + def execute(self, context): + hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id) + operation = hook.annotate_video( + input_uri=self.input_uri, + input_content=self.input_content, + video_context=self.video_context, + location=self.location, + retry=self.retry, + features=[enums.Feature.SHOT_CHANGE_DETECTION], + ) + self.log.info("Processing video for video shots annotations") + result = MessageToDict(operation.result()) + self.log.info("Finished processing.") + return result diff --git a/docs/howto/operator/gcp/video.rst b/docs/howto/operator/gcp/video.rst new file mode 100644 index 0000000000..cf95c0059f --- /dev/null +++ b/docs/howto/operator/gcp/video.rst @@ -0,0 +1,199 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Google Cloud Video Intelligence Operators +========================================= + +.. contents:: + :depth: 1 + :local: + +.. _howto/operator:CloudVideoIntelligenceDetectVideoLabelsOperator: + +CloudVideoIntelligenceDetectVideoLabelsOperator +----------------------------------------------- + +Performs video annotation, annotating video labels. + +For parameter definition, take a look at +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoLabelsOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_os_args] + :end-before: [END howto_operator_video_intelligence_os_args] + +Input uri is an uri to a file in Google Cloud Storage + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_other_args] + :end-before: [END howto_operator_video_intelligence_other_args] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_labels] + :end-before: [END howto_operator_video_intelligence_detect_labels] + +You can use the annotation output via Xcom: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_labels_result] + :end-before: [END howto_operator_video_intelligence_detect_labels_result] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_video_intelligence_detect_labels_template_fields] + :end-before: [END gcp_video_intelligence_detect_labels_template_fields] + +.. _howto/operator:CloudVideoIntelligenceDetectVideoExplicitContentOperator: + +More information +"""""""""""""""" + +Note: The duration of video annotation operation is equal or longer than the annotation video itself. + + +CloudVideoIntelligenceDetectVideoExplicitContentOperator +-------------------------------------------------------- + +Performs video annotation, annotating explicit content. + +For parameter definition, take a look at +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoExplicitContentOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_os_args] + :end-before: [END howto_operator_video_intelligence_os_args] + +Input uri is an uri to a file in Google Cloud Storage + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_other_args] + :end-before: [END howto_operator_video_intelligence_other_args] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_explicit_content] + :end-before: [END howto_operator_video_intelligence_detect_explicit_content] + +You can use the annotation output via Xcom: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_explicit_content_result] + :end-before: [END howto_operator_video_intelligence_detect_explicit_content_result] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_video_intelligence_detect_explicit_content_template_fields] + :end-before: [END gcp_video_intelligence_detect_explicit_content_template_fields] + +.. _howto/operator:CloudVideoIntelligenceDetectVideoShotsOperator: + +More information +"""""""""""""""" + +Note: The duration of video annotation operation is equal or longer than the annotation video itself. + + +CloudVideoIntelligenceDetectVideoShotsOperator +---------------------------------------------- + +Performs video annotation, annotating explicit content. + +For parameter definition, take a look at +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoShotsOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_os_args] + :end-before: [END howto_operator_video_intelligence_os_args] + +Input uri is an uri to a file in Google Cloud Storage + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :start-after: [START howto_operator_video_intelligence_other_args] + :end-before: [END howto_operator_video_intelligence_other_args] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_video_shots] + :end-before: [END howto_operator_video_intelligence_detect_video_shots] + +You can use the annotation output via Xcom: + +.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_video_intelligence_detect_video_shots_result] + :end-before: [END howto_operator_video_intelligence_detect_video_shots_result] + +Templating +"""""""""" + +.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py + :language: python + :dedent: 4 + :start-after: [START gcp_video_intelligence_detect_video_shots_template_fields] + :end-before: [END gcp_video_intelligence_detect_video_shots_template_fields] + +More information +"""""""""""""""" + +Note: The duration of video annotation operation is equal or longer than the annotation video itself. diff --git a/docs/integration.rst b/docs/integration.rst index 1f8394a619..1b7bccbb1f 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -689,6 +689,18 @@ Cloud Translate Text Operators Translate a string or list of strings. +Cloud Video Intelligence +'''''''''''''''''''''''' + +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoLabelsOperator` + Performs video annotation, annotating video labels. +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoExplicitContentOperator` + Performs video annotation, annotating explicit content. +:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoShotsOperator` + Performs video annotation, annotating video shots. + +They also use :class:`airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook` to communicate with Google Cloud Platform. + Google Kubernetes Engine '''''''''''''''''''''''' diff --git a/setup.py b/setup.py index ce7aee8a65..53d258be23 100644 --- a/setup.py +++ b/setup.py @@ -192,6 +192,7 @@ gcp = [ 'google-cloud-spanner>=1.7.1', 'google-cloud-storage~=1.14', 'google-cloud-translate>=1.3.3', + 'google-cloud-videointelligence>=1.7.0', 'google-cloud-vision>=0.35.2', 'google-cloud-texttospeech>=0.4.0', 'google-cloud-speech>=0.36.3', diff --git a/tests/contrib/hooks/test_gcp_video_intelligence_hook.py b/tests/contrib/hooks/test_gcp_video_intelligence_hook.py new file mode 100644 index 0000000000..3ee837365b --- /dev/null +++ b/tests/contrib/hooks/test_gcp_video_intelligence_hook.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest + +from airflow.contrib.hooks.gcp_video_intelligence_hook import CloudVideoIntelligenceHook +from google.cloud.videointelligence_v1 import enums + +from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id +from tests.compat import mock + + +INPUT_URI = "gs://bucket-name/input-file" +OUTPUT_URI = "gs://bucket-name/output-file" + +FEATURES = [enums.Feature.LABEL_DETECTION] + +ANNOTATE_VIDEO_RESPONSE = {'test': 'test'} + + +class CloudVideoIntelligenceHookTestCase(unittest.TestCase): + def setUp(self): + with mock.patch( + "airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.__init__", + new=mock_base_gcp_hook_default_project_id, + ): + self.hook = CloudVideoIntelligenceHook(gcp_conn_id="test") + + @mock.patch("airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.get_conn") + def test_annotate_video(self, get_conn): + # Given + annotate_video_method = get_conn.return_value.annotate_video + get_conn.return_value.annotate_video.return_value = ANNOTATE_VIDEO_RESPONSE + + # When + result = self.hook.annotate_video(input_uri=INPUT_URI, features=FEATURES) + + # Then + self.assertIs(result, ANNOTATE_VIDEO_RESPONSE) + annotate_video_method.assert_called_once_with( + input_uri=INPUT_URI, + input_content=None, + features=FEATURES, + video_context=None, + output_uri=None, + location_id=None, + retry=None, + timeout=None, + metadata=None, + ) + + @mock.patch("airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.get_conn") + def test_annotate_video_with_output_uri(self, get_conn): + # Given + annotate_video_method = get_conn.return_value.annotate_video + get_conn.return_value.annotate_video.return_value = ANNOTATE_VIDEO_RESPONSE + + # When + result = self.hook.annotate_video(input_uri=INPUT_URI, output_uri=OUTPUT_URI, features=FEATURES) + + # Then + self.assertIs(result, ANNOTATE_VIDEO_RESPONSE) + annotate_video_method.assert_called_once_with( + input_uri=INPUT_URI, + output_uri=OUTPUT_URI, + input_content=None, + features=FEATURES, + video_context=None, + location_id=None, + retry=None, + timeout=None, + metadata=None, + ) diff --git a/tests/contrib/operators/test_gcp_video_intelligence_operator.py b/tests/contrib/operators/test_gcp_video_intelligence_operator.py new file mode 100644 index 0000000000..f5e051281f --- /dev/null +++ b/tests/contrib/operators/test_gcp_video_intelligence_operator.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from google.cloud.videointelligence_v1 import enums +from google.cloud.videointelligence_v1.proto.video_intelligence_pb2 import AnnotateVideoResponse +from airflow.contrib.operators.gcp_video_intelligence_operator import ( + CloudVideoIntelligenceDetectVideoLabelsOperator, + CloudVideoIntelligenceDetectVideoShotsOperator, + CloudVideoIntelligenceDetectVideoExplicitContentOperator, +) +from tests.compat import mock + +PROJECT_ID = "project-id" +GCP_CONN_ID = "gcp-conn-id" +CONFIG = {"encoding": "LINEAR16"} +AUDIO = {"uri": "gs://bucket/object"} + +INPUT_URI = "gs://test-bucket//test-video.mp4" + + +class CloudVideoIntelligenceOperatorsTestCase(unittest.TestCase): + @mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook") + def test_detect_video_labels_green_path(self, mock_hook): + + mocked_operation = mock.Mock() + mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[])) + mock_hook.return_value.annotate_video.return_value = mocked_operation + + CloudVideoIntelligenceDetectVideoLabelsOperator( + input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID + ).execute(context={"task_instance": mock.Mock()}) + + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.annotate_video.assert_called_once_with( + input_uri=INPUT_URI, + features=[enums.Feature.LABEL_DETECTION], + input_content=None, + video_context=None, + location=None, + retry=None, + ) + + @mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook") + def test_detect_video_explicit_content_green_path(self, mock_hook): + mocked_operation = mock.Mock() + mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[])) + mock_hook.return_value.annotate_video.return_value = mocked_operation + + CloudVideoIntelligenceDetectVideoExplicitContentOperator( + input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID + ).execute(context={"task_instance": mock.Mock()}) + + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.annotate_video.assert_called_once_with( + input_uri=INPUT_URI, + features=[enums.Feature.EXPLICIT_CONTENT_DETECTION], + input_content=None, + video_context=None, + location=None, + retry=None, + ) + + @mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook") + def test_detect_video_shots_green_path(self, mock_hook): + mocked_operation = mock.Mock() + mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[])) + mock_hook.return_value.annotate_video.return_value = mocked_operation + + CloudVideoIntelligenceDetectVideoShotsOperator( + input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID + ).execute(context={"task_instance": mock.Mock()}) + + mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID) + mock_hook.return_value.annotate_video.assert_called_once_with( + input_uri=INPUT_URI, + features=[enums.Feature.SHOT_CHANGE_DETECTION], + input_content=None, + video_context=None, + location=None, + retry=None, + ) diff --git a/tests/contrib/operators/test_gcp_video_intelligence_operator_system.py b/tests/contrib/operators/test_gcp_video_intelligence_operator_system.py new file mode 100644 index 0000000000..44be698463 --- /dev/null +++ b/tests/contrib/operators/test_gcp_video_intelligence_operator_system.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from tests.contrib.operators.test_gcp_video_intelligence_operator_system_helper import ( + GCPVideoIntelligenceHelper, +) +from tests.contrib.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, DagGcpSystemTestCase +from tests.contrib.utils.gcp_authenticator import GCP_AI_KEY + + +@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING) +class CloudVideoIntelligenceExampleDagsTest(DagGcpSystemTestCase): + def __init__(self, method_name="runTest"): + super(CloudVideoIntelligenceExampleDagsTest, self).__init__( + method_name, dag_id="example_gcp_video_intelligence", gcp_key=GCP_AI_KEY + ) + self.helper = GCPVideoIntelligenceHelper() + + def setUp(self): + self.gcp_authenticator.gcp_authenticate() + try: + self.helper.create_bucket() + self.gcp_authenticator.gcp_revoke_authentication() + finally: + pass + super(CloudVideoIntelligenceExampleDagsTest, self).setUp() + + def tearDown(self): + self.gcp_authenticator.gcp_authenticate() + try: + self.helper.delete_bucket() + finally: + self.gcp_authenticator.gcp_revoke_authentication() + super(CloudVideoIntelligenceExampleDagsTest, self).tearDown() + + def test_run_example_dag_spanner(self): + self._run_dag() diff --git a/tests/contrib/operators/test_gcp_video_intelligence_operator_system_helper.py b/tests/contrib/operators/test_gcp_video_intelligence_operator_system_helper.py new file mode 100755 index 0000000000..4fda27af07 --- /dev/null +++ b/tests/contrib/operators/test_gcp_video_intelligence_operator_system_helper.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os + +import argparse + +from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_AI_KEY +from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor + +GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") +GCP_BUCKET_NAME = os.environ.get("GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "test-bucket-name") +GCP_VIDEO_SOURCE_URL = os.environ.get("GCP_VIDEO_INTELLIGENCE_VIDEO_SOURCE_URL", "http://nasa.gov") + + +class GCPVideoIntelligenceHelper(LoggingCommandExecutor): + def create_bucket(self): + self.execute_cmd( + [ + "gsutil", + "mb", + "-p", + GCP_PROJECT_ID, + "-c", + "regional", + "-l", + "europe-north1", + "gs://%s/" % GCP_BUCKET_NAME, + ] + ) + + self.execute_cmd( + cmd=[ + "bash", + "-c", + "curl %s | gsutil cp - gs://%s/video.mp4" % (GCP_VIDEO_SOURCE_URL, GCP_BUCKET_NAME) + ] + ) + + def delete_bucket(self): + self.execute_cmd(["gsutil", "rm", "-r", "gs://%s/" % GCP_BUCKET_NAME]) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Create or delete bucket with test file for system tests.") + parser.add_argument( + "--action", + dest="action", + required=True, + choices=("create-bucket", "delete-bucket", "before-tests", "after-tests"), + ) + action = parser.parse_args().action + + helper = GCPVideoIntelligenceHelper() + gcp_authenticator = GcpAuthenticator(GCP_AI_KEY) + helper.log.info("Starting action: {}".format(action)) + + gcp_authenticator.gcp_store_authentication() + try: + gcp_authenticator.gcp_authenticate() + if action == "before-tests": + pass + elif action == "after-tests": + pass + elif action == "create-bucket": + helper.create_bucket() + elif action == "delete-bucket": + helper.delete_bucket() + else: + raise Exception("Unknown action: %s", action) + finally: + gcp_authenticator.gcp_restore_authentication() + + helper.log.info("Finishing action: %s", action)