Add S3ToFTPOperator (#11747)
Co-authored-by: javier.lopez <javier.lopez@promocionesfarma.com> Co-authored-by: Felix Uellendall <feluelle@users.noreply.github.com> Co-authored-by: Tobiasz Kędzierski <tobiaszkedzierski@gmail.com>
This commit is contained in:
Parent
475f1ab267
Commit
04d278f93f
|
@ -758,7 +758,7 @@ Here is the list of packages and their extras:
|
||||||
========================== ===========================
|
========================== ===========================
|
||||||
Package Extras
|
Package Extras
|
||||||
========================== ===========================
|
========================== ===========================
|
||||||
amazon apache.hive,google,imap,mongo,mysql,postgres,ssh
|
amazon apache.hive,ftp,google,imap,mongo,mysql,postgres,ssh
|
||||||
apache.druid apache.hive
|
apache.druid apache.hive
|
||||||
apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
|
apache.hive amazon,microsoft.mssql,mysql,presto,samba,vertica
|
||||||
apache.livy http
|
apache.livy http
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from tempfile import NamedTemporaryFile
|
||||||
|
|
||||||
|
from airflow.models import BaseOperator
|
||||||
|
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
|
||||||
|
from airflow.providers.ftp.hooks.ftp import FTPHook
|
||||||
|
from airflow.utils.decorators import apply_defaults
|
||||||
|
|
||||||
|
|
||||||
|
class S3ToFTPOperator(BaseOperator):
    """
    This operator enables the transferring of files from S3 to a FTP server.

    :param s3_bucket: The targeted s3 bucket. This is the S3 bucket from
        where the file is downloaded.
    :type s3_bucket: str
    :param s3_key: The targeted s3 key. This is the specified file path for
        downloading the file from S3.
    :type s3_key: str
    :param ftp_path: The ftp remote path. This is the specified file path for
        uploading file to the FTP server.
    :type ftp_path: str
    :param aws_conn_id: The AWS connection id. The name or identifier for
        establishing a connection to S3.
    :type aws_conn_id: str
    :param ftp_conn_id: The ftp connection id. The name or identifier for
        establishing a connection to the FTP server.
    :type ftp_conn_id: str
    """

    template_fields = ('s3_bucket', 's3_key')

    @apply_defaults
    def __init__(
        self,
        *,
        s3_bucket,
        s3_key,
        ftp_path,
        aws_conn_id='aws_default',
        ftp_conn_id='ftp_default',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.ftp_path = ftp_path
        self.aws_conn_id = aws_conn_id
        self.ftp_conn_id = ftp_conn_id

    def execute(self, context):
        s3_hook = S3Hook(self.aws_conn_id)
        ftp_hook = FTPHook(ftp_conn_id=self.ftp_conn_id)

        s3_obj = s3_hook.get_key(self.s3_key, self.s3_bucket)

        with NamedTemporaryFile() as local_tmp_file:
            s3_obj.download_fileobj(local_tmp_file)
            # store_file re-opens the temp file by *name*; flush so any bytes
            # still buffered by download_fileobj are actually on disk first.
            local_tmp_file.flush()
            ftp_hook.store_file(self.ftp_path, local_tmp_file.name)
|
|
@ -336,6 +336,9 @@ transfers:
|
||||||
- source-integration-name: SSH File Transfer Protocol (SFTP)
|
- source-integration-name: SSH File Transfer Protocol (SFTP)
|
||||||
target-integration-name: Amazon Simple Storage Service (S3)
|
target-integration-name: Amazon Simple Storage Service (S3)
|
||||||
python-module: airflow.providers.amazon.aws.transfers.sftp_to_s3
|
python-module: airflow.providers.amazon.aws.transfers.sftp_to_s3
|
||||||
|
- source-integration-name: Amazon Simple Storage Service (S3)
|
||||||
|
target-integration-name: File Transfer Protocol (FTP)
|
||||||
|
python-module: airflow.providers.amazon.aws.transfers.s3_to_ftp
|
||||||
|
|
||||||
hook-class-names:
|
hook-class-names:
|
||||||
- airflow.providers.amazon.aws.hooks.s3.S3Hook
|
- airflow.providers.amazon.aws.hooks.s3.S3Hook
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
{
|
{
|
||||||
"amazon": [
|
"amazon": [
|
||||||
"apache.hive",
|
"apache.hive",
|
||||||
|
"ftp",
|
||||||
"google",
|
"google",
|
||||||
"imap",
|
"imap",
|
||||||
"mongo",
|
"mongo",
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
#
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
# or more contributor license agreements. See the NOTICE file
|
||||||
|
# distributed with this work for additional information
|
||||||
|
# regarding copyright ownership. The ASF licenses this file
|
||||||
|
# to you under the Apache License, Version 2.0 (the
|
||||||
|
# "License"); you may not use this file except in compliance
|
||||||
|
# with the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing,
|
||||||
|
# software distributed under the License is distributed on an
|
||||||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
# KIND, either express or implied. See the License for the
|
||||||
|
# specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
import unittest
|
||||||
|
from unittest import mock
|
||||||
|
|
||||||
|
from airflow.providers.amazon.aws.transfers.s3_to_ftp import S3ToFTPOperator
|
||||||
|
|
||||||
|
# Fixture values shared by all TestS3ToFTPOperator cases.
TASK_ID = 'test_s3_to_ftp'  # task_id given to the operator under test
BUCKET = 'test-s3-bucket'  # source S3 bucket
S3_KEY = 'test/test_1_file.csv'  # source S3 object key
FTP_PATH = '/tmp/remote_path.txt'  # destination path on the FTP server
AWS_CONN_ID = 'aws_default'  # operator's default AWS connection id
FTP_CONN_ID = 'ftp_default'  # operator's default FTP connection id
|
||||||
|
|
||||||
|
|
||||||
|
class TestS3ToFTPOperator(unittest.TestCase):
    """Unit tests for S3ToFTPOperator with every external hook mocked out."""

    @mock.patch("airflow.providers.ftp.hooks.ftp.FTPHook.store_file")
    @mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook.get_key")
    @mock.patch("airflow.providers.amazon.aws.transfers.s3_to_ftp.NamedTemporaryFile")
    def test_execute(self, mock_tmp_file, mock_get_key, mock_store_file):
        task = S3ToFTPOperator(task_id=TASK_ID, s3_bucket=BUCKET, s3_key=S3_KEY, ftp_path=FTP_PATH)
        task.execute(None)

        # The S3 object is fetched exactly once, with the configured key and bucket.
        mock_get_key.assert_called_once_with(task.s3_key, task.s3_bucket)

        tmp_file_handle = mock_tmp_file.return_value.__enter__.return_value
        # The object is streamed into the temporary file...
        mock_get_key.return_value.download_fileobj.assert_called_once_with(tmp_file_handle)
        # ...which is then uploaded to the configured FTP path.
        mock_store_file.assert_called_once_with(task.ftp_path, tmp_file_handle.name)
|
Loading…
Open link in a new task