[AIRFLOW-4971] Add Google Display & Video 360 integration (#6170)

This commit is contained in:
Tomek 2019-10-22 11:51:40 +02:00 коммит произвёл Kamil Breguła
Родитель 4e661f535d
Коммит 16d7accb22
12 изменённых файлов: 1219 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that shows how to use DisplayVideo.
"""
from airflow import models
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
)
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
)
from airflow.utils import dates
# [START howto_display_video_env_variables]
BUCKET = "gs://test-display-video-bucket"
REPORT = {
"kind": "doubleclickbidmanager#query",
"metadata": {
"title": "Polidea Test Report",
"dataRange": "LAST_7_DAYS",
"format": "CSV",
"sendNotification": False,
},
"params": {
"type": "TYPE_GENERAL",
"groupBys": ["FILTER_DATE", "FILTER_PARTNER"],
"filters": [{"type": "FILTER_PARTNER", "value": 1486931}],
"metrics": ["METRIC_IMPRESSIONS", "METRIC_CLICKS"],
"includeInviteData": True,
},
"schedule": {"frequency": "ONE_TIME"},
}
PARAMS = {"dataRange": "LAST_14_DAYS", "timezoneCode": "America/New_York"}
# [END howto_display_video_env_variables]
default_args = {"start_date": dates.days_ago(1)}
with models.DAG(
"example_display_video",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_google_display_video_createquery_report_operator]
create_report = GoogleDisplayVideo360CreateReportOperator(
body=REPORT, task_id="create_report"
)
report_id = "{{ task_instance.xcom_pull('create_report', key='report_id') }}"
# [END howto_google_display_video_createquery_report_operator]
# [START howto_google_display_video_runquery_report_operator]
run_report = GoogleDisplayVideo360RunReportOperator(
report_id=report_id, params=PARAMS, task_id="run_report"
)
# [END howto_google_display_video_runquery_report_operator]
# [START howto_google_display_video_wait_report_operator]
wait_for_report = GoogleDisplayVideo360ReportSensor(
task_id="wait_for_report", report_id=report_id
)
# [END howto_google_display_video_wait_report_operator]
# [START howto_google_display_video_getquery_report_operator]
get_report = GoogleDisplayVideo360DownloadReportOperator(
report_id=report_id,
task_id="get_report",
bucket_name=BUCKET,
report_name="test1.csv",
)
# [END howto_google_display_video_getquery_report_operator]
# [START howto_google_display_video_deletequery_report_operator]
delete_report = GoogleDisplayVideo360DeleteReportOperator(
report_id=report_id, task_id="delete_report"
)
# [END howto_google_display_video_deletequery_report_operator]
create_report >> run_report >> wait_for_report >> get_report >> delete_report

Просмотреть файл

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google DisplayVideo hook.
"""
from typing import Any, Dict, List, Optional
from googleapiclient.discovery import Resource, build
from airflow.gcp.hooks.base import GoogleCloudBaseHook
class GoogleDisplayVideo360Hook(GoogleCloudBaseHook):
"""
Hook for Google Display & Video 360.
"""
_conn = None # type: Optional[Any]
def __init__(
self,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
) -> None:
super().__init__(gcp_conn_id, delegate_to)
self.api_version = api_version
def get_conn(self) -> Resource:
"""
Retrieves connection to DisplayVideo.
"""
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"doubleclickbidmanager",
self.api_version,
http=http_authorized,
cache_discovery=False,
)
return self._conn
def create_query(self, query: Dict[str, Any]) -> Dict:
"""
Creates a query.
:param query: Query object to be passed to request body.
:type query: Dict[str, Any]
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.createquery(body=query)
.execute(num_retries=self.num_retries)
)
return response
def delete_query(self, query_id: str) -> None:
"""
Deletes a stored query as well as the associated stored reports.
:param query_id: Query ID to delete.
:type query_id: str
"""
(
self.get_conn() # pylint:disable=no-member
.queries()
.deletequery(queryId=query_id)
.execute(num_retries=self.num_retries)
)
def get_query(self, query_id: str) -> Dict:
"""
Retrieves a stored query.
:param query_id: Query ID to retrieve.
:type query_id: str
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.getquery(queryId=query_id)
.execute(num_retries=self.num_retries)
)
return response
def list_queries(self, ) -> List[Dict]:
"""
Retrieves stored queries.
"""
response = (
self.get_conn() # pylint:disable=no-member
.queries()
.listqueries()
.execute(num_retries=self.num_retries)
)
return response.get('queries', [])
def run_query(self, query_id: str, params: Dict[str, Any]) -> None:
"""
Runs a stored query to generate a report.
:param query_id: Query ID to run.
:type query_id: str
:param params: Parameters for the report.
:type params: Dict[str, Any]
"""
(
self.get_conn() # pylint:disable=no-member
.queries()
.runquery(queryId=query_id, body=params)
.execute(num_retries=self.num_retries)
)

