[AIRFLOW-3531] Add gcs to gcs transfer operator. (#4331)

Joshua Carp 2019-01-06 14:35:31 -05:00, committed by Kaxil Naik
Parent 67d8ab760b
Commit e5c4f6cadc
7 changed files with 276 additions and 7 deletions


@@ -26,7 +26,7 @@ from airflow.exceptions import AirflowException
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
# Time to sleep between active checks of the operation results
-TIME_TO_SLEEP_IN_SECONDS = 1
+TIME_TO_SLEEP_IN_SECONDS = 10
# noinspection PyAbstractClass
@@ -56,10 +56,10 @@ class GCPTransferServiceHook(GoogleCloudBaseHook):
http=http_authorized, cache_discovery=False)
return self._conn
-def create_transfer_job(self, project_id, description, schedule, transfer_spec):
+def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
transfer_job = {
'status': 'ENABLED',
-'projectId': project_id,
+'projectId': project_id or self.project_id,
'description': description,
'transferSpec': transfer_spec,
'schedule': schedule or self._schedule_once_now(),
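For orientation, here is a minimal, hypothetical usage sketch of the updated hook call (the connection ID, bucket names, description, and project below are illustrative assumptions, not part of this commit): with the new default, project_id may be omitted and the hook falls back to the project configured on the GCP connection.

    from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook

    hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')

    transfer_spec = {
        'gcsDataSource': {'bucketName': 'my-source-bucket'},       # illustrative
        'gcsDataSink': {'bucketName': 'my-destination-bucket'},    # illustrative
    }

    # Old behaviour: project_id had to be passed explicitly.
    hook.create_transfer_job(
        description='example-transfer',
        schedule=None,  # None -> the hook schedules the job to run once, immediately
        transfer_spec=transfer_spec,
        project_id='my-gcp-project',
    )

    # New behaviour: project_id may be omitted and defaults to self.project_id,
    # i.e. the project configured on the underlying GCP connection.
    hook.create_transfer_job(
        description='example-transfer',
        schedule=None,
        transfer_spec=transfer_spec,
    )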


@@ -0,0 +1,127 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.models import BaseOperator
from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
from airflow.utils.decorators import apply_defaults
class GoogleCloudStorageToGoogleCloudStorageTransferOperator(BaseOperator):
"""
Copies objects from a bucket to another using the GCP Storage Transfer
Service.
:param source_bucket: The source Google cloud storage bucket where the
object is. (templated)
:type source_bucket: str
:param destination_bucket: The destination Google cloud storage bucket
where the object should be. (templated)
:type destination_bucket: str
:param project_id: The ID of the Google Cloud Platform Console project that
owns the job
:type project_id: str
:param gcp_conn_id: Optional connection ID to use when connecting to Google Cloud
Storage.
:type gcp_conn_id: str
:param delegate_to: The account to impersonate, if any.
For this to work, the service account making the request must have
domain-wide delegation enabled.
:type delegate_to: str
:param description: Optional transfer service job description
:type description: str
:param schedule: Optional transfer service schedule; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
If not set, run transfer job once as soon as the operator runs
:type schedule: dict
:param object_conditions: Optional transfer service object conditions; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#ObjectConditions
:type object_conditions: dict
:param transfer_options: Optional transfer service transfer options; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec#TransferOptions
:type transfer_options: dict
:param wait: Wait for transfer to finish; defaults to `True`
:type wait: bool
**Example**:
.. code-block:: python
gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
task_id='gcs_to_gcs_transfer_example',
source_bucket='my-source-bucket',
destination_bucket='my-destination-bucket',
project_id='my-gcp-project',
dag=my_dag)
"""
template_fields = ('source_bucket', 'destination_bucket', 'description', 'object_conditions')
ui_color = '#e09411'
@apply_defaults
def __init__(self,
source_bucket,
destination_bucket,
project_id=None,
gcp_conn_id='google_cloud_default',
delegate_to=None,
description=None,
schedule=None,
object_conditions=None,
transfer_options=None,
wait=True,
*args,
**kwargs):
super(GoogleCloudStorageToGoogleCloudStorageTransferOperator, self).__init__(
*args,
**kwargs)
self.source_bucket = source_bucket
self.destination_bucket = destination_bucket
self.project_id = project_id
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.description = description
self.schedule = schedule
self.object_conditions = object_conditions or {}
self.transfer_options = transfer_options or {}
self.wait = wait
def execute(self, context):
transfer_hook = GCPTransferServiceHook(
gcp_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)
job = transfer_hook.create_transfer_job(
project_id=self.project_id,
description=self.description,
schedule=self.schedule,
transfer_spec={
'gcsDataSource': {
'bucketName': self.source_bucket,
},
'gcsDataSink': {
'bucketName': self.destination_bucket,
},
'objectConditions': self.object_conditions,
'transferOptions': self.transfer_options,
}
)
if self.wait:
transfer_hook.wait_for_transfer_job(job)
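To show how these arguments end up in the request, here is a hedged DAG sketch (the DAG id, bucket names, and the specific condition/option values are assumptions for illustration only): object_conditions and transfer_options are forwarded unchanged as the objectConditions and transferOptions fields of the transferSpec built in execute() above.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.gcs_to_gcs_transfer_operator import (
        GoogleCloudStorageToGoogleCloudStorageTransferOperator,
    )

    with DAG('example_gcs_to_gcs_transfer',
             start_date=datetime(2019, 1, 1),
             schedule_interval=None) as dag:
        GoogleCloudStorageToGoogleCloudStorageTransferOperator(
            task_id='copy_recent_objects',
            source_bucket='my-source-bucket',              # assumed bucket names
            destination_bucket='my-destination-bucket',
            # ObjectConditions: only copy objects modified in the last hour.
            object_conditions={'maxTimeElapsedSinceLastModification': '3600s'},
            # TransferOptions: overwrite objects that already exist in the sink.
            transfer_options={'overwriteObjectsAlreadyExistingInSink': True},
            wait=True,  # block until the transfer job completes
        )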


@@ -33,7 +33,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
:param gcs_bucket: The destination Google Cloud Storage bucket
where you want to store the files. (templated)
:type gcs_bucket: str
-:param project_id: The ID of the Google Cloud Platform Console project that
+:param project_id: Optional ID of the Google Cloud Platform Console project that
owns the job
:type project_id: str
:param aws_conn_id: The source S3 connection
@@ -51,10 +51,10 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs.
If not set, run transfer job once as soon as the operator runs
:type schedule: dict
-:param object_conditions: Transfer service object conditions; see
+:param object_conditions: Optional transfer service object conditions; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
:type object_conditions: dict
-:param transfer_options: Transfer service transfer options; see
+:param transfer_options: Optional transfer service transfer options; see
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/TransferSpec
:type transfer_options: dict
:param wait: Wait for transfer to finish
@@ -79,7 +79,7 @@ class S3ToGoogleCloudStorageTransferOperator(BaseOperator):
def __init__(self,
s3_bucket,
gcs_bucket,
-project_id,
+project_id=None,
aws_conn_id='aws_default',
gcp_conn_id='google_cloud_default',
delegate_to=None,
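A corresponding hedged sketch for the S3 operator (the bucket names and the surrounding DAG object are illustrative assumptions): with project_id now defaulting to None, it can be left out and is resolved from the GCP connection by the hook.

    s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator(
        task_id='s3_to_gcs_transfer_example',
        s3_bucket='my-s3-bucket',                  # assumed bucket names
        gcs_bucket='my-destination-bucket',
        # project_id omitted -> resolved from the google_cloud_default connection
        dag=my_dag,  # assumed pre-existing DAG object
    )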


@@ -181,6 +181,7 @@ Operators
.. autoclass:: airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator
.. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageTransferOperator
.. autoclass:: airflow.contrib.operators.gcs_to_s3.GoogleCloudStorageToS3Operator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator
.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator


@@ -1283,6 +1283,7 @@ Storage Operators
- :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery.
- :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested.
- :ref:`GoogleCloudStorageToGoogleCloudStorageTransferOperator` : Copies objects from a bucket to another using Google Transfer service.
- :ref:`MySqlToGoogleCloudStorageOperator`: Copy data from any MySQL Database to Google cloud storage in JSON format.
.. _FileToGoogleCloudStorageOperator:
@@ -1341,6 +1342,13 @@ GoogleCloudStorageToGoogleCloudStorageOperator
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator
.. _GoogleCloudStorageToGoogleCloudStorageTransferOperator:
GoogleCloudStorageToGoogleCloudStorageTransferOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageTransferOperator
.. _MySqlToGoogleCloudStorageOperator:
MySqlToGoogleCloudStorageOperator


@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from airflow.contrib.operators.gcs_to_gcs_transfer_operator import \
GoogleCloudStorageToGoogleCloudStorageTransferOperator
try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None
TASK_ID = 'test-gcs-gcs-transfer-operator'
SOURCE_BUCKET = 'test-source-bucket'
DESTINATION_BUCKET = 'test-destination-bucket'
PROJECT_ID = 'test-project'
DESCRIPTION = 'test-description'
SCHEDULE = {
'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
}
class GoogleCloudStorageToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
def test_constructor(self):
"""Test GoogleCloudStorageToGoogleCloudStorageTransferOperator instance is properly initialized."""
operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
task_id=TASK_ID,
source_bucket=SOURCE_BUCKET,
destination_bucket=DESTINATION_BUCKET,
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=SCHEDULE,
)
self.assertEqual(operator.task_id, TASK_ID)
self.assertEqual(operator.source_bucket, SOURCE_BUCKET)
self.assertEqual(operator.destination_bucket, DESTINATION_BUCKET)
self.assertEqual(operator.project_id, PROJECT_ID)
self.assertEqual(operator.description, DESCRIPTION)
self.assertEqual(operator.schedule, SCHEDULE)
@mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
def test_execute(self, mock_transfer_hook):
"""Test the execute function when the run is successful."""
operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
task_id=TASK_ID,
source_bucket=SOURCE_BUCKET,
destination_bucket=DESTINATION_BUCKET,
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=SCHEDULE,
)
operator.execute(None)
mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=SCHEDULE,
transfer_spec={
'gcsDataSource': {
'bucketName': SOURCE_BUCKET,
},
'gcsDataSink': {
'bucketName': DESTINATION_BUCKET,
},
'objectConditions': {},
'transferOptions': {}
}
)
mock_transfer_hook.return_value.wait_for_transfer_job.assert_called_once_with(
mock_transfer_hook.return_value.create_transfer_job.return_value
)
@mock.patch('airflow.contrib.operators.gcs_to_gcs_transfer_operator.GCPTransferServiceHook')
def test_execute_skip_wait(self, mock_transfer_hook):
"""Test the execute function when the run is successful."""
operator = GoogleCloudStorageToGoogleCloudStorageTransferOperator(
task_id=TASK_ID,
source_bucket=SOURCE_BUCKET,
destination_bucket=DESTINATION_BUCKET,
project_id=PROJECT_ID,
description=DESCRIPTION,
wait=False,
)
operator.execute(None)
mock_transfer_hook.return_value.create_transfer_job.assert_called_once_with(
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=None,
transfer_spec={
'gcsDataSource': {
'bucketName': SOURCE_BUCKET,
},
'gcsDataSink': {
'bucketName': DESTINATION_BUCKET,
},
'objectConditions': {},
'transferOptions': {}
}
)
assert not mock_transfer_hook.return_value.wait_for_transfer_job.called


@@ -59,6 +59,7 @@ class S3ToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
gcs_bucket=GCS_BUCKET,
project_id=PROJECT_ID,
description=DESCRIPTION,
schedule=SCHEDULE,
)
self.assertEqual(operator.task_id, TASK_ID)
@@ -66,6 +67,7 @@ class S3ToGoogleCloudStorageTransferOperatorTest(unittest.TestCase):
self.assertEqual(operator.gcs_bucket, GCS_BUCKET)
self.assertEqual(operator.project_id, PROJECT_ID)
self.assertEqual(operator.description, DESCRIPTION)
self.assertEqual(operator.schedule, SCHEDULE)
@mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.GCPTransferServiceHook')
@mock.patch('airflow.contrib.operators.s3_to_gcs_transfer_operator.S3Hook')