From 6e82f1d7c9fa391c636a0155cdb19aa6cbda0821 Mon Sep 17 00:00:00 2001
From: Kengo Seki
Date: Tue, 17 Apr 2018 10:53:05 +0200
Subject: [PATCH] [AIRFLOW-2299] Add S3 Select functionality to
 S3FileTransformOperator

Currently, S3FileTransformOperator downloads the whole file from S3 before
transforming and uploading it. Adding an extraction feature using S3 Select
to this operator improves its efficiency and usability.

Closes #3227 from sekikn/AIRFLOW-2299
---
 airflow/hooks/S3_hook.py                     | 40 +++++++++++++
 .../operators/s3_file_transform_operator.py  | 59 ++++++++++++++-----
 setup.py                                     |  2 +-
 tests/hooks/test_s3_hook.py                  |  8 +++
 .../test_s3_file_transform_operator.py       | 30 +++++++++-
 5 files changed, 121 insertions(+), 18 deletions(-)

diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index f75f5e62e8..7a4b8b0d21 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -177,6 +177,46 @@ class S3Hook(AwsHook):
         obj = self.get_key(key, bucket_name)
         return obj.get()['Body'].read().decode('utf-8')
 
+    def select_key(self, key, bucket_name=None,
+                   expression='SELECT * FROM S3Object',
+                   expression_type='SQL',
+                   input_serialization={'CSV': {}},
+                   output_serialization={'CSV': {}}):
+        """
+        Reads a key with S3 Select.
+
+        :param key: S3 key that will point to the file
+        :type key: str
+        :param bucket_name: Name of the bucket in which the file is stored
+        :type bucket_name: str
+        :param expression: S3 Select expression
+        :type expression: str
+        :param expression_type: S3 Select expression type
+        :type expression_type: str
+        :param input_serialization: S3 Select input data serialization format
+        :type input_serialization: str
+        :param output_serialization: S3 Select output data serialization format
+        :type output_serialization: str
+
+        .. seealso::
+            For more details about S3 Select parameters:
+            http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content
+        """
+        if not bucket_name:
+            (bucket_name, key) = self.parse_s3_url(key)
+
+        response = self.get_conn().select_object_content(
+            Bucket=bucket_name,
+            Key=key,
+            Expression=expression,
+            ExpressionType=expression_type,
+            InputSerialization=input_serialization,
+            OutputSerialization=output_serialization)
+
+        return ''.join(event['Records']['Payload']
+                       for event in response['Payload']
+                       if 'Records' in event)
+
     def check_for_wildcard_key(self,
                                wildcard_key, bucket_name=None,
                                delimiter=''):
         """
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 1d39ace797..67286b0ae7 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -36,10 +36,13 @@ class S3FileTransformOperator(BaseOperator):
     The locations of the source and the destination files in the local
     filesystem is provided as an first and second arguments to the
     transformation script. The transformation script is expected to read the
-    data from source , transform it and write the output to the local
+    data from source, transform it and write the output to the local
     destination file. The operator then takes over control and uploads the
     local destination file to S3.
 
+    S3 Select is also available to filter the source contents. Users can
+    omit the transformation script if S3 Select expression is specified.
+
     :param source_s3_key: The key to be retrieved from S3
     :type source_s3_key: str
     :param source_aws_conn_id: source s3 connection
     :type source_aws_conn_id: str
@@ -52,6 +55,8 @@ class S3FileTransformOperator(BaseOperator):
     :type replace: bool
     :param transform_script: location of the executable transformation script
     :type transform_script: str
+    :param select_expression: S3 Select expression
+    :type select_expression: str
     """
 
     template_fields = ('source_s3_key', 'dest_s3_key')
@@ -63,7 +68,8 @@ class S3FileTransformOperator(BaseOperator):
             self,
             source_s3_key,
             dest_s3_key,
-            transform_script,
+            transform_script=None,
+            select_expression=None,
             source_aws_conn_id='aws_default',
             dest_aws_conn_id='aws_default',
             replace=False,
@@ -75,33 +81,54 @@ class S3FileTransformOperator(BaseOperator):
         self.dest_aws_conn_id = dest_aws_conn_id
         self.replace = replace
         self.transform_script = transform_script
+        self.select_expression = select_expression
 
     def execute(self, context):
+        if self.transform_script is None and self.select_expression is None:
+            raise AirflowException(
+                "Either transform_script or select_expression must be specified")
+
         source_s3 = S3Hook(aws_conn_id=self.source_aws_conn_id)
         dest_s3 = S3Hook(aws_conn_id=self.dest_aws_conn_id)
+
         self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
-            raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
+            raise AirflowException(
+                "The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
+
         with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
             self.log.info(
                 "Dumping S3 file %s contents to local file %s",
                 self.source_s3_key, f_source.name
             )
-            source_s3_key_object.download_fileobj(Fileobj=f_source)
-            f_source.flush()
-            transform_script_process = subprocess.Popen(
-                [self.transform_script, f_source.name, f_dest.name],
-                stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
-            (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
-            self.log.info("Transform script stdout %s", transform_script_stdoutdata)
-            if transform_script_process.returncode > 0:
-                raise AirflowException("Transform script failed %s", transform_script_stderrdata)
-            else:
-                self.log.info(
-                    "Transform script successful. Output temporarily located at %s",
-                    f_dest.name
+
+            if self.select_expression is not None:
+                content = source_s3.select_key(
+                    key=self.source_s3_key,
+                    expression=self.select_expression
                 )
+                f_source.write(content.encode("utf-8"))
+            else:
+                source_s3_key_object.download_fileobj(Fileobj=f_source)
+            f_source.flush()
+
+            if self.transform_script is not None:
+                transform_script_process = subprocess.Popen(
+                    [self.transform_script, f_source.name, f_dest.name],
+                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
+                (transform_script_stdoutdata, transform_script_stderrdata) = \
+                    transform_script_process.communicate()
+                self.log.info("Transform script stdout %s", transform_script_stdoutdata)
+                if transform_script_process.returncode > 0:
+                    raise AirflowException(
+                        "Transform script failed %s", transform_script_stderrdata)
+                else:
+                    self.log.info(
+                        "Transform script successful. Output temporarily located at %s",
+                        f_dest.name
+                    )
+
             self.log.info("Uploading transformed file to S3")
             f_dest.flush()
             dest_s3.load_file(
diff --git a/setup.py b/setup.py
index 8e7b939aff..700f3ae699 100644
--- a/setup.py
+++ b/setup.py
@@ -155,7 +155,7 @@ oracle = ['cx_Oracle>=5.1.2']
 postgres = ['psycopg2-binary>=2.7.4']
 ssh = ['paramiko>=2.1.1', 'pysftp>=0.2.9']
 salesforce = ['simple-salesforce>=0.72']
-s3 = ['boto3>=1.4.0']
+s3 = ['boto3>=1.7.0']
 samba = ['pysmbclient>=0.1.3']
 slack = ['slackclient>=1.0.0']
 statsd = ['statsd>=3.0.1, <4.0']
diff --git a/tests/hooks/test_s3_hook.py b/tests/hooks/test_s3_hook.py
index e3e0eb50cd..94d0e36c08 100644
--- a/tests/hooks/test_s3_hook.py
+++ b/tests/hooks/test_s3_hook.py
@@ -18,6 +18,7 @@
 # under the License.
 #
 
+import mock
 import unittest
 
 from airflow import configuration
@@ -162,6 +163,13 @@ class TestS3Hook(unittest.TestCase):
 
         self.assertEqual(hook.read_key('my_key', 'mybucket'), u'Contént')
 
+    # As of 1.3.2, Moto doesn't support select_object_content yet.
+    @mock.patch('airflow.contrib.hooks.aws_hook.AwsHook.get_client_type')
+    def test_select_key(self, mock_get_client_type):
+        mock_get_client_type.return_value.select_object_content.return_value = \
+            {'Payload': [{'Records': {'Payload': u'Contént'}}]}
+        hook = S3Hook(aws_conn_id=None)
+        self.assertEqual(hook.select_key('my_key', 'mybucket'), u'Contént')
 
     @mock_s3
     def test_check_for_wildcard_key(self):
diff --git a/tests/operators/test_s3_file_transform_operator.py b/tests/operators/test_s3_file_transform_operator.py
index 8b1ec8685c..6b72c58f78 100644
--- a/tests/operators/test_s3_file_transform_operator.py
+++ b/tests/operators/test_s3_file_transform_operator.py
@@ -49,7 +49,7 @@ class TestS3FileTransformOperator(unittest.TestCase):
 
     @mock.patch('subprocess.Popen')
     @mock_s3
-    def test_execute(self, mock_Popen):
+    def test_execute_with_transform_script(self, mock_Popen):
         transform_script_process = mock_Popen.return_value
         transform_script_process.communicate.return_value = [None, None]
         transform_script_process.returncode = 0
@@ -68,5 +68,33 @@ class TestS3FileTransformOperator(unittest.TestCase):
             source_s3_key=s3_url.format(bucket, input_key),
             dest_s3_key=s3_url.format(bucket, output_key),
             transform_script=self.transform_script,
+            replace=True,
             task_id="task_id")
         t.execute(None)
+
+    @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
+    @mock_s3
+    def test_execute_with_select_expression(self, mock_select_key):
+        bucket = "bucket"
+        input_key = "foo"
+        output_key = "bar"
+        bio = io.BytesIO(b"input")
+
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=bucket)
+        conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+        s3_url = "s3://{0}/{1}"
+        select_expression = "SELECT * FROM S3Object s"
+        t = S3FileTransformOperator(
+            source_s3_key=s3_url.format(bucket, input_key),
+            dest_s3_key=s3_url.format(bucket, output_key),
+            select_expression=select_expression,
+            replace=True,
+            task_id="task_id")
+        t.execute(None)
+
+        mock_select_key.assert_called_once_with(
+            key=s3_url.format(bucket, input_key),
+            expression=select_expression
+        )
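For reviewers who want to exercise the new hook method by hand, below is a minimal sketch of S3Hook.select_key as added in this patch. The connection id, bucket, and key are placeholders, and the default CSV serialization from the method signature is assumed.

    from airflow.hooks.S3_hook import S3Hook

    # Placeholder connection id and S3 URL; when bucket_name is omitted,
    # select_key parses the full s3:// URL, mirroring read_key.
    hook = S3Hook(aws_conn_id='aws_default')
    matching_rows = hook.select_key(
        key='s3://source-bucket/data/input.csv',
        expression="SELECT * FROM S3Object s WHERE s._1 = 'foo'",
    )
    # The matching records are returned concatenated into a single string.
    print(matching_rows)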
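Similarly, a usage sketch of S3FileTransformOperator with the new select_expression parameter and no transform_script. The DAG id, schedule, bucket names, and task id are illustrative only; the operator arguments follow the signature added by this patch.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.s3_file_transform_operator import S3FileTransformOperator

    dag = DAG('example_s3_select',
              start_date=datetime(2018, 4, 17),
              schedule_interval=None)

    # With select_expression set, the source object is filtered server-side via
    # S3 Select and the result is uploaded to dest_s3_key without running a script.
    filter_csv = S3FileTransformOperator(
        task_id='filter_csv',
        source_s3_key='s3://source-bucket/data/input.csv',
        dest_s3_key='s3://dest-bucket/data/filtered.csv',
        select_expression="SELECT s._1, s._2 FROM S3Object s",
        replace=True,
        dag=dag,
    )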