Просмотреть файл

@ -0,0 +1,339 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google DisplayVideo operators.
"""
import shutil
import tempfile
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import urlparse
from airflow import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.utils.decorators import apply_defaults
class GoogleDisplayVideo360CreateReportOperator(BaseOperator):
"""
Creates a query.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360CreateReportOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1/queries/createquery`
:param body: Report object passed to the request's body as described here:
https://developers.google.com/bid-manager/v1/queries#resource
:type body: Dict[str, Any]
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("body",)
template_ext = (".json",)
@apply_defaults
def __init__(
self,
body: Dict[str, Any],
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.body = body
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
def execute(self, context: Dict):
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)
self.log.info("Creating Display & Video 360 report.")
response = hook.create_query(query=self.body)
report_id = response["queryId"]
self.xcom_push(context, key="report_id", value=report_id)
self.log.info("Created report with ID: %s", report_id)
return response
class GoogleDisplayVideo360DeleteReportOperator(BaseOperator):
"""
Deletes a stored query as well as the associated stored reports.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360DeleteReportOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1/queries/deletequery`
:param report_id: Report ID to delete.
:type report_id: str
:param report_name: Name of the report to delete.
:type report_name: str
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("report_id",)
@apply_defaults
def __init__(
self,
report_id: Optional[str] = None,
report_name: Optional[str] = None,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
self.report_name = report_name
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
if report_name and report_id:
raise AirflowException("Use only one value - `report_name` or `report_id`.")
if not (report_name or report_id):
raise AirflowException(
"Provide one of the values: `report_name` or `report_id`."
)
def execute(self, context: Dict):
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)
if self.report_id:
reports_ids_to_delete = [self.report_id]
else:
reports = hook.list_queries()
reports_ids_to_delete = [
report["queryId"]
for report in reports
if report["metadata"]["title"] == self.report_name
]
for report_id in reports_ids_to_delete:
self.log.info("Deleting report with id: %s", report_id)
hook.delete_query(query_id=report_id)
self.log.info("Report deleted.")
class GoogleDisplayVideo360DownloadReportOperator(BaseOperator):
"""
Retrieves a stored query.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360DownloadReportOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1/queries/getquery`
:param report_id: Report ID to retrieve.
:type report_id: str
:param bucket_name: The bucket to upload to.
:type bucket_name: str
:param report_name: The report name to set when uploading the local file.
:type report_name: str
:param chunk_size: File will be downloaded in chunks of this many bytes.
:type chunk_size: int
:param gzip: Option to compress local file or file data for upload
:type gzip: bool
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("report_id", "bucket_name", "report_name")
@apply_defaults
def __init__(
self,
report_id: str,
bucket_name: str,
report_name: Optional[str] = None,
gzip: bool = True,
chunk_size: int = 10 * 1024 * 1024,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
self.chunk_size = chunk_size
self.gzip = gzip
self.bucket_name = self._set_bucket_name(bucket_name)
self.report_name = report_name
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
def _resolve_file_name(self, name: str) -> str:
csv = ".csv"
gzip = ".gz"
if not name.endswith(csv):
name += csv
if self.gzip:
name += gzip
return name
@staticmethod
def _set_bucket_name(name: str) -> str:
bucket = name if not name.startswith("gs://") else name[5:]
return bucket.strip("/")
def execute(self, context: Dict):
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)
gcs_hook = GoogleCloudStorageHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
resource = hook.get_query(query_id=self.report_id)
# Check if report is ready
if resource["metadata"]["running"]:
raise AirflowException('Report {} is still running'.format(self.report_id))
# If no custom report_name provided, use DV360 name
file_url = resource["metadata"]["googleCloudStoragePathForLatestReport"]
report_name = self.report_name or urlparse(file_url).path.split('/')[2]
report_name = self._resolve_file_name(report_name)
# Download the report
self.log.info("Starting downloading report %s", self.report_id)
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
with urllib.request.urlopen(file_url) as response:
shutil.copyfileobj(response, temp_file, length=self.chunk_size)
temp_file.flush()
# Upload the local file to bucket
gcs_hook.upload(
bucket_name=self.bucket_name,
object_name=report_name,
gzip=self.gzip,
filename=temp_file.name,
mime_type="text/csv",
)
self.log.info(
"Report %s was saved in bucket %s as %s.",
self.report_id,
self.bucket_name,
report_name,
)
self.xcom_push(context, key='report_name', value=report_name)
class GoogleDisplayVideo360RunReportOperator(BaseOperator):
"""
Runs a stored query to generate a report.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360RunReportOperator`
.. seealso::
Check also the official API docs:
`https://developers.google.com/bid-manager/v1/queries/runquery`
:param report_id: Report ID to run.
:type report_id: str
:param params: Parameters for running a report as described here:
https://developers.google.com/bid-manager/v1/queries/runquery
:type params: Dict[str, Any]
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("report_id", "params")
@apply_defaults
def __init__(
self,
report_id: str,
params: Dict[str, Any],
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.report_id = report_id
self.params = params
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
def execute(self, context: Dict):
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)
self.log.info(
"Running report %s with the following params:\n %s",
self.report_id,
self.params,
)
hook.run_query(query_id=self.report_id, params=self.params)

