changed suggested by Max. Removed the transform_executor arg. Expecting an executable

This commit is contained in:
LDAP/krishna_bhupatiraju 2015-04-24 18:27:52 +00:00
Родитель 375ec7ac40
Коммит f9308f1e74
1 изменённых файлов: 3 добавлений и 7 удалений

Просмотреть файл

@ -29,9 +29,7 @@ class S3FileTransformOperator(BaseOperator):
:type dest_s3_conn_id: str
:param replace: Replace dest S3 key if it already exists
:type replace: bool
:param transform_executor: location of the executor of the transformation script
:type transform_executor: str
:param transform_script: location of the transformation script
:param transform_script: location of the executable transformation script
:type transform_script: str
"""
@ -47,7 +45,6 @@ class S3FileTransformOperator(BaseOperator):
self,
source_s3_key,
dest_s3_key,
transform_executor,
transform_script,
source_s3_conn_id='s3_default',
dest_s3_conn_id='s3_default',
@ -59,7 +56,6 @@ class S3FileTransformOperator(BaseOperator):
self.dest_s3_key = dest_s3_key
self.dest_s3_conn_id = dest_s3_conn_id
self.replace = replace
self.transform_executor = transform_executor
self.transform_script = transform_script
self.source_s3 = S3Hook(s3_conn_id=source_s3_conn_id)
self.dest_s3 = S3Hook(s3_conn_id=dest_s3_conn_id)
@ -75,7 +71,7 @@ class S3FileTransformOperator(BaseOperator):
source_s3_key_object.get_contents_to_file(f_source)
f_source.flush()
self.source_s3.connection.close()
transform_script_process = subprocess.Popen([self.transform_executor, self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
transform_script_process = subprocess.Popen([self.transform_script, f_source.name, f_dest.name], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
logging.info("Transform script stdout " + transform_script_stdoutdata)
if transform_script_process.returncode > 0:
@ -88,7 +84,7 @@ class S3FileTransformOperator(BaseOperator):
filename=f_dest.name,
key=self.dest_s3_key,
replace=self.replace
)
)
logging.info("Upload successful")
self.dest_s3.connection.close()