[AIRFLOW-6125] [AIP-21] Rename S3 operator and SFTP operator (#7112)

PR contains changes regarding AIP-21 (renaming GCP operators and hooks):
* renamed GCP modules
* fixed tests
This commit is contained in:
Michał Słowikowski 2020-01-09 14:53:59 +01:00 коммит произвёл Tomek Urbaszek
Родитель 70af358275
Коммит 5b6772cb83
6 изменённых файлов: 24 добавлений и 24 удалений

Просмотреть файл

@ -26,7 +26,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.decorators import apply_defaults from airflow.utils.decorators import apply_defaults
class S3ToGoogleCloudStorageOperator(S3ListOperator): class S3ToGCSOperator(S3ListOperator):
""" """
Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage Synchronizes an S3 key, possibly a prefix, with a Google Cloud Storage
destination path. destination path.
@ -74,7 +74,7 @@ class S3ToGoogleCloudStorageOperator(S3ListOperator):
.. code-block:: python .. code-block:: python
s3_to_gcs_op = S3ToGoogleCloudStorageOperator( s3_to_gcs_op = S3ToGCSOperator(
task_id='s3_to_gcs_example', task_id='s3_to_gcs_example',
bucket='my-s3-bucket', bucket='my-s3-bucket',
prefix='data/customers-201804', prefix='data/customers-201804',

Просмотреть файл

@ -23,7 +23,7 @@ Example Airflow DAG for Google Cloud Storage to SFTP transfer operators.
import os import os
from airflow import models from airflow import models
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGoogleCloudStorageOperator from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGCSOperator
from airflow.utils.dates import days_ago from airflow.utils.dates import days_ago
default_args = {"start_date": days_ago(1)} default_args = {"start_date": days_ago(1)}
@ -43,7 +43,7 @@ with models.DAG(
"example_sftp_to_gcs", default_args=default_args, schedule_interval=None "example_sftp_to_gcs", default_args=default_args, schedule_interval=None
) as dag: ) as dag:
# [START howto_operator_sftp_to_gcs_copy_single_file] # [START howto_operator_sftp_to_gcs_copy_single_file]
copy_file_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator( copy_file_from_sftp_to_gcs = SFTPToGCSOperator(
task_id="file-copy-sftp-to-gcs", task_id="file-copy-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1), source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_1),
destination_bucket=BUCKET_SRC, destination_bucket=BUCKET_SRC,
@ -51,7 +51,7 @@ with models.DAG(
# [END howto_operator_sftp_to_gcs_copy_single_file] # [END howto_operator_sftp_to_gcs_copy_single_file]
# [START howto_operator_sftp_to_gcs_move_single_file_destination] # [START howto_operator_sftp_to_gcs_move_single_file_destination]
move_file_from_sftp_to_gcs_destination = SFTPToGoogleCloudStorageOperator( move_file_from_sftp_to_gcs_destination = SFTPToGCSOperator(
task_id="file-move-sftp-to-gcs-destination", task_id="file-move-sftp-to-gcs-destination",
source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2), source_path=os.path.join(TMP_PATH, DIR, OBJECT_SRC_2),
destination_bucket=BUCKET_SRC, destination_bucket=BUCKET_SRC,
@ -61,7 +61,7 @@ with models.DAG(
# [END howto_operator_sftp_to_gcs_move_single_file_destination] # [END howto_operator_sftp_to_gcs_move_single_file_destination]
# [START howto_operator_sftp_to_gcs_copy_directory] # [START howto_operator_sftp_to_gcs_copy_directory]
copy_directory_from_sftp_to_gcs = SFTPToGoogleCloudStorageOperator( copy_directory_from_sftp_to_gcs = SFTPToGCSOperator(
task_id="dir-copy-sftp-to-gcs", task_id="dir-copy-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"), source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*"),
destination_bucket=BUCKET_SRC, destination_bucket=BUCKET_SRC,
@ -69,7 +69,7 @@ with models.DAG(
# [END howto_operator_sftp_to_gcs_copy_directory] # [END howto_operator_sftp_to_gcs_copy_directory]
# [START howto_operator_sftp_to_gcs_move_specific_files] # [START howto_operator_sftp_to_gcs_move_specific_files]
move_specific_files_from_gcs_to_sftp = SFTPToGoogleCloudStorageOperator( move_specific_files_from_gcs_to_sftp = SFTPToGCSOperator(
task_id="dir-move-specific-files-sftp-to-gcs", task_id="dir-move-specific-files-sftp-to-gcs",
source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"), source_path=os.path.join(TMP_PATH, DIR, SUBDIR, "*.bin"),
destination_bucket=BUCKET_SRC, destination_bucket=BUCKET_SRC,

Просмотреть файл

@ -31,13 +31,13 @@ from airflow.utils.decorators import apply_defaults
WILDCARD = "*" WILDCARD = "*"
class SFTPToGoogleCloudStorageOperator(BaseOperator): class SFTPToGCSOperator(BaseOperator):
""" """
Transfer files to Google Cloud Storage from SFTP server. Transfer files to Google Cloud Storage from SFTP server.
.. seealso:: .. seealso::
For more information on how to use this operator, take a look at the guide: For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:SFTPToGoogleCloudStorageOperator` :ref:`howto/operator:SFTPToGCSOperator`
:param source_path: The sftp remote path. This is the specified file path :param source_path: The sftp remote path. This is the specified file path
for downloading the single file or multiple files from the SFTP server. for downloading the single file or multiple files from the SFTP server.

Просмотреть файл

@ -34,16 +34,16 @@ Prerequisite Tasks
.. include:: _partials/prerequisite_tasks.rst .. include:: _partials/prerequisite_tasks.rst
.. _howto/operator:SFTPToGoogleCloudStorageOperator: .. _howto/operator:SFTPToGCSOperator:
Operator Operator
^^^^^^^^ ^^^^^^^^
Transfer files between SFTP and Google Storage is performed with the Transfer files between SFTP and Google Storage is performed with the
:class:`~airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator` operator. :class:`~airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGCSOperator` operator.
Use :ref:`Jinja templating <jinja-templating>` with Use :ref:`Jinja templating <jinja-templating>` with
:template-fields:`airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGoogleCloudStorageOperator` :template-fields:`airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPToGCSOperator`
to define values dynamically. to define values dynamically.
Copying single files Copying single files

Просмотреть файл

@ -21,7 +21,7 @@ import unittest
import mock import mock
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator from airflow.contrib.operators.s3_to_gcs_operator import S3ToGCSOperator
TASK_ID = 'test-s3-gcs-operator' TASK_ID = 'test-s3-gcs-operator'
S3_BUCKET = 'test-bucket' S3_BUCKET = 'test-bucket'
@ -35,9 +35,9 @@ GCS_CONN_ID = 'google_cloud_default'
class TestS3ToGoogleCloudStorageOperator(unittest.TestCase): class TestS3ToGoogleCloudStorageOperator(unittest.TestCase):
def test_init(self): def test_init(self):
"""Test S3ToGoogleCloudStorageOperator instance is properly initialized.""" """Test S3ToGCSOperator instance is properly initialized."""
operator = S3ToGoogleCloudStorageOperator( operator = S3ToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
bucket=S3_BUCKET, bucket=S3_BUCKET,
prefix=S3_PREFIX, prefix=S3_PREFIX,
@ -59,7 +59,7 @@ class TestS3ToGoogleCloudStorageOperator(unittest.TestCase):
def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): def test_execute(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
"""Test the execute function when the run is successful.""" """Test the execute function when the run is successful."""
operator = S3ToGoogleCloudStorageOperator( operator = S3ToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
bucket=S3_BUCKET, bucket=S3_BUCKET,
prefix=S3_PREFIX, prefix=S3_PREFIX,
@ -94,7 +94,7 @@ class TestS3ToGoogleCloudStorageOperator(unittest.TestCase):
def test_execute_with_gzip(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook): def test_execute_with_gzip(self, gcs_mock_hook, s3_one_mock_hook, s3_two_mock_hook):
"""Test the execute function when the run is successful.""" """Test the execute function when the run is successful."""
operator = S3ToGoogleCloudStorageOperator( operator = S3ToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
bucket=S3_BUCKET, bucket=S3_BUCKET,
prefix=S3_PREFIX, prefix=S3_PREFIX,

Просмотреть файл

@ -24,7 +24,7 @@ import unittest
import mock import mock
from airflow.exceptions import AirflowException from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGoogleCloudStorageOperator from airflow.providers.google.cloud.operators.sftp_to_gcs import SFTPToGCSOperator
TASK_ID = "test-gcs-to-sftp-operator" TASK_ID = "test-gcs-to-sftp-operator"
GCP_CONN_ID = "GCP_CONN_ID" GCP_CONN_ID = "GCP_CONN_ID"
@ -52,11 +52,11 @@ DESTINATION_PATH_FILE = "destination_dir/copy.txt"
# pylint: disable=unused-argument # pylint: disable=unused-argument
class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase): class TestSFTPToGCSOperator(unittest.TestCase):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_copy_single_file(self, sftp_hook, gcs_hook): def test_execute_copy_single_file(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator( task = SFTPToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
source_path=SOURCE_OBJECT_NO_WILDCARD, source_path=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=TEST_BUCKET, destination_bucket=TEST_BUCKET,
@ -88,7 +88,7 @@ class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_move_single_file(self, sftp_hook, gcs_hook): def test_execute_move_single_file(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator( task = SFTPToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
source_path=SOURCE_OBJECT_NO_WILDCARD, source_path=SOURCE_OBJECT_NO_WILDCARD,
destination_bucket=TEST_BUCKET, destination_bucket=TEST_BUCKET,
@ -128,7 +128,7 @@ class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase):
[], [],
] ]
task = SFTPToGoogleCloudStorageOperator( task = SFTPToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
source_path=SOURCE_OBJECT_WILDCARD_FILENAME, source_path=SOURCE_OBJECT_WILDCARD_FILENAME,
destination_bucket=TEST_BUCKET, destination_bucket=TEST_BUCKET,
@ -178,7 +178,7 @@ class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase):
] ]
gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2] gcs_hook.return_value.list.return_value = SOURCE_FILES_LIST[:2]
task = SFTPToGoogleCloudStorageOperator( task = SFTPToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
source_path=SOURCE_OBJECT_WILDCARD_FILENAME, source_path=SOURCE_OBJECT_WILDCARD_FILENAME,
destination_bucket=TEST_BUCKET, destination_bucket=TEST_BUCKET,
@ -200,7 +200,7 @@ class TestSFTPToGoogleCloudStorageOperator(unittest.TestCase):
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.GoogleCloudStorageHook")
@mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook") @mock.patch("airflow.providers.google.cloud.operators.sftp_to_gcs.SFTPHook")
def test_execute_more_than_one_wildcard_exception(self, sftp_hook, gcs_hook): def test_execute_more_than_one_wildcard_exception(self, sftp_hook, gcs_hook):
task = SFTPToGoogleCloudStorageOperator( task = SFTPToGCSOperator(
task_id=TASK_ID, task_id=TASK_ID,
source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS, source_path=SOURCE_OBJECT_MULTIPLE_WILDCARDS,
destination_bucket=TEST_BUCKET, destination_bucket=TEST_BUCKET,