Просмотреть файл

@ -0,0 +1,74 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Sensor for detecting the completion of DV360 reports.
"""
from typing import Dict, Optional
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
class GoogleDisplayVideo360ReportSensor(BaseSensorOperator):
"""
Sensor for detecting the completion of DV360 reports.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GoogleDisplayVideo360ReportSensor`
:param report_id: Report ID to delete.
:type report_id: str
:param api_version: The version of the api that will be requested for example 'v3'.
:type api_version: str
:param gcp_conn_id: The connection ID to use when fetching connection info.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
request must have domain-wide delegation enabled.
:type delegate_to: str
"""
template_fields = ("report_id",)
def __init__(
self,
report_id: str,
api_version: str = "v1",
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.report_id = report_id
self.api_version = api_version
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
def poke(self, context: Dict) -> bool:
hook = GoogleDisplayVideo360Hook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to,
api_version=self.api_version,
)
response = hook.get_query(query_id=self.report_id)
if response and not response.get("metadata", {}).get("running"):
return True
return False

Просмотреть файл

@ -0,0 +1,121 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Google Display & Video 360 Operators
=======================================
`Google Display & Video 360 <https://marketingplatform.google.com/about/display-video-360/>`__ has the end-to-end
campaign management features you need.
.. contents::
:depth: 1
:local:
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
.. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:GoogleDisplayVideo360CreateReportOperator:
Creating a report
^^^^^^^^^^^^^^^^^
To create Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_createquery_report_operator]
:end-before: [END howto_google_display_video_createquery_report_operator]
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360CreateReportOperator`
parameters which allow you to dynamically determine values.
The result is saved to :ref:`XCom <concepts:xcom>`, which allows the result to be used by other operators.
.. _howto/operator:GoogleDisplayVideo360DeleteReportOperator:
Deleting a report
^^^^^^^^^^^^^^^^^
To delete Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DeleteReportOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_deletequery_report_operator]
:end-before: [END howto_google_display_video_deletequery_report_operator]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DeleteReportOperator`
parameters which allow you to dynamically determine values.
.. _howto/operator:GoogleDisplayVideo360ReportSensor:
Waiting for report
^^^^^^^^^^^^^^^^^^
To delete Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor`.
.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_wait_report_operator]
:end-before: [END howto_google_display_video_wait_report_operator]
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360ReportSensor`
parameters which allow you to dynamically determine values.
.. _howto/operator:GoogleDisplayVideo360DownloadReportOperator:
Downloading a report
^^^^^^^^^^^^^^^^^^^^
To download a report to GCS bucket use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadReportOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_getquery_report_operator]
:end-before: [END howto_google_display_video_getquery_report_operator]
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360DownloadReportOperator`
parameters which allow you to dynamically determine values.
.. _howto/operator:GoogleDisplayVideo360RunReportOperator:
Running a report
^^^^^^^^^^^^^^^^
To run Display&Video 360 report use
:class:`~airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`.
.. exampleinclude:: ../../../../airflow/providers/google/marketing_platform/example_dags/example_display_video.py
:language: python
:dedent: 4
:start-after: [START howto_google_display_video_runquery_report_operator]
:end-before: [END howto_google_display_video_runquery_report_operator]
Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.marketing_platform.operators.display_video.GoogleDisplayVideo360RunReportOperator`
parameters which allow you to dynamically determine values.

