From f9308f1e7440ec716476c643906de9305e605394 Mon Sep 17 00:00:00 2001 From: LDAP/krishna_bhupatiraju Date: Fri, 24 Apr 2015 18:27:52 +0000 Subject: [PATCH] changed suggested by Max. Removed the transform_executor arg. Expecting an executable --- airflow/operators/s3_file_transform_operator.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py index add4d1c332..fbde1218a3 100644 --- a/airflow/operators/s3_file_transform_operator.py +++ b/airflow/operators/s3_file_transform_operator.py @@ -29,9 +29,7 @@ class S3FileTransformOperator(BaseOperator): :type dest_s3_conn_id: str :param replace: Replace dest S3 key if it already exists :type replace: bool - :param transform_executor: location of the executor of the transformation script - :type transform_executor: str - :param transform_script: location of the transformation script + :param transform_script: location of the executable transformation script :type transform_script: str """ @@ -47,7 +45,6 @@ class S3FileTransformOperator(BaseOperator): self, source_s3_key, dest_s3_key, - transform_executor, transform_script, source_s3_conn_id='s3_default', dest_s3_conn_id='s3_default', @@ -59,7 +56,6 @@ class S3FileTransformOperator(BaseOperator): self.dest_s3_key = dest_s3_key self.dest_s3_conn_id = dest_s3_conn_id self.replace = replace - self.transform_executor = transform_executor self.transform_script = transform_script self.source_s3 = S3Hook(s3_conn_id=source_s3_conn_id) self.dest_s3 = S3Hook(s3_conn_id=dest_s3_conn_id) @@ -75,7 +71,7 @@ class S3FileTransformOperator(BaseOperator): source_s3_key_object.get_contents_to_file(f_source) f_source.flush() self.source_s3.connection.close() - transform_script_process = subprocess.Popen([self.transform_executor, self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + transform_script_process = subprocess.Popen([self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate() logging.info("Transform script stdout " + transform_script_stdoutdata) if transform_script_process.returncode > 0: @@ -88,7 +84,7 @@ class S3FileTransformOperator(BaseOperator): filename=f_dest.name, key=self.dest_s3_key, replace=self.replace - ) + ) logging.info("Upload successful") self.dest_s3.connection.close()