[AIRFLOW-2299] Add S3 Select functionality to S3FileTransformOperator

Currently, S3FileTransformOperator downloads the whole file from S3
before transforming and uploading it. Adding an extraction feature
using S3 Select to this operator improves its efficiency and
usability.

Closes #3227 from sekikn/AIRFLOW-2299
Kengo Seki 2018-04-17 10:53:05 +02:00 committed by Fokko Driesprong
Parent a148043107
Commit 6e82f1d7c9
5 changed files with 121 additions and 18 deletions
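
For illustration, the new capability can be used roughly as follows. This sketch is not part of the commit; the task id, bucket names and keys are made up, and the import path follows the Airflow module layout of the time:

    from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

    # Filter the source object server-side with S3 Select; no
    # transformation script is needed when select_expression is given.
    filter_csv = S3FileTransformOperator(
        task_id='filter_csv',
        source_s3_key='s3://my-source-bucket/raw/data.csv',
        dest_s3_key='s3://my-dest-bucket/filtered/data.csv',
        select_expression="SELECT * FROM S3Object s WHERE s._1 = 'foo'",
        replace=True)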

View file

@@ -177,6 +177,46 @@ class S3Hook(AwsHook):
obj = self.get_key(key, bucket_name)
return obj.get()['Body'].read().decode('utf-8')
def select_key(self, key, bucket_name=None,
expression='SELECT * FROM S3Object',
expression_type='SQL',
input_serialization={'CSV': {}},
output_serialization={'CSV': {}}):
"""
Reads a key with S3 Select.
:param key: S3 key that will point to the file
:type key: str
:param bucket_name: Name of the bucket in which the file is stored
:type bucket_name: str
:param expression: S3 Select expression
:type expression: str
:param expression_type: S3 Select expression type
:type expression_type: str
:param input_serialization: S3 Select input data serialization format
:type input_serialization: dict
:param output_serialization: S3 Select output data serialization format
:type output_serialization: dict
.. seealso::
For more details about S3 Select parameters:
http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content
"""
if not bucket_name:
(bucket_name, key) = self.parse_s3_url(key)
response = self.get_conn().select_object_content(
Bucket=bucket_name,
Key=key,
Expression=expression,
ExpressionType=expression_type,
InputSerialization=input_serialization,
OutputSerialization=output_serialization)
return ''.join(event['Records']['Payload']
for event in response['Payload']
if 'Records' in event)
def check_for_wildcard_key(self,
wildcard_key, bucket_name=None, delimiter=''):
"""

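As a rough usage sketch (not part of the diff; the connection id, bucket and key below are assumptions), the new S3Hook.select_key can also be called directly, for example to pull two columns out of a CSV object and receive the result as a string:

    from airflow.hooks.S3_hook import S3Hook

    hook = S3Hook(aws_conn_id='aws_default')

    # Filtering happens inside S3; only the selected records are transferred.
    content = hook.select_key(
        key='s3://my-bucket/data/input.csv',
        expression='SELECT s._1, s._3 FROM S3Object s',
        expression_type='SQL',
        input_serialization={'CSV': {'FieldDelimiter': ','}},
        output_serialization={'CSV': {}})
    print(content)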
View file

@@ -36,10 +36,13 @@ class S3FileTransformOperator(BaseOperator):
The locations of the source and the destination files in the local
filesystem are provided as the first and second arguments to the
transformation script. The transformation script is expected to read the
data from source , transform it and write the output to the local
data from source, transform it and write the output to the local
destination file. The operator then takes over control and uploads the
local destination file to S3.
S3 Select is also available to filter the source contents. Users can
omit the transformation script if an S3 Select expression is specified.
:param source_s3_key: The key to be retrieved from S3
:type source_s3_key: str
:param source_aws_conn_id: source s3 connection
@@ -52,6 +55,8 @@ class S3FileTransformOperator(BaseOperator):
:type replace: bool
:param transform_script: location of the executable transformation script
:type transform_script: str
:param select_expression: S3 Select expression
:type select_expression: str
"""
template_fields = ('source_s3_key', 'dest_s3_key')
@@ -63,7 +68,8 @@ class S3FileTransformOperator(BaseOperator):
self,
source_s3_key,
dest_s3_key,
transform_script,
transform_script=None,
select_expression=None,
source_aws_conn_id='aws_default',
dest_aws_conn_id='aws_default',
replace=False,
@@ -75,33 +81,54 @@
self.dest_aws_conn_id = dest_aws_conn_id
self.replace = replace
self.transform_script = transform_script
self.select_expression = select_expression
def execute(self, context):
if self.transform_script is None and self.select_expression is None:
raise AirflowException(
"Either transform_script or select_expression must be specified")
source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id)
dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)
self.log.info("Downloading source S3 file %s", self.source_s3_key)
if not source_s3.check_for_key(self.source_s3_key):
raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
raise AirflowException(
"The source key {0} does not exist".format(self.source_s3_key))
source_s3_key_object = source_s3.get_key(self.source_s3_key)
with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
self.log.info(
"Dumping S3 file %s contents to local file %s",
self.source_s3_key, f_source.name
)
source_s3_key_object.download_fileobj(Fileobj=f_source)
f_source.flush()
transform_script_process = subprocess.Popen(
[self.transform_script, f_source.name, f_dest.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
(transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
self.log.info("Transform script stdout %s", transform_script_stdoutdata)
if transform_script_process.returncode > 0:
raise AirflowException("Transform script failed %s", transform_script_stderrdata)
else:
self.log.info(
"Transform script successful. Output temporarily located at %s",
f_dest.name
if self.select_expression is not None:
content = source_s3.select_key(
key=self.source_s3_key,
expression=self.select_expression
)
f_source.write(content.encode("utf-8"))
else:
source_s3_key_object.download_fileobj(Fileobj=f_source)
f_source.flush()
if self.transform_script is not None:
transform_script_process = subprocess.Popen(
[self.transform_script, f_source.name, f_dest.name],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
(transform_script_stdoutdata, transform_script_stderrdata) = \
transform_script_process.communicate()
self.log.info("Transform script stdout %s", transform_script_stdoutdata)
if transform_script_process.returncode > 0:
raise AirflowException(
"Transform script failed %s", transform_script_stderrdata)
else:
self.log.info(
"Transform script successful. Output temporarily located at %s",
f_dest.name
)
self.log.info("Uploading transformed file to S3")
f_dest.flush()
dest_s3.load_file(

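As a sketch of how the two options compose after this change (the script path, keys and task id are assumptions): when both select_expression and transform_script are given, execute() first writes the S3 Select result to the local source file and then runs the script on it before uploading:

    from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

    # S3 Select narrows the data server-side, then the local script
    # transforms the selected records before the upload to dest_s3_key.
    select_then_transform = S3FileTransformOperator(
        task_id='select_then_transform',
        source_s3_key='s3://my-source-bucket/raw/events.csv',
        dest_s3_key='s3://my-dest-bucket/curated/events.csv',
        select_expression="SELECT * FROM S3Object s WHERE s._2 = 'click'",
        transform_script='/usr/local/bin/transform.py',
        replace=True)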
View file

@@ -155,7 +155,7 @@ oracle = ['cx_Oracle>=5.1.2']
postgres = ['psycopg2-binary>=2.7.4']
ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
salesforce = ['simple-salesforce>=0.72']
s3 = ['boto3>=1.4.0']
s3 = ['boto3>=1.7.0']
samba = ['pysmbclient>=0.1.3']
slack = ['slackclient>=1.0.0']
statsd = ['statsd>=3.0.1, <4.0']

View file

@@ -18,6 +18,7 @@
# under the License.
#
import mock
import unittest
from airflow import configuration
@@ -162,6 +163,13 @@ class TestS3Hook(unittest.TestCase):
self.assertEqual(hook.read_key('my_key', 'mybucket'), u'Contént')
# As of 1.3.2, Moto doesn't support select_object_content yet.
@mock.patch('airflow.contrib.hooks.aws_hook.AwsHook.get_client_type')
def test_select_key(self, mock_get_client_type):
mock_get_client_type.return_value.select_object_content.return_value = \
{'Payload': [{'Records': {'Payload': u'Contént'}}]}
hook = S3Hook(aws_conn_id=None)
self.assertEqual(hook.select_key('my_key', 'mybucket'), u'Contént')
@mock_s3
def test_check_for_wildcard_key(self):

View file

@@ -49,7 +49,7 @@ class TestS3FileTransformOperator(unittest.TestCase):
@mock.patch('subprocess.Popen')
@mock_s3
def test_execute(self, mock_Popen):
def test_execute_with_transform_script(self, mock_Popen):
transform_script_process = mock_Popen.return_value
transform_script_process.communicate.return_value = [None, None]
transform_script_process.returncode = 0
@@ -68,5 +68,33 @@ class TestS3FileTransformOperator(unittest.TestCase):
source_s3_key=s3_url.format(bucket, input_key),
dest_s3_key=s3_url.format(bucket, output_key),
transform_script=self.transform_script,
replace=True,
task_id="task_id")
t.execute(None)
@mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
@mock_s3
def test_execute_with_select_expression(self, mock_select_key):
bucket = "bucket"
input_key = "foo"
output_key = "bar"
bio = io.BytesIO(b"input")
conn = boto3.client('s3')
conn.create_bucket(Bucket=bucket)
conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
s3_url = "s3://{0}/{1}"
select_expression = "SELECT * FROM S3Object s"
t = S3FileTransformOperator(
source_s3_key=s3_url.format(bucket, input_key),
dest_s3_key=s3_url.format(bucket, output_key),
select_expression=select_expression,
replace=True,
task_id="task_id")
t.execute(None)
mock_select_key.assert_called_once_with(
key=s3_url.format(bucket, input_key),
expression=select_expression
)