Просмотреть файл

@ -855,6 +855,12 @@ These integrations allow you to perform various operations within various servic
- :mod:`airflow.providers.google.marketing_platform.operators.campaign_manager`
- :mod:`airflow.providers.google.marketing_platform.sensors.campaign_manager`
* - `Google Display&Video 360 <https://marketingplatform.google.com/about/display-video-360/>`__
- :doc:`How to use <howto/operator/gcp/display_video>`
- :mod:`airflow.providers.google.marketing_platform.hooks.display_video`
- :mod:`airflow.providers.google.marketing_platform.operators.display_video`
- :mod:`airflow.providers.google.marketing_platform.sensors.display_video`
* - `Google Drive <https://www.google.com/drive/>`__
-
- :mod:`airflow.contrib.hooks.gdrive_hook`

Просмотреть файл

@ -36,6 +36,7 @@ GCP_COMPUTE_KEY = 'gcp_compute.json'
GCP_DATAFLOW_KEY = 'gcp_dataflow.json'
GCP_DATAPROC_KEY = 'gcp_dataproc.json'
GCP_DATASTORE_KEY = 'gcp_datastore.json'
GCP_DISPLAY_VIDEO_KEY = 'google_display_video.json'
GCP_DLP_KEY = 'gcp_dlp.json'
GCP_FUNCTION_KEY = 'gcp_function.json'
GCP_GCS_KEY = 'gcp_gcs.json'

Просмотреть файл

