diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index a3d6db8636..70b6288e72 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -69,6 +69,8 @@ class GCSToGCSOperator(BaseOperator):
         of copied to the new location. This is the equivalent of a mv command
         as opposed to a cp command.
     :type move_object: bool
+    :param replace: If False, skip objects whose name already exists in the destination bucket (default: True).
+    :type replace: bool
     :param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'.
         If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the
         folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object.
@@ -176,6 +178,7 @@ class GCSToGCSOperator(BaseOperator):
         destination_object=None,
         delimiter=None,
         move_object=False,
+        replace=True,
         gcp_conn_id='google_cloud_default',
         google_cloud_storage_conn_id=None,
         delegate_to=None,
@@ -198,6 +201,7 @@ class GCSToGCSOperator(BaseOperator):
         self.destination_object = destination_object
         self.delimiter = delimiter
         self.move_object = move_object
+        self.replace = replace
         self.gcp_conn_id = gcp_conn_id
         self.delegate_to = delegate_to
         self.last_modified_time = last_modified_time
@@ -276,6 +280,22 @@ class GCSToGCSOperator(BaseOperator):
         self.log.info('Delimiter ignored because wildcard is in prefix')
         prefix_, delimiter = prefix.split(WILDCARD, 1)
         objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter)
+        if not self.replace:
+            # Not replacing: list what already exists in the destination bucket under the
+            # same prefix and keep only the source objects that are missing there.
+            # NOTE(review): names are compared as-is, so this presumably only matches when
+            # objects keep their source name at the destination -- confirm the behaviour
+            # when destination_object is set.
+            existing_objects = set(
+                hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter)
+            )
+            # Filter instead of taking a set difference so the listing order is preserved
+            # and the copy order stays deterministic between runs.
+            objects = [obj for obj in objects if obj not in existing_objects]
+            if objects:
+                self.log.info('%s files are going to be synced: %s.', len(objects), objects)
+            else:
+                self.log.info('There are no new files to sync. Have a nice day!')
         for source_object in objects:
             if self.destination_object is None:
                 destination_object = source_object
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index 49c067b6ae..a51b101147 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -86,6 +86,22 @@ class TestGoogleCloudStorageToCloudStorageOperator(unittest.TestCase):
             TEST_BUCKET, prefix="test_object", delimiter=""
         )
 
+    @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
+    def test_execute_wildcard_with_replace_flag_false(self, mock_hook):
+        """With replace=False the destination bucket is listed with the same prefix as the source."""
+        operator = GCSToGCSOperator(
+            task_id=TASK_ID, source_bucket=TEST_BUCKET,
+            source_object=SOURCE_OBJECT_WILDCARD_SUFFIX,
+            destination_bucket=DESTINATION_BUCKET,
+            replace=False)
+
+        operator.execute(None)
+        mock_calls = [
+            mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
+            mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""),
+        ]
+        mock_hook.return_value.list.assert_has_calls(mock_calls)
+
     @mock.patch('airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook')
     def test_execute_prefix_and_suffix(self, mock_hook):
         operator = GCSToGCSOperator(