[AIRFLOW-4964] Add BigQuery Data Transfer Hook and Operator (#5769)

* [AIRFLOW-4964] Add BigQuery Data Transfer Hook and Operator
Tomek 2019-09-06 12:48:42 +02:00 committed by Jarek Potiuk
Parent 0076e17a91
Commit 1b1d79c0a6
13 changed files with 1343 additions and 0 deletions


@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example Airflow DAG that creates and deletes BigQuery data transfer configurations.
"""
import os
import time
from google.protobuf.json_format import ParseDict
from google.cloud.bigquery_datatransfer_v1.types import TransferConfig
import airflow
from airflow import models
from airflow.gcp.operators.bigquery_dts import (
BigQueryCreateDataTransferOperator,
BigQueryDeleteDataTransferConfigOperator,
BigQueryDataTransferServiceStartTransferRunsOperator,
)
from airflow.gcp.sensors.bigquery_dts import (
BigQueryDataTransferServiceTransferRunSensor,
)
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
BUCKET_URI = os.environ.get(
"GCP_DTS_BUCKET_URI", "gs://cloud-ml-tables-data/bank-marketing.csv"
)
GCP_DTS_BQ_DATASET = os.environ.get("GCP_DTS_BQ_DATASET", "test_dts")
GCP_DTS_BQ_TABLE = os.environ.get("GCP_DTS_BQ_TABLE", "GCS_Test")
# [START howto_bigquery_dts_create_args]
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
schedule_options = {"disable_auto_scheduling": True}
PARAMS = {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": GCP_DTS_BQ_TABLE,
"file_format": "CSV",
}
TRANSFER_CONFIG = ParseDict(
{
"destination_dataset_id": GCP_DTS_BQ_DATASET,
"display_name": "GCS Test Config",
"data_source_id": "google_cloud_storage",
"schedule_options": schedule_options,
"params": PARAMS,
},
TransferConfig(),
)
# [END howto_bigquery_dts_create_args]
default_args = {"start_date": airflow.utils.dates.days_ago(1)}
with models.DAG(
"example_gcp_bigquery_dts",
default_args=default_args,
schedule_interval=None, # Override to match your needs
) as dag:
# [START howto_bigquery_create_data_transfer]
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=GCP_PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = (
"{{ task_instance.xcom_pull('gcp_bigquery_create_transfer', "
"key='transfer_config_id') }}"
)
# [END howto_bigquery_create_data_transfer]
# [START howto_bigquery_start_transfer]
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
run_id = (
"{{ task_instance.xcom_pull('gcp_bigquery_start_transfer', " "key='run_id') }}"
)
# [END howto_bigquery_start_transfer]
# [START howto_bigquery_dts_sensor]
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=run_id,
expected_statuses={"SUCCEEDED"},
)
# [END howto_bigquery_dts_sensor]
# [START howto_bigquery_delete_data_transfer]
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
# [END howto_bigquery_delete_data_transfer]
(
gcp_bigquery_create_transfer # noqa
>> gcp_bigquery_start_transfer # noqa
>> gcp_run_sensor # noqa
>> gcp_bigquery_delete_transfer # noqa
)


@@ -0,0 +1,277 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
This module contains a Google BigQuery Data Transfer Service Hook.
"""
from typing import Union, Sequence, Tuple
from copy import copy
from google.protobuf.json_format import MessageToDict, ParseDict
from google.api_core.retry import Retry
from google.cloud.bigquery_datatransfer_v1 import DataTransferServiceClient
from google.cloud.bigquery_datatransfer_v1.types import (
TransferConfig,
StartManualTransferRunsResponse,
TransferRun,
)
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
def get_object_id(obj: dict) -> str:
"""
Returns unique id of the object.
"""
return obj["name"].rpartition("/")[-1]
class BiqQueryDataTransferServiceHook(GoogleCloudBaseHook):
"""
Hook for the Google BigQuery Data Transfer Service API.
All the methods in the hook where ``project_id`` is used must be called with
keyword arguments rather than positional.
"""
_conn = None
def __init__(
self, gcp_conn_id: str = "google_cloud_default", delegate_to: str = None
) -> None:
super().__init__(gcp_conn_id=gcp_conn_id, delegate_to=delegate_to)
@staticmethod
def _disable_auto_scheduling(config: Union[dict, TransferConfig]) -> TransferConfig:
"""
In the case of Airflow, the customer needs to create a transfer config
with the automatic scheduling disabled (UI, CLI or an Airflow operator) and
then trigger a transfer run using a specialized Airflow operator that will
call start_manual_transfer_runs.
:param config: Data transfer configuration to create.
:type config: Union[dict, google.cloud.bigquery_datatransfer_v1.types.TransferConfig]
"""
config = MessageToDict(config) if isinstance(config, TransferConfig) else config
new_config = copy(config)
schedule_options = new_config.get("schedule_options")
if schedule_options:
disable_auto_scheduling = schedule_options.get(
"disable_auto_scheduling", None
)
if disable_auto_scheduling is None:
schedule_options["disable_auto_scheduling"] = True
else:
new_config["schedule_options"] = {"disable_auto_scheduling": True}
return ParseDict(new_config, TransferConfig())
def get_conn(self) -> DataTransferServiceClient:
"""
Retrieves the connection to the Google BigQuery Data Transfer Service.
:return: Google BigQuery Data Transfer Service client
:rtype: google.cloud.bigquery_datatransfer_v1.DataTransferServiceClient
"""
if not self._conn:
self._conn = DataTransferServiceClient(
credentials=self._get_credentials(), client_info=self.client_info
)
return self._conn
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def create_transfer_config(
self,
transfer_config: Union[dict, TransferConfig],
project_id: str = None,
authorization_code: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
) -> TransferConfig:
"""
Creates a new data transfer configuration.
:param transfer_config: Data transfer configuration to create.
:type transfer_config: Union[dict, google.cloud.bigquery_datatransfer_v1.types.TransferConfig]
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param authorization_code: authorization code to use with this transfer configuration.
This is required if new credentials are needed.
:type authorization_code: Optional[str]
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:return: A ``google.cloud.bigquery_datatransfer_v1.types.TransferConfig`` instance.
"""
assert project_id is not None
client = self.get_conn()
parent = client.project_path(project_id)
return client.create_transfer_config(
parent=parent,
transfer_config=self._disable_auto_scheduling(transfer_config),
authorization_code=authorization_code,
retry=retry,
timeout=timeout,
metadata=metadata,
)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def delete_transfer_config(
self,
transfer_config_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
) -> None:
"""
Deletes transfer configuration.
:param transfer_config_id: Id of transfer config to be used.
:type transfer_config_id: str
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:return: None
"""
assert project_id is not None
client = self.get_conn()
name = client.project_transfer_config_path(
project=project_id, transfer_config=transfer_config_id
)
return client.delete_transfer_config(
name=name, retry=retry, timeout=timeout, metadata=metadata
)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def start_manual_transfer_runs(
self,
transfer_config_id: str,
project_id: str = None,
requested_time_range: dict = None,
requested_run_time: dict = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
) -> StartManualTransferRunsResponse:
"""
Start manual transfer runs to be executed now with schedule_time equal
to current time. The transfer runs can be created for a time range where
the run_time is between start_time (inclusive) and end_time
(exclusive), or for a specific run_time.
:param transfer_config_id: Id of transfer config to be used.
:type transfer_config_id: str
:param requested_time_range: Time range for the transfer runs that should be started.
If a dict is provided, it must be of the same form as the protobuf
message `~google.cloud.bigquery_datatransfer_v1.types.TimeRange`
:type requested_time_range: Union[dict, ~google.cloud.bigquery_datatransfer_v1.types.TimeRange]
:param requested_run_time: Specific run_time for a transfer run to be started. The
requested_run_time must not be in the future. If a dict is provided, it
must be of the same form as the protobuf message
`~google.cloud.bigquery_datatransfer_v1.types.Timestamp`
:type requested_run_time: Union[dict, ~google.cloud.bigquery_datatransfer_v1.types.Timestamp]
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:return: A ``google.cloud.bigquery_datatransfer_v1.types.StartManualTransferRunsResponse`` instance.
"""
assert project_id is not None
client = self.get_conn()
parent = client.project_transfer_config_path(
project=project_id, transfer_config=transfer_config_id
)
return client.start_manual_transfer_runs(
parent=parent,
requested_time_range=requested_time_range,
requested_run_time=requested_run_time,
retry=retry,
timeout=timeout,
metadata=metadata,
)
@GoogleCloudBaseHook.catch_http_exception
@GoogleCloudBaseHook.fallback_to_default_project_id
def get_transfer_run(
self,
run_id: str,
transfer_config_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
) -> TransferRun:
"""
Returns information about the particular transfer run.
:param run_id: ID of the transfer run.
:type run_id: str
:param transfer_config_id: ID of transfer config to be used.
:type transfer_config_id: str
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:return: A ``google.cloud.bigquery_datatransfer_v1.types.TransferRun`` instance.
"""
assert project_id is not None
client = self.get_conn()
name = client.project_run_path(
project=project_id, transfer_config=transfer_config_id, run=run_id
)
return client.get_transfer_run(
name=name, retry=retry, timeout=timeout, metadata=metadata
)


@@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains Google BigQuery Data Transfer Service operators.
"""
from typing import Sequence, Tuple
from google.protobuf.json_format import MessageToDict
from google.api_core.retry import Retry
from airflow.gcp.hooks.bigquery_dts import (
BiqQueryDataTransferServiceHook,
get_object_id,
)
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class BigQueryCreateDataTransferOperator(BaseOperator):
"""
Creates a new data transfer configuration.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BigQueryCreateDataTransferOperator`
:param transfer_config: Data transfer configuration to create.
:type transfer_config: dict
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param authorization_code: authorization code to use with this transfer configuration.
This is required if new credentials are needed.
:type authorization_code: Optional[str]
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
template_fields = (
"transfer_config",
"project_id",
"authorization_code",
"gcp_conn_id",
)
@apply_defaults
def __init__(
self,
transfer_config: dict,
project_id: str = None,
authorization_code: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.transfer_config = transfer_config
self.authorization_code = authorization_code
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = BiqQueryDataTransferServiceHook(gcp_conn_id=self.gcp_conn_id)
self.log.info("Creating DTS transfer config")
response = hook.create_transfer_config(
project_id=self.project_id,
transfer_config=self.transfer_config,
authorization_code=self.authorization_code,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
result = MessageToDict(response)
self.log.info("Created DTS transfer config %s", get_object_id(result))
self.xcom_push(context, key="transfer_config_id", value=get_object_id(result))
return result
class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
"""
Deletes transfer configuration.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BigQueryDeleteDataTransferConfigOperator`
:param transfer_config_id: Id of transfer config to be used.
:type transfer_config_id: str
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
template_fields = ("transfer_config_id", "project_id", "gcp_conn_id")
@apply_defaults
def __init__(
self,
transfer_config_id: str,
project_id: str = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.project_id = project_id
self.transfer_config_id = transfer_config_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = BiqQueryDataTransferServiceHook(gcp_conn_id=self.gcp_conn_id)
hook.delete_transfer_config(
transfer_config_id=self.transfer_config_id,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
class BigQueryDataTransferServiceStartTransferRunsOperator(BaseOperator):
"""
Start manual transfer runs to be executed now with schedule_time equal
to current time. The transfer runs can be created for a time range where
the run_time is between start_time (inclusive) and end_time
(exclusive), or for a specific run_time.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BigQueryDataTransferServiceStartTransferRunsOperator`
:param transfer_config_id: Id of transfer config to be used.
:type transfer_config_id: str
:param requested_time_range: Time range for the transfer runs that should be started.
If a dict is provided, it must be of the same form as the protobuf
message `~google.cloud.bigquery_datatransfer_v1.types.TimeRange`
:type requested_time_range: Union[dict, ~google.cloud.bigquery_datatransfer_v1.types.TimeRange]
:param requested_run_time: Specific run_time for a transfer run to be started. The
requested_run_time must not be in the future. If a dict is provided, it
must be of the same form as the protobuf message
`~google.cloud.bigquery_datatransfer_v1.types.Timestamp`
:type requested_run_time: Union[dict, ~google.cloud.bigquery_datatransfer_v1.types.Timestamp]
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
:type gcp_conn_id: str
"""
template_fields = (
"transfer_config_id",
"project_id",
"requested_time_range",
"requested_run_time",
"gcp_conn_id",
)
@apply_defaults
def __init__(
self,
transfer_config_id: str,
project_id: str = None,
requested_time_range: dict = None,
requested_run_time: dict = None,
retry: Retry = None,
timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
gcp_conn_id="google_cloud_default",
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.project_id = project_id
self.transfer_config_id = transfer_config_id
self.requested_time_range = requested_time_range
self.requested_run_time = requested_run_time
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
def execute(self, context):
hook = BiqQueryDataTransferServiceHook(gcp_conn_id=self.gcp_conn_id)
self.log.info('Submitting manual transfer for %s', self.transfer_config_id)
response = hook.start_manual_transfer_runs(
transfer_config_id=self.transfer_config_id,
requested_time_range=self.requested_time_range,
requested_run_time=self.requested_run_time,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
result = MessageToDict(response)
run_id = None
if 'runs' in result:
run_id = get_object_id(result['runs'][0])
self.xcom_push(context, key="run_id", value=run_id)
self.log.info('Transfer run %s submitted successfully.', run_id)
return result


@@ -0,0 +1,112 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module contains a Google BigQuery Data Transfer Service sensor.
"""
from typing import Sequence, Tuple, Union, Set
from google.api_core.retry import Retry
from google.protobuf.json_format import MessageToDict
from airflow.gcp.hooks.bigquery_dts import BiqQueryDataTransferServiceHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
class BigQueryDataTransferServiceTransferRunSensor(BaseSensorOperator):
"""
Waits for Data Transfer Service run to complete.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:BigQueryDataTransferServiceTransferRunSensor`
:param expected_statuses: The expected state of the operation.
See:
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferOperations#Status
:type expected_statuses: Union[Set[str], str]
:param run_id: ID of the transfer run.
:type run_id: str
:param transfer_config_id: ID of transfer config to be used.
:type transfer_config_id: str
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the GCP connection is used.
:type project_id: str
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
:param request_timeout: The amount of time, in seconds, to wait for the request to
complete. Note that if retry is specified, the timeout applies to each individual
attempt.
:type request_timeout: Optional[float]
:param metadata: Additional metadata that is provided to the method.
:type metadata: Optional[Sequence[Tuple[str, str]]]
:return: A ``google.cloud.bigquery_datatransfer_v1.types.TransferRun`` instance.
"""
template_fields = (
"run_id",
"transfer_config_id",
"expected_statuses",
"project_id",
)
@apply_defaults
def __init__(
self,
run_id: str,
transfer_config_id: str,
expected_statuses: Union[Set[str], str] = 'SUCCEEDED',
project_id: str = None,
gcp_conn_id: str = "google_cloud_default",
retry: Retry = None,
request_timeout: float = None,
metadata: Sequence[Tuple[str, str]] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.run_id = run_id
self.transfer_config_id = transfer_config_id
self.retry = retry
self.request_timeout = request_timeout
self.metadata = metadata
self.expected_statuses = (
{expected_statuses}
if isinstance(expected_statuses, str)
else expected_statuses
)
self.project_id = project_id
self.gcp_cloud_conn_id = gcp_conn_id
def poke(self, context):
hook = BiqQueryDataTransferServiceHook(gcp_conn_id=self.gcp_cloud_conn_id)
run = hook.get_transfer_run(
run_id=self.run_id,
transfer_config_id=self.transfer_config_id,
project_id=self.project_id,
retry=self.retry,
timeout=self.request_timeout,
metadata=self.metadata,
)
result = MessageToDict(run)
state = result["state"]
self.log.info("Status of %s run: %s", self.run_id, state)
return state in self.expected_statuses


@@ -0,0 +1,137 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Google Cloud BigQuery Data Transfer Service Operators
=====================================================
The `BigQuery Data Transfer Service <https://cloud.google.com/bigquery/transfer/>`__
automates data movement from SaaS applications to Google BigQuery on a scheduled, managed basis.
Your analytics team can lay the foundation for a data warehouse without writing a single line of code.
BigQuery Data Transfer Service initially supports Google application sources like Google Ads,
Campaign Manager, Google Ad Manager and YouTube. Through BigQuery Data Transfer Service, users also
gain access to data connectors that allow them to easily transfer data from Teradata and Amazon S3 to BigQuery.
.. contents::
:depth: 1
:local:
Prerequisite Tasks
^^^^^^^^^^^^^^^^^^
.. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:BigQueryDTSDocuments:
.. _howto/operator:BigQueryCreateDataTransferOperator:
Creating transfer configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To create a DTS transfer configuration, you can use
:class:`airflow.gcp.operators.bigquery_dts.BigQueryCreateDataTransferOperator`.
In the case of Airflow, the customer needs to create a transfer config with the automatic scheduling disabled
and then trigger a transfer run using a specialized Airflow operator that will call StartManualTransferRuns API
for example :class:`airflow.gcp.operators.bigquery_dts.BigQueryDataTransferServiceStartTransferRunsOperator`.
:class:`airflow.gcp.operators.bigquery_dts.BigQueryCreateDataTransferOperator` checks whether the automatic
scheduling option is present in the passed configuration. If it is present, nothing is done; otherwise its
value is set to ``True``.
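The normalization applied by the underlying hook is roughly equivalent to the sketch below (a simplified
illustration on a plain dict; the helper name ``_ensure_manual_scheduling`` is purely illustrative):

.. code-block:: python

    def _ensure_manual_scheduling(config: dict) -> dict:
        """Force-disable automatic scheduling unless the flag is already set."""
        schedule_options = config.get("schedule_options")
        if schedule_options is None:
            config["schedule_options"] = {"disable_auto_scheduling": True}
        elif schedule_options.get("disable_auto_scheduling") is None:
            schedule_options["disable_auto_scheduling"] = True
        return config

The arguments used by the example DAG to build such a transfer configuration are shown below: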
.. exampleinclude:: ../../../../airflow/gcp/example_dags/example_bigquery_dts.py
:language: python
:start-after: [START howto_bigquery_dts_create_args]
:end-before: [END howto_bigquery_dts_create_args]
You can create the operator with or without a project id. If the project id is missing,
it will be retrieved from the GCP connection used. Basic usage of the operator:
.. exampleinclude:: ../../../../airflow/gcp/example_dags/example_bigquery_dts.py
:language: python
:dedent: 4
:start-after: [START howto_bigquery_create_data_transfer]
:end-before: [END howto_bigquery_create_data_transfer]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.gcp.operators.bigquery_dts.BigQueryCreateDataTransferOperator`
parameters, which allow you to dynamically determine values. The result is saved to :ref:`XCom <concepts:xcom>`,
which allows it to be used by other operators. Additionally, the id of the new config is accessible in
:ref:`XCom <concepts:xcom>` under the ``transfer_config_id`` key.
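For example, the example DAG pulls the id of the newly created config into a templated variable that is
reused by the downstream tasks:

.. code-block:: python

    # Jinja-templated value, resolved at runtime from the create task's XCom.
    transfer_config_id = (
        "{{ task_instance.xcom_pull('gcp_bigquery_create_transfer', "
        "key='transfer_config_id') }}"
    )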
.. _howto/operator:BigQueryDeleteDataTransferConfigOperator:
Deleting transfer configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To delete a DTS transfer configuration, you can use
:class:`airflow.gcp.operators.bigquery_dts.BigQueryDeleteDataTransferConfigOperator`.
Basic usage of the operator:
.. exampleinclude:: ../../../../airflow/gcp/example_dags/example_bigquery_dts.py
:language: python
:dedent: 4
:start-after: [START howto_bigquery_delete_data_transfer]
:end-before: [END howto_bigquery_delete_data_transfer]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.gcp.operators.bigquery_dts.BigQueryDeleteDataTransferConfigOperator`
parameters, which allow you to dynamically determine values.
.. _howto/operator:BigQueryDataTransferServiceStartTransferRunsOperator:
.. _howto/operator:BigQueryDataTransferServiceTransferRunSensor:
Manually starting transfer runs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
To start manual transfer runs (executed now, with ``schedule_time`` equal to the current time) you can use
:class:`airflow.gcp.operators.bigquery_dts.BigQueryDataTransferServiceStartTransferRunsOperator`.
Basic usage of the operator:
.. exampleinclude:: ../../../../airflow/gcp/example_dags/example_bigquery_dts.py
:language: python
:dedent: 4
:start-after: [START howto_bigquery_start_transfer]
:end-before: [END howto_bigquery_start_transfer]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.gcp.operators.bigquery_dts.BigQueryDataTransferServiceStartTransferRunsOperator`
parameters, which allow you to dynamically determine values.
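The ``requested_run_time`` parameter accepts a protobuf ``Timestamp``-style dict; the example DAG above
requests a run roughly one minute in the future:

.. code-block:: python

    import time

    # Timestamp-style dict: schedule the manual run about one minute from now.
    requested_run_time = {"seconds": int(time.time() + 60)}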
To check whether the operation succeeded, you can use
:class:`airflow.gcp.sensors.bigquery_dts.BigQueryDataTransferServiceTransferRunSensor`.
.. exampleinclude:: ../../../../airflow/gcp/example_dags/example_bigquery_dts.py
:language: python
:dedent: 4
:start-after: [START howto_bigquery_dts_sensor]
:end-before: [END howto_bigquery_dts_sensor]
You can use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.gcp.sensors.bigquery_dts.BigQueryDataTransferServiceTransferRunSensor`
parameters, which allow you to dynamically determine values.
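The ``run_id`` monitored by the sensor is typically pulled from the XCom of the start-transfer task, as in
the example DAG above:

.. code-block:: python

    # Jinja-templated value, resolved at runtime from the start-transfer task's XCom.
    run_id = "{{ task_instance.xcom_pull('gcp_bigquery_start_transfer', key='run_id') }}"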
Reference
^^^^^^^^^
For further information, look at:
* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/bigquery_datatransfer/index.html>`__
* `Product Documentation <https://cloud.google.com/bigquery/transfer/>`__


@@ -360,6 +360,20 @@ BigQuery
They also use :class:`airflow.contrib.hooks.bigquery_hook.BigQueryHook` to communicate with Google Cloud Platform.
BigQuery Data Transfer Service
''''''''''''''''''''''''''''''
:class:`airflow.gcp.operators.bigquery_dts.BigQueryCreateDataTransferOperator`
Creates a new data transfer configuration.
:class:`airflow.gcp.operators.bigquery_dts.BigQueryDeleteDataTransferConfigOperator`
Deletes transfer configuration.
:class:`airflow.gcp.sensors.bigquery_dts.BigQueryDataTransferServiceTransferRunSensor`
Waits for Data Transfer Service run to complete.
They also use :class:`airflow.gcp.hooks.bigquery_dts.BiqQueryDataTransferServiceHook` to communicate with Google Cloud Platform.
Cloud Spanner
'''''''''''''


@@ -194,6 +194,7 @@ gcp = [
'google-auth>=1.0.0, <2.0.0dev',
'google-cloud-automl>=0.4.0',
'google-cloud-bigtable==0.33.0',
'google-cloud-bigquery-datatransfer>=0.4.0',
'google-cloud-container>=0.1.1',
'google-cloud-dlp>=0.11.0',
'google-cloud-language>=1.1.1',


@@ -29,6 +29,7 @@ from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
# Please keep these variables in alphabetical order.
GCP_AI_KEY = 'gcp_ai.json'
GCP_AUTOML_KEY = 'gcp_automl.json'
GCP_BIGQUERY_KEY = 'gcp_bigquery.json'
GCP_BIGTABLE_KEY = 'gcp_bigtable.json'
GCP_CLOUD_BUILD_KEY = 'gcp_cloud_build.json'


@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from copy import deepcopy
from google.protobuf.json_format import ParseDict
from google.cloud.bigquery_datatransfer_v1 import DataTransferServiceClient
from google.cloud.bigquery_datatransfer_v1.types import TransferConfig
from tests.compat import mock
from tests.contrib.utils.base_gcp_mock import mock_base_gcp_hook_no_default_project_id
from airflow.version import version
from airflow.gcp.hooks.bigquery_dts import BiqQueryDataTransferServiceHook
CREDENTIALS = "test-creds"
PROJECT_ID = "id"
PARAMS = {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": "bucket",
"destination_table_name_template": "name",
"file_format": "CSV",
}
TRANSFER_CONFIG = ParseDict(
{
"destination_dataset_id": "dataset",
"display_name": "GCS Test Config",
"data_source_id": "google_cloud_storage",
"params": PARAMS,
},
TransferConfig(),
)
TRANSFER_CONFIG_ID = "id1234"
class BigQueryDataTransferHookTestCase(unittest.TestCase):
def setUp(self) -> None:
with mock.patch(
"airflow.gcp.hooks.bigquery_dts.GoogleCloudBaseHook.__init__",
new=mock_base_gcp_hook_no_default_project_id,
):
self.hook = BiqQueryDataTransferServiceHook( # type: ignore
gcp_conn_id=None
)
self.hook._get_credentials = mock.MagicMock( # type: ignore
return_value=CREDENTIALS
)
def test_version_information(self):
expected_version = "airflow_v" + version
self.assertEqual(expected_version, self.hook.client_info.client_library_version)
def test_disable_auto_scheduling(self):
expected = deepcopy(TRANSFER_CONFIG)
expected.schedule_options.disable_auto_scheduling = True
self.assertEqual(expected, self.hook._disable_auto_scheduling(TRANSFER_CONFIG))
@mock.patch(
"airflow.gcp.hooks.bigquery_dts."
"DataTransferServiceClient.create_transfer_config"
)
def test_create_transfer_config(self, service_mock):
self.hook.create_transfer_config(
transfer_config=TRANSFER_CONFIG, project_id=PROJECT_ID
)
parent = DataTransferServiceClient.project_path(PROJECT_ID)
expected_config = deepcopy(TRANSFER_CONFIG)
expected_config.schedule_options.disable_auto_scheduling = True
service_mock.assert_called_once_with(
parent=parent,
transfer_config=expected_config,
authorization_code=None,
metadata=None,
retry=None,
timeout=None,
)
@mock.patch(
"airflow.gcp.hooks.bigquery_dts."
"DataTransferServiceClient.delete_transfer_config"
)
def test_delete_transfer_config(self, service_mock):
self.hook.delete_transfer_config(
transfer_config_id=TRANSFER_CONFIG_ID, project_id=PROJECT_ID
)
name = DataTransferServiceClient.project_transfer_config_path(
PROJECT_ID, TRANSFER_CONFIG_ID
)
service_mock.assert_called_once_with(
name=name, metadata=None, retry=None, timeout=None
)
@mock.patch(
"airflow.gcp.hooks.bigquery_dts."
"DataTransferServiceClient.start_manual_transfer_runs"
)
def test_start_manual_transfer_runs(self, service_mock):
self.hook.start_manual_transfer_runs(
transfer_config_id=TRANSFER_CONFIG_ID, project_id=PROJECT_ID
)
parent = DataTransferServiceClient.project_transfer_config_path(
PROJECT_ID, TRANSFER_CONFIG_ID
)
service_mock.assert_called_once_with(
parent=parent,
requested_time_range=None,
requested_run_time=None,
metadata=None,
retry=None,
timeout=None,
)


@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.compat import mock
from airflow.gcp.operators.bigquery_dts import (
BigQueryCreateDataTransferOperator,
BigQueryDeleteDataTransferConfigOperator,
BigQueryDataTransferServiceStartTransferRunsOperator,
)
PROJECT_ID = "id"
TRANSFER_CONFIG = {
"data_source_id": "google_cloud_storage",
"destination_dataset_id": "example_dataset",
"params": {},
"display_name": "example-transfer",
"disabled": False,
"data_refresh_window_days": 0,
"schedule": "first sunday of quarter 00:00",
}
TRANSFER_CONFIG_ID = "id1234"
NAME = "projects/123abc/locations/321cba/transferConfig/1a2b3c"
class BigQueryCreateDataTransferOperatorTestCase(unittest.TestCase):
@mock.patch(
"airflow.gcp.operators.bigquery_dts."
"BiqQueryDataTransferServiceHook.create_transfer_config"
)
@mock.patch("airflow.gcp.operators.bigquery_dts.get_object_id")
def test_execute(self, mock_name, mock_hook):
mock_name.return_value = TRANSFER_CONFIG_ID
mock_xcom = mock.MagicMock()
op = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG, project_id=PROJECT_ID, task_id="id"
)
op.xcom_push = mock_xcom
op.execute(None)
mock_hook.assert_called_once_with(
authorization_code=None,
metadata=None,
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
retry=None,
timeout=None,
)
class BigQueryDeleteDataTransferConfigOperatorTestCase(unittest.TestCase):
@mock.patch(
"airflow.gcp.operators.bigquery_dts."
"BiqQueryDataTransferServiceHook.delete_transfer_config"
)
def test_execute(self, mock_hook):
op = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=TRANSFER_CONFIG_ID, task_id="id", project_id=PROJECT_ID
)
op.execute(None)
mock_hook.assert_called_once_with(
metadata=None,
transfer_config_id=TRANSFER_CONFIG_ID,
project_id=PROJECT_ID,
retry=None,
timeout=None,
)
class BigQueryDataTransferServiceStartTransferRunsOperatorTestCase(unittest.TestCase):
@mock.patch(
"airflow.gcp.operators.bigquery_dts."
"BiqQueryDataTransferServiceHook.start_manual_transfer_runs"
)
def test_execute(self, mock_hook):
op = BigQueryDataTransferServiceStartTransferRunsOperator(
transfer_config_id=TRANSFER_CONFIG_ID, task_id="id", project_id=PROJECT_ID
)
op.execute(None)
mock_hook.assert_called_once_with(
transfer_config_id=TRANSFER_CONFIG_ID,
project_id=PROJECT_ID,
requested_time_range=None,
requested_run_time=None,
metadata=None,
retry=None,
timeout=None,
)


@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from tests.gcp.operators.test_bigquery_dts_system_helper import (
GcpBigqueryDtsTestHelper,
)
from tests.contrib.utils.base_gcp_system_test_case import (
SKIP_TEST_WARNING,
TestDagGcpSystem,
)
from tests.contrib.utils.gcp_authenticator import GCP_BIGQUERY_KEY
from airflow.gcp.example_dags.example_bigquery_dts import (
GCP_PROJECT_ID,
GCP_DTS_BQ_DATASET,
GCP_DTS_BQ_TABLE,
BUCKET_URI
)
@unittest.skipIf(TestDagGcpSystem.skip_check(GCP_BIGQUERY_KEY), SKIP_TEST_WARNING)
class GcpBigqueryDtsSystemTest(TestDagGcpSystem):
def __init__(self, method_name="runTest"):
super(GcpBigqueryDtsSystemTest, self).__init__(
method_name, dag_id="example_gcp_bigquery_dts", gcp_key=GCP_BIGQUERY_KEY
)
self.helper = GcpBigqueryDtsTestHelper()
def setUp(self):
super().setUp()
self.gcp_authenticator.gcp_authenticate()
self.helper.create_dataset(
project_id=GCP_PROJECT_ID,
dataset=GCP_DTS_BQ_DATASET,
table=GCP_DTS_BQ_TABLE,
)
self.helper.upload_data(dataset=GCP_DTS_BQ_DATASET, table=GCP_DTS_BQ_TABLE, gcs_file=BUCKET_URI)
self.gcp_authenticator.gcp_revoke_authentication()
def tearDown(self):
self.gcp_authenticator.gcp_authenticate()
self.helper.delete_dataset(
project_id=GCP_PROJECT_ID, dataset=GCP_DTS_BQ_DATASET
)
self.gcp_authenticator.gcp_revoke_authentication()
super().tearDown()
def test_run_example_dag_function(self):
self._run_dag()


@@ -0,0 +1,48 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.contrib.utils.logging_command_executor import LoggingCommandExecutor
class GcpBigqueryDtsTestHelper(LoggingCommandExecutor):
def create_dataset(self, project_id: str, dataset: str, table: str):
dataset_name = "{}:{}".format(project_id, dataset)
self.execute_cmd(["bq", "--location", "us", "mk", "--dataset", dataset_name])
table_name = "{}.{}".format(dataset_name, table)
self.execute_cmd(["bq", "mk", "--table", table_name, ""])
def upload_data(self, dataset: str, table: str, gcs_file: str):
table_name = "{}.{}".format(dataset, table)
self.execute_cmd(
[
"bq",
"--location",
"us",
"load",
"--autodetect",
"--source_format",
"CSV",
table_name,
gcs_file,
]
)
def delete_dataset(self, project_id: str, dataset: str):
dataset_name = "{}:{}".format(project_id, dataset)
self.execute_cmd(["bq", "rm", "-r", "-f", "-d", dataset_name])


@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
import mock
from airflow.gcp.sensors.bigquery_dts import (
BigQueryDataTransferServiceTransferRunSensor,
)
TRANSFER_CONFIG_ID = "config_id"
RUN_ID = "run_id"
PROJECT_ID = "project_id"
class TestBigQueryDataTransferServiceTransferRunSensor(unittest.TestCase):
@mock.patch(
"airflow.gcp.sensors.bigquery_dts."
"BiqQueryDataTransferServiceHook.get_transfer_run"
)
@mock.patch(
"airflow.gcp.sensors.bigquery_dts.MessageToDict",
return_value={"state": "success"},
)
def test_poke(self, mock_msg_to_dict, mock_hook):
op = BigQueryDataTransferServiceTransferRunSensor(
transfer_config_id=TRANSFER_CONFIG_ID,
run_id=RUN_ID,
task_id="id",
project_id=PROJECT_ID,
expected_statuses={"success"},
)
op.poke(None)
mock_hook.assert_called_once_with(
transfer_config_id=TRANSFER_CONFIG_ID,
run_id=RUN_ID,
project_id=PROJECT_ID,
metadata=None,
retry=None,
timeout=None,
)