Allow `replace` flag in gcs_to_gcs operator. (#9667)

* Allow `replace` flag in gcs_to_gcs operator.
If we are not replacing, list all files in the Destination GCS bucket and only keep those files which are present in Source GCS bucket and not in Destination GCS bucket
This commit is contained in:
royberkoweee 2020-07-14 19:21:31 +03:00 коммит произвёл GitHub
Родитель 4636fc6ede
Коммит ed5004cca7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 34 добавлений и 0 удалений

Просмотреть файл

@ -69,6 +69,8 @@ class GCSToGCSOperator(BaseOperator):
of copied to the new location. This is the equivalent of a mv command
as opposed to a cp command.
:type move_object: bool
:param replace: Whether you want to replace existing destination files or not.
:type replace: bool
:param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'.
If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the
folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object.
@ -176,6 +178,7 @@ class GCSToGCSOperator(BaseOperator):
destination_object=None,
delimiter=None,
move_object=False,
replace=True,
gcp_conn_id='google_cloud_default',
google_cloud_storage_conn_id=None,
delegate_to=None,
@ -198,6 +201,7 @@ class GCSToGCSOperator(BaseOperator):
self.destination_object = destination_object
self.delimiter = delimiter
self.move_object = move_object
self.replace = replace
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.last_modified_time = last_modified_time
@ -276,6 +280,21 @@ class GCSToGCSOperator(BaseOperator):
self.log.info('Delimiter ignored because wildcard is in prefix')
prefix_, delimiter = prefix.split(WILDCARD, 1)
objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter)
if not self.replace:
# If we are not replacing, list all files in the Destination GCS bucket
# and only keep those files which are present in
# Source GCS bucket and not in Destination GCS bucket
existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
objects = set(objects) - set(existing_objects)
if len(objects) > 0:
self.log.info(
'%s files are going to be synced: %s.', len(objects), objects
)
else:
self.log.info(
'There are no new files to sync. Have a nice day!')
for source_object in objects:
if self.destination_object is None:
destination_object = source_object

Просмотреть файл

@ -86,6 +86,21 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
TEST_BUCKET, prefix="test_object", delimiter=""
)
@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_wildcard_with_replace_flag_false(self, mock_hook):
operator = GCSToGCSOperator(
task_id=TASK_ID, source_bucket=TEST_BUCKET,
source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
destination_bucket=DESTINATION_BUCKET,
replace=False)
operator.execute(None)
mock_calls = [
mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""),
]
mock_hook.return_value.list.assert_has_calls(mock_calls)
@mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
def test_execute_prefix_and_suffix(self, mock_hook):
operator = GCSToGCSOperator(