[AIRFLOW-4168] Create Google Cloud Video Intelligence Operators (#4985)

This commit is contained in:
Antoni Smoliński 2019-04-27 20:48:37 +02:00 коммит произвёл Jarek Potiuk
Родитель 60b9023ed9
Коммит df18b02d2a
10 изменённых файлов: 1016 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that demonstrates operators for the Google Cloud Video Intelligence service in the Google
Cloud Platform.
This DAG relies on the following OS environment variables:
* GCP_BUCKET_NAME - Google Cloud Storage bucket where the file exists.
"""
import os
# [START howto_operator_vision_retry_import]
from google.api_core.retry import Retry
# [END howto_operator_vision_retry_import]
import airflow
from airflow import models
from airflow.contrib.operators.gcp_video_intelligence_operator import (
CloudVideoIntelligenceDetectVideoLabelsOperator,
CloudVideoIntelligenceDetectVideoExplicitContentOperator,
CloudVideoIntelligenceDetectVideoShotsOperator,
)
from airflow.operators.bash_operator import BashOperator
default_args = {"start_date": airflow.utils.dates.days_ago(1)}
# [START howto_operator_video_intelligence_os_args]
GCP_BUCKET_NAME = os.environ.get(
"GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "test-bucket-name"
)
# [END howto_operator_video_intelligence_os_args]
# [START howto_operator_video_intelligence_other_args]
INPUT_URI = "gs://{}/video.mp4".format(GCP_BUCKET_NAME)
# [END howto_operator_video_intelligence_other_args]
with models.DAG(
"example_gcp_video_intelligence",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_operator_video_intelligence_detect_labels]
detect_video_label = CloudVideoIntelligenceDetectVideoLabelsOperator(
input_uri=INPUT_URI,
output_uri=None,
video_context=None,
timeout=5,
task_id="detect_video_label",
)
# [END howto_operator_video_intelligence_detect_labels]
# [START howto_operator_video_intelligence_detect_labels_result]
detect_video_label_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_video_label')"
"['annotationResults'][0]['shotLabelAnnotations'][0]['entity']}}",
task_id="detect_video_label_result",
)
# [END howto_operator_video_intelligence_detect_labels_result]
# [START howto_operator_video_intelligence_detect_explicit_content]
detect_video_explicit_content = CloudVideoIntelligenceDetectVideoExplicitContentOperator(
input_uri=INPUT_URI,
output_uri=None,
video_context=None,
retry=Retry(maximum=10.0),
timeout=5,
task_id="detect_video_explicit_content",
)
# [END howto_operator_video_intelligence_detect_explicit_content]
# [START howto_operator_video_intelligence_detect_explicit_content_result]
detect_video_explicit_content_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_video_explicit_content')"
"['annotationResults'][0]['explicitAnnotation']['frames'][0]}}",
task_id="detect_video_explicit_content_result",
)
# [END howto_operator_video_intelligence_detect_explicit_content_result]
# [START howto_operator_video_intelligence_detect_video_shots]
detect_video_shots = CloudVideoIntelligenceDetectVideoShotsOperator(
input_uri=INPUT_URI,
output_uri=None,
video_context=None,
retry=Retry(maximum=10.0),
timeout=5,
task_id="detect_video_shots",
)
# [END howto_operator_video_intelligence_detect_video_shots]
# [START howto_operator_video_intelligence_detect_video_shots_result]
detect_video_shots_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_video_shots')"
"['annotationResults'][0]['shotAnnotations'][0]}}",
task_id="detect_video_shots_result",
)
# [END howto_operator_video_intelligence_detect_video_shots_result]
detect_video_label >> detect_video_label_result
detect_video_explicit_content >> detect_video_explicit_content_result
detect_video_shots >> detect_video_shots_result

Просмотреть файл

@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from google.cloud.videointelligence_v1 import VideoIntelligenceServiceClient
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
class CloudVideoIntelligenceHook(GoogleCloudBaseHook):
"""
Hook for Google Cloud Video Intelligence APIs.
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
"""
_conn = None
def __init__(self, gcp_conn_id="google_cloud_default", delegate_to=None):
super(CloudVideoIntelligenceHook, self).__init__(gcp_conn_id, delegate_to)
def get_conn(self):
"""
Returns Gcp Video Intelligence Service client
:rtype: google.cloud.videointelligence_v1.VideoIntelligenceServiceClient
"""
if not self._conn:
self._conn = VideoIntelligenceServiceClient(credentials=self._get_credentials())
return self._conn
def annotate_video(
self,
input_uri=None,
input_content=None,
features=None,
video_context=None,
output_uri=None,
location=None,
retry=None,
timeout=None,
metadata=None,
):
"""
Performs video annotation.
:param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported,
which must be specified in the following format: ``gs://bucket-id/object-id``.
:type input_uri: str
:param input_content: The video data bytes.
If unset, the input video(s) should be specified via ``input_uri``.
If set, ``input_uri`` should be unset.
:type input_content: bytes
:param features: Requested video annotation features.
:type features: list[google.cloud.videointelligence_v1.VideoIntelligenceServiceClient.enums.Feature]
:param output_uri: Optional, location where the output (in JSON format) should be stored. Currently,
only Google Cloud Storage URIs are supported, which must be specified in the following format:
``gs://bucket-id/object-id``.
:type output_uri: str
:param video_context: Optional, Additional video context and/or feature-specific parameters.
:type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext
:param location: Optional, cloud region where annotation should take place. Supported cloud regions:
us-east1, us-west1, europe-west1, asia-east1.
If no region is specified, a region will be determined based on video file location.
:type location: str
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param timeout: Optional, The amount of time, in seconds, to wait for the request to complete.
Note that if retry is specified, the timeout applies to each individual attempt.
:type timeout: float
:param metadata: Optional, Additional metadata that is provided to the method.
:type metadata: seq[tuple[str, str]]
"""
client = self.get_conn()
return client.annotate_video(
input_uri=input_uri,
input_content=input_content,
features=features,
video_context=video_context,
output_uri=output_uri,
location_id=location,
retry=retry,
timeout=timeout,
metadata=metadata,
)

Просмотреть файл

@ -0,0 +1,245 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from google.protobuf.json_format import MessageToDict
from airflow.contrib.hooks.gcp_video_intelligence_hook import CloudVideoIntelligenceHook
from airflow.models import BaseOperator
from google.cloud.videointelligence_v1 import enums
class CloudVideoIntelligenceDetectVideoLabelsOperator(BaseOperator):
"""
Performs video annotation, annotating video labels.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVideoIntelligenceDetectVideoLabelsOperator`.
:param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported,
which must be specified in the following format: ``gs://bucket-id/object-id``.
:type input_uri: str
:param input_content: The video data bytes.
If unset, the input video(s) should be specified via ``input_uri``.
If set, ``input_uri`` should be unset.
:type input_content: bytes
:param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only
Google Cloud Storage URIs are supported, which must be specified in the following format:
``gs://bucket-id/object-id``.
:type output_uri: str
:param video_context: Optional, Additional video context and/or feature-specific parameters.
:type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext
:param location: Optional, cloud region where annotation should take place. Supported cloud regions:
us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined
based on video file location.
:type location: str
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""
# [START gcp_video_intelligence_detect_labels_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_labels_template_fields]
def __init__(
self,
input_uri,
input_content=None,
output_uri=None,
video_context=None,
location=None,
retry=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super(CloudVideoIntelligenceDetectVideoLabelsOperator, self).__init__(*args, **kwargs)
self.input_uri = input_uri
self.input_content = input_content
self.output_uri = output_uri
self.video_context = video_context
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
operation = hook.annotate_video(
input_uri=self.input_uri,
input_content=self.input_content,
video_context=self.video_context,
location=self.location,
retry=self.retry,
features=[enums.Feature.LABEL_DETECTION],
)
self.log.info("Processing video for label annotations")
result = MessageToDict(operation.result())
self.log.info("Finished processing.")
return result
class CloudVideoIntelligenceDetectVideoExplicitContentOperator(BaseOperator):
"""
Performs video annotation, annotating explicit content.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVideoIntelligenceDetectVideoExplicitContentOperator`
:param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported,
which must be specified in the following format: ``gs://bucket-id/object-id``.
:type input_uri: str
:param input_content: The video data bytes.
If unset, the input video(s) should be specified via ``input_uri``.
If set, ``input_uri`` should be unset.
:type input_content: bytes
:param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only
Google Cloud Storage URIs are supported, which must be specified in the following format:
``gs://bucket-id/object-id``.
:type output_uri: str
:param video_context: Optional, Additional video context and/or feature-specific parameters.
:type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext
:param location: Optional, cloud region where annotation should take place. Supported cloud regions:
us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined
based on video file location.
:type location: str
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""
# [START gcp_video_intelligence_detect_explicit_content_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_explicit_content_template_fields]
def __init__(
self,
input_uri,
output_uri=None,
input_content=None,
video_context=None,
location=None,
retry=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super(CloudVideoIntelligenceDetectVideoExplicitContentOperator, self).__init__(*args, **kwargs)
self.input_uri = input_uri
self.output_uri = output_uri
self.input_content = input_content
self.video_context = video_context
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
operation = hook.annotate_video(
input_uri=self.input_uri,
input_content=self.input_content,
video_context=self.video_context,
location=self.location,
retry=self.retry,
features=[enums.Feature.EXPLICIT_CONTENT_DETECTION],
)
self.log.info("Processing video for explicit content annotations")
result = MessageToDict(operation.result())
self.log.info("Finished processing.")
return result
class CloudVideoIntelligenceDetectVideoShotsOperator(BaseOperator):
"""
Performs video annotation, annotating video shots.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:CloudVideoIntelligenceDetectVideoShotsOperator`
:param input_uri: Input video location. Currently, only Google Cloud Storage URIs are supported,
which must be specified in the following format: ``gs://bucket-id/object-id``.
:type input_uri: str
:param input_content: The video data bytes.
If unset, the input video(s) should be specified via ``input_uri``.
If set, ``input_uri`` should be unset.
:type input_content: bytes
:param output_uri: Optional, location where the output (in JSON format) should be stored. Currently, only
Google Cloud Storage URIs are supported, which must be specified in the following format:
``gs://bucket-id/object-id``.
:type output_uri: str
:param video_context: Optional, Additional video context and/or feature-specific parameters.
:type video_context: dict or google.cloud.videointelligence_v1.types.VideoContext
:param location: Optional, cloud region where annotation should take place. Supported cloud regions:
us-east1, us-west1, europe-west1, asia-east1. If no region is specified, a region will be determined
based on video file location.
:type location: str
:param retry: Retry object used to determine when/if to retry requests.
If None is specified, requests will not be retried.
:type retry: google.api_core.retry.Retry
:param gcp_conn_id: Optional, The connection ID used to connect to Google Cloud
Platform. Defaults to ``google_cloud_default``.
:type gcp_conn_id: str
"""
# [START gcp_video_intelligence_detect_video_shots_template_fields]
template_fields = ("input_uri", "output_uri", "gcp_conn_id")
# [END gcp_video_intelligence_detect_video_shots_template_fields]
def __init__(
self,
input_uri,
output_uri=None,
input_content=None,
video_context=None,
location=None,
retry=None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super(CloudVideoIntelligenceDetectVideoShotsOperator, self).__init__(*args, **kwargs)
self.input_uri = input_uri
self.output_uri = output_uri
self.input_content = input_content
self.video_context = video_context
self.location = location
self.retry = retry
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = CloudVideoIntelligenceHook(gcp_conn_id=self.gcp_conn_id)
operation = hook.annotate_video(
input_uri=self.input_uri,
input_content=self.input_content,
video_context=self.video_context,
location=self.location,
retry=self.retry,
features=[enums.Feature.SHOT_CHANGE_DETECTION],
)
self.log.info("Processing video for video shots annotations")
result = MessageToDict(operation.result())
self.log.info("Finished processing.")
return result

Просмотреть файл

@ -0,0 +1,199 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Google Cloud Video Intelligence Operators
=========================================
.. contents::
:depth: 1
:local:
.. _howto/operator:CloudVideoIntelligenceDetectVideoLabelsOperator:
CloudVideoIntelligenceDetectVideoLabelsOperator
-----------------------------------------------
Performs video annotation, annotating video labels.
For parameter definition, take a look at
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoLabelsOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_os_args]
:end-before: [END howto_operator_video_intelligence_os_args]
Input uri is an uri to a file in Google Cloud Storage
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_labels]
:end-before: [END howto_operator_video_intelligence_detect_labels]
You can use the annotation output via Xcom:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_labels_result]
:end-before: [END howto_operator_video_intelligence_detect_labels_result]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py
:language: python
:dedent: 4
:start-after: [START gcp_video_intelligence_detect_labels_template_fields]
:end-before: [END gcp_video_intelligence_detect_labels_template_fields]
.. _howto/operator:CloudVideoIntelligenceDetectVideoExplicitContentOperator:
More information
""""""""""""""""
Note: The duration of video annotation operation is equal or longer than the annotation video itself.
CloudVideoIntelligenceDetectVideoExplicitContentOperator
--------------------------------------------------------
Performs video annotation, annotating explicit content.
For parameter definition, take a look at
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoExplicitContentOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_os_args]
:end-before: [END howto_operator_video_intelligence_os_args]
Input uri is an uri to a file in Google Cloud Storage
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_explicit_content]
:end-before: [END howto_operator_video_intelligence_detect_explicit_content]
You can use the annotation output via Xcom:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_explicit_content_result]
:end-before: [END howto_operator_video_intelligence_detect_explicit_content_result]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py
:language: python
:dedent: 4
:start-after: [START gcp_video_intelligence_detect_explicit_content_template_fields]
:end-before: [END gcp_video_intelligence_detect_explicit_content_template_fields]
.. _howto/operator:CloudVideoIntelligenceDetectVideoShotsOperator:
More information
""""""""""""""""
Note: The duration of video annotation operation is equal or longer than the annotation video itself.
CloudVideoIntelligenceDetectVideoShotsOperator
----------------------------------------------
Performs video annotation, annotating explicit content.
For parameter definition, take a look at
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoShotsOperator`
Arguments
"""""""""
Some arguments in the example DAG are taken from the OS environment variables:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_os_args]
:end-before: [END howto_operator_video_intelligence_os_args]
Input uri is an uri to a file in Google Cloud Storage
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:start-after: [START howto_operator_video_intelligence_other_args]
:end-before: [END howto_operator_video_intelligence_other_args]
Using the operator
""""""""""""""""""
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_video_shots]
:end-before: [END howto_operator_video_intelligence_detect_video_shots]
You can use the annotation output via Xcom:
.. literalinclude:: ../../../../airflow/contrib/example_dags/example_gcp_video_intelligence.py
:language: python
:dedent: 4
:start-after: [START howto_operator_video_intelligence_detect_video_shots_result]
:end-before: [END howto_operator_video_intelligence_detect_video_shots_result]
Templating
""""""""""
.. literalinclude:: ../../../../airflow/contrib/operators/gcp_video_intelligence_operator.py
:language: python
:dedent: 4
:start-after: [START gcp_video_intelligence_detect_video_shots_template_fields]
:end-before: [END gcp_video_intelligence_detect_video_shots_template_fields]
More information
""""""""""""""""
Note: The duration of video annotation operation is equal or longer than the annotation video itself.

Просмотреть файл

@ -689,6 +689,18 @@ Cloud Translate Text Operators
Translate a string or list of strings.
Cloud Video Intelligence
''''''''''''''''''''''''
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoLabelsOperator`
Performs video annotation, annotating video labels.
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoExplicitContentOperator`
Performs video annotation, annotating explicit content.
:class:`airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoShotsOperator`
Performs video annotation, annotating video shots.
They also use :class:`airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook` to communicate with Google Cloud Platform.
Google Kubernetes Engine
''''''''''''''''''''''''

Просмотреть файл

@ -192,6 +192,7 @@ gcp = [
'google-cloud-spanner>=1.7.1',
'google-cloud-storage~=1.14',
'google-cloud-translate>=1.3.3',
'google-cloud-videointelligence>=1.7.0',
'google-cloud-vision>=0.35.2',
'google-cloud-texttospeech>=0.4.0',
'google-cloud-speech>=0.36.3',

Просмотреть файл

@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
from airflow.contrib.hooks.gcp_video_intelligence_hook import CloudVideoIntelligenceHook
from google.cloud.videointelligence_v1 import enums
from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
from tests.compat import mock
INPUT_URI = "gs://bucket-name/input-file"
OUTPUT_URI = "gs://bucket-name/output-file"
FEATURES = [enums.Feature.LABEL_DETECTION]
ANNOTATE_VIDEO_RESPONSE = {'test': 'test'}
class CloudVideoIntelligenceHookTestCase(unittest.TestCase):
def setUp(self):
with mock.patch(
"airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.__init__",
new=mock_base_gcp_hook_default_project_id,
):
self.hook = CloudVideoIntelligenceHook(gcp_conn_id="test")
@mock.patch("airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.get_conn")
def test_annotate_video(self, get_conn):
# Given
annotate_video_method = get_conn.return_value.annotate_video
get_conn.return_value.annotate_video.return_value = ANNOTATE_VIDEO_RESPONSE
# When
result = self.hook.annotate_video(input_uri=INPUT_URI, features=FEATURES)
# Then
self.assertIs(result, ANNOTATE_VIDEO_RESPONSE)
annotate_video_method.assert_called_once_with(
input_uri=INPUT_URI,
input_content=None,
features=FEATURES,
video_context=None,
output_uri=None,
location_id=None,
retry=None,
timeout=None,
metadata=None,
)
@mock.patch("airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook.get_conn")
def test_annotate_video_with_output_uri(self, get_conn):
# Given
annotate_video_method = get_conn.return_value.annotate_video
get_conn.return_value.annotate_video.return_value = ANNOTATE_VIDEO_RESPONSE
# When
result = self.hook.annotate_video(input_uri=INPUT_URI, output_uri=OUTPUT_URI, features=FEATURES)
# Then
self.assertIs(result, ANNOTATE_VIDEO_RESPONSE)
annotate_video_method.assert_called_once_with(
input_uri=INPUT_URI,
output_uri=OUTPUT_URI,
input_content=None,
features=FEATURES,
video_context=None,
location_id=None,
retry=None,
timeout=None,
metadata=None,
)

Просмотреть файл

@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from google.cloud.videointelligence_v1 import enums
from google.cloud.videointelligence_v1.proto.video_intelligence_pb2 import AnnotateVideoResponse
from airflow.contrib.operators.gcp_video_intelligence_operator import (
CloudVideoIntelligenceDetectVideoLabelsOperator,
CloudVideoIntelligenceDetectVideoShotsOperator,
CloudVideoIntelligenceDetectVideoExplicitContentOperator,
)
from tests.compat import mock
PROJECT_ID = "project-id"
GCP_CONN_ID = "gcp-conn-id"
CONFIG = {"encoding": "LINEAR16"}
AUDIO = {"uri": "gs://bucket/object"}
INPUT_URI = "gs://test-bucket//test-video.mp4"
class CloudVideoIntelligenceOperatorsTestCase(unittest.TestCase):
@mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook")
def test_detect_video_labels_green_path(self, mock_hook):
mocked_operation = mock.Mock()
mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[]))
mock_hook.return_value.annotate_video.return_value = mocked_operation
CloudVideoIntelligenceDetectVideoLabelsOperator(
input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID
).execute(context={"task_instance": mock.Mock()})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.annotate_video.assert_called_once_with(
input_uri=INPUT_URI,
features=[enums.Feature.LABEL_DETECTION],
input_content=None,
video_context=None,
location=None,
retry=None,
)
@mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook")
def test_detect_video_explicit_content_green_path(self, mock_hook):
mocked_operation = mock.Mock()
mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[]))
mock_hook.return_value.annotate_video.return_value = mocked_operation
CloudVideoIntelligenceDetectVideoExplicitContentOperator(
input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID
).execute(context={"task_instance": mock.Mock()})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.annotate_video.assert_called_once_with(
input_uri=INPUT_URI,
features=[enums.Feature.EXPLICIT_CONTENT_DETECTION],
input_content=None,
video_context=None,
location=None,
retry=None,
)
@mock.patch("airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceHook")
def test_detect_video_shots_green_path(self, mock_hook):
mocked_operation = mock.Mock()
mocked_operation.result = mock.Mock(return_value=AnnotateVideoResponse(annotation_results=[]))
mock_hook.return_value.annotate_video.return_value = mocked_operation
CloudVideoIntelligenceDetectVideoShotsOperator(
input_uri=INPUT_URI, task_id="id", gcp_conn_id=GCP_CONN_ID
).execute(context={"task_instance": mock.Mock()})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID)
mock_hook.return_value.annotate_video.assert_called_once_with(
input_uri=INPUT_URI,
features=[enums.Feature.SHOT_CHANGE_DETECTION],
input_content=None,
video_context=None,
location=None,
retry=None,
)

Просмотреть файл

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.contrib.operators.test_gcp_video_intelligence_operator_system_helper import (
GCPVideoIntelligenceHelper,
)
from tests.contrib.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, DagGcpSystemTestCase
from tests.contrib.utils.gcp_authenticator import GCP_AI_KEY
@unittest.skipIf(DagGcpSystemTestCase.skip_check(GCP_AI_KEY), SKIP_TEST_WARNING)
class CloudVideoIntelligenceExampleDagsTest(DagGcpSystemTestCase):
def __init__(self, method_name="runTest"):
super(CloudVideoIntelligenceExampleDagsTest, self).__init__(
method_name, dag_id="example_gcp_video_intelligence", gcp_key=GCP_AI_KEY
)
self.helper = GCPVideoIntelligenceHelper()
def setUp(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
finally:
pass
super(CloudVideoIntelligenceExampleDagsTest, self).setUp()
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
try:
self.helper.delete_bucket()
finally:
self.gcp_authenticator.gcp_revoke_authentication()
super(CloudVideoIntelligenceExampleDagsTest, self).tearDown()
def test_run_example_dag_spanner(self):
self._run_dag()

Просмотреть файл

@ -0,0 +1,90 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import argparse
from tests.contrib.utils.gcp_authenticator import GcpAuthenticator, GCP_AI_KEY
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCP_BUCKET_NAME = os.environ.get("GCP_VIDEO_INTELLIGENCE_BUCKET_NAME", "test-bucket-name")
GCP_VIDEO_SOURCE_URL = os.environ.get("GCP_VIDEO_INTELLIGENCE_VIDEO_SOURCE_URL", "http://nasa.gov")
class GCPVideoIntelligenceHelper(LoggingCommandExecutor):
def create_bucket(self):
self.execute_cmd(
[
"gsutil",
"mb",
"-p",
GCP_PROJECT_ID,
"-c",
"regional",
"-l",
"europe-north1",
"gs://%s/" % GCP_BUCKET_NAME,
]
)
self.execute_cmd(
cmd=[
"bash",
"-c",
"curl %s | gsutil cp - gs://%s/video.mp4" % (GCP_VIDEO_SOURCE_URL, GCP_BUCKET_NAME)
]
)
def delete_bucket(self):
self.execute_cmd(["gsutil", "rm", "-r", "gs://%s/" % GCP_BUCKET_NAME])
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Create or delete bucket with test file for system tests.")
parser.add_argument(
"--action",
dest="action",
required=True,
choices=("create-bucket", "delete-bucket", "before-tests", "after-tests"),
)
action = parser.parse_args().action
helper = GCPVideoIntelligenceHelper()
gcp_authenticator = GcpAuthenticator(GCP_AI_KEY)
helper.log.info("Starting action: {}".format(action))
gcp_authenticator.gcp_store_authentication()
try:
gcp_authenticator.gcp_authenticate()
if action == "before-tests":
pass
elif action == "after-tests":
pass
elif action == "create-bucket":
helper.create_bucket()
elif action == "delete-bucket":
helper.delete_bucket()
else:
raise Exception("Unknown action: %s", action)
finally:
gcp_authenticator.gcp_restore_authentication()
helper.log.info("Finishing action: %s", action)