@ -0,0 +1,139 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase, mock
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from tests.gcp.utils.base_gcp_mock import mock_base_gcp_hook_default_project_id
API_VERSION = "v1"
GCP_CONN_ID = "google_cloud_default"
class TestGoogleDisplayVideo360Hook(TestCase):
def setUp(self):
with mock.patch(
"airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook.__init__",
new=mock_base_gcp_hook_default_project_id,
):
self.hook = GoogleDisplayVideo360Hook(gcp_conn_id=GCP_CONN_ID)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook._authorize"
)
@mock.patch("airflow.providers.google.marketing_platform.hooks."
"display_video.build")
def test_gen_conn(self, mock_build, mock_authorize):
result = self.hook.get_conn()
mock_build.assert_called_once_with(
"doubleclickbidmanager",
API_VERSION,
http=mock_authorize.return_value,
cache_discovery=False,
)
self.assertEqual(mock_build.return_value, result)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_create_query(self, get_conn_mock):
body = {"body": "test"}
return_value = "TEST"
get_conn_mock.return_value.queries.return_value.createquery.return_value.execute.return_value = (
return_value
)
result = self.hook.create_query(query=body)
get_conn_mock.return_value.queries.return_value.createquery.assert_called_once_with(
body=body
)
self.assertEqual(return_value, result)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_delete_query(self, get_conn_mock):
query_id = "QUERY_ID"
return_value = "TEST"
get_conn_mock.return_value.queries.return_value.deletequery.return_value.execute.return_value = (
return_value
)
self.hook.delete_query(query_id=query_id)
get_conn_mock.return_value.queries.return_value.deletequery.assert_called_once_with(
queryId=query_id
)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_get_query(self, get_conn_mock):
query_id = "QUERY_ID"
return_value = "TEST"
get_conn_mock.return_value.queries.return_value.getquery.return_value.execute.return_value = (
return_value
)
result = self.hook.get_query(query_id=query_id)
get_conn_mock.return_value.queries.return_value.getquery.assert_called_once_with(
queryId=query_id
)
self.assertEqual(return_value, result)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_list_queries(self, get_conn_mock):
queries = ["test"]
return_value = {"queries": queries}
get_conn_mock.return_value.queries.return_value.listqueries.return_value.execute.return_value = (
return_value
)
result = self.hook.list_queries()
get_conn_mock.return_value.queries.return_value.listqueries.assert_called_once_with()
self.assertEqual(queries, result)
@mock.patch(
"airflow.providers.google.marketing_platform.hooks."
"display_video.GoogleDisplayVideo360Hook.get_conn"
)
def test_run_query(self, get_conn_mock):
query_id = "QUERY_ID"
params = {"params": "test"}
self.hook.run_query(query_id=query_id, params=params)
get_conn_mock.return_value.queries.return_value.runquery.assert_called_once_with(
queryId=query_id, body=params
)

Просмотреть файл

@ -0,0 +1,182 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase, mock
from airflow.providers.google.marketing_platform.operators.display_video import (
GoogleDisplayVideo360CreateReportOperator, GoogleDisplayVideo360DeleteReportOperator,
GoogleDisplayVideo360DownloadReportOperator, GoogleDisplayVideo360RunReportOperator,
)
API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
class TestGoogleDisplayVideo360CreateReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360CreateReportOperator.xcom_push"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.BaseOperator"
)
def test_execute(self, mock_base_op, hook_mock, xcom_mock):
body = {"body": "test"}
query_id = "TEST"
hook_mock.return_value.create_query.return_value = {"queryId": query_id}
op = GoogleDisplayVideo360CreateReportOperator(
body=body, api_version=API_VERSION, task_id="test_task"
)
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
hook_mock.return_value.create_query.assert_called_once_with(query=body)
xcom_mock.assert_called_once_with(None, key="report_id", value=query_id)
class TestGoogleDisplayVideo360DeleteReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.BaseOperator"
)
def test_execute(self, mock_base_op, hook_mock):
query_id = "QUERY_ID"
op = GoogleDisplayVideo360DeleteReportOperator(
report_id=query_id, api_version=API_VERSION, task_id="test_task"
)
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
hook_mock.return_value.delete_query.assert_called_once_with(query_id=query_id)
class TestGoogleDisplayVideo360GetReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.shutil"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.urllib.request"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.tempfile"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360DownloadReportOperator.xcom_push"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleCloudStorageHook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.BaseOperator"
)
def test_execute(
self,
mock_base_op,
mock_hook,
mock_gcs_hook,
mock_xcom,
mock_temp,
mock_reuqest,
mock_shutil,
):
report_id = "REPORT_ID"
bucket_name = "BUCKET"
report_name = "TEST.csv"
filename = "test"
mock_temp.NamedTemporaryFile.return_value.__enter__.return_value.name = filename
mock_hook.return_value.get_query.return_value = {
"metadata": {
"running": False,
"googleCloudStoragePathForLatestReport": "test",
}
}
op = GoogleDisplayVideo360DownloadReportOperator(
report_id=report_id,
api_version=API_VERSION,
bucket_name=bucket_name,
report_name=report_name,
task_id="test_task",
)
op.execute(context=None)
mock_hook.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
mock_hook.return_value.get_query.assert_called_once_with(query_id=report_id)
mock_gcs_hook.assert_called_once_with(
google_cloud_storage_conn_id=GCP_CONN_ID, delegate_to=None
)
mock_gcs_hook.return_value.upload.assert_called_once_with(
bucket_name=bucket_name,
filename=filename,
gzip=True,
mime_type="text/csv",
object_name=report_name + ".gz",
)
mock_xcom.assert_called_once_with(
None, key="report_name", value=report_name + ".gz"
)
class TestGoogleDisplayVideo360RunReportOperator(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.operators."
"display_video.BaseOperator"
)
def test_execute(self, mock_base_op, hook_mock):
report_id = "QUERY_ID"
params = {"param": "test"}
op = GoogleDisplayVideo360RunReportOperator(
report_id=report_id,
params=params,
api_version=API_VERSION,
task_id="test_task",
)
op.execute(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
hook_mock.return_value.run_query.assert_called_once_with(
query_id=report_id, params=params
)

Просмотреть файл

@ -0,0 +1,51 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.utils.base_gcp_system_test_case import SKIP_TEST_WARNING, TestDagGcpSystem
from tests.gcp.utils.gcp_authenticator import GCP_DISPLAY_VIDEO_KEY
from tests.providers.google.marketing_platform.operators.test_display_video_system_helper import (
GcpDisplayVideoSystemTestHelper,
)
# Requires the following scope:
# https://www.googleapis.com/auth/doubleclickbidmanager
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_DISPLAY_VIDEO_KEY), SKIP_TEST_WARNING)
class DisplayVideoSystemTest(TestDagGcpSystem):
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_bucket()
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def __init__(self, method_name="runTest"):
super().__init__(
method_name, dag_id="example_display_video", gcp_key=GCP_DISPLAY_VIDEO_KEY
)
self.helper = GcpDisplayVideoSystemTestHelper()
def test_run_example_dag(self):
self._run_dag()

Просмотреть файл

@ -0,0 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
BUCKET = "gs://test-display-video-bucket"
class GcpDisplayVideoSystemTestHelper(LoggingCommandExecutor):
def create_bucket(self):
self.execute_cmd(["gsutil", "mb", "gs://{bucket}".format(bucket=BUCKET)])
def delete_bucket(self):
self.execute_cmd(["gsutil", "rm", "-r", "gs://{bucket}".format(bucket=BUCKET)])

Просмотреть файл

@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from unittest import TestCase, mock
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360ReportSensor,
)
API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"
class TestGoogleDisplayVideo360ReportSensor(TestCase):
@mock.patch(
"airflow.providers.google.marketing_platform.sensors."
"display_video.GoogleDisplayVideo360Hook"
)
@mock.patch(
"airflow.providers.google.marketing_platform.sensors."
"display_video.BaseSensorOperator"
)
def test_poke(self, mock_base_op, hook_mock):
report_id = "REPORT_ID"
op = GoogleDisplayVideo360ReportSensor(
report_id=report_id, api_version=API_VERSION, task_id="test_task"
)
op.poke(context=None)
hook_mock.assert_called_once_with(
gcp_conn_id=GCP_CONN_ID, delegate_to=None, api_version=API_VERSION
)
hook_mock.return_value.get_query.assert_called_once_with(query_id=report_id)