[AIRFLOW-3069] Log all output of the S3 file transform script (#3914)

The output of the process spawned by S3FileTransformOperator is not
properly decoded, which makes the logs difficult to read. Additionally,
the stderr stream is only shown when the process exits with a non-zero
code.

- Send both output streams (stdout & stderr) to the logger.
- Decode the output, so that newlines are displayed correctly.
- Include the process exit code in the exception message if the process fails.
- Remove a potential deadlock caused by `stderr=subprocess.PIPE` (see the
  sketch below).
- Don't load all output into memory.
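
As a sketch of the pattern this change adopts (the helper below is
illustrative, not part of the patch): merging stderr into stdout and reading
the single pipe line by line streams everything to the logger as it is
produced, and avoids the deadlock that two separate PIPEs can cause when one
buffer fills while the other is being read.

import logging
import subprocess
import sys

log = logging.getLogger(__name__)


def run_and_log(cmd):
    # Merge stderr into stdout so one pipe carries all output; reading
    # stdout line by line while stderr sits in a second, filling PIPE
    # buffer is what could deadlock.
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        close_fds=True
    )

    # Unlike communicate(), this never accumulates the full output in
    # memory; each line is logged as soon as it arrives.
    for line in iter(process.stdout.readline, b''):
        log.info(line.decode(sys.getdefaultencoding()).rstrip())

    process.wait()
    return process.returncode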
Authored by Szymon Biliński, 2018-10-15 07:41:35 +02:00; committed by Fokko Driesprong
Parent: 1464c8edc3
Commit: fac5a8e623
2 changed files: 61 additions and 11 deletions

airflow/operators/s3_file_transform_operator.py

@@ -19,6 +19,7 @@
 from tempfile import NamedTemporaryFile
 import subprocess
+import sys

 from airflow.exceptions import AirflowException
 from airflow.hooks.S3_hook import S3Hook
@@ -98,6 +99,7 @@ class S3FileTransformOperator(BaseOperator):
         self.replace = replace
         self.transform_script = transform_script
         self.select_expression = select_expression
+        self.output_encoding = sys.getdefaultencoding()

     def execute(self, context):
         if self.transform_script is None and self.select_expression is None:
@@ -132,15 +134,23 @@
             f_source.flush()

             if self.transform_script is not None:
-                transform_script_process = subprocess.Popen(
+                process = subprocess.Popen(
                     [self.transform_script, f_source.name, f_dest.name],
-                    stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
-                (transform_script_stdoutdata, transform_script_stderrdata) = \
-                    transform_script_process.communicate()
-                self.log.info("Transform script stdout %s", transform_script_stdoutdata)
-                if transform_script_process.returncode > 0:
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT,
+                    close_fds=True
+                )
+
+                self.log.info("Output:")
+                for line in iter(process.stdout.readline, b''):
+                    self.log.info(line.decode(self.output_encoding).rstrip())
+
+                process.wait()
+
+                if process.returncode > 0:
                     raise AirflowException(
-                        "Transform script failed %s", transform_script_stderrdata)
+                        "Transform script failed: {0}".format(process.returncode)
+                    )
                 else:
                     self.log.info(
                         "Transform script successful. Output temporarily located at %s",

tests/operators/test_s3_file_transform_operator.py

@@ -22,6 +22,7 @@ import errno
 import io
 import os
 import shutil
+import sys
 import unittest

 from tempfile import mkdtemp
@@ -29,6 +30,7 @@ import boto3
 import mock
 from moto import mock_s3

+from airflow.exceptions import AirflowException
 from airflow.operators.s3_file_transform_operator import S3FileTransformOperator
@@ -48,11 +50,15 @@ class TestS3FileTransformOperator(unittest.TestCase):
             raise e

     @mock.patch('subprocess.Popen')
+    @mock.patch.object(S3FileTransformOperator, 'log')
     @mock_s3
-    def test_execute_with_transform_script(self, mock_Popen):
-        transform_script_process = mock_Popen.return_value
-        transform_script_process.communicate.return_value = [None, None]
-        transform_script_process.returncode = 0
+    def test_execute_with_transform_script(self, mock_log, mock_Popen):
+        process_output = [b"Foo", b"Bar", b"Baz"]
+
+        process = mock_Popen.return_value
+        process.stdout.readline.side_effect = process_output
+        process.wait.return_value = None
+        process.returncode = 0

         bucket = "bucket"
         input_key = "foo"
@@ -72,6 +78,40 @@
             task_id="task_id")
         t.execute(None)

+        mock_log.info.assert_has_calls([
+            mock.call(line.decode(sys.getdefaultencoding())) for line in process_output
+        ])
+
+    @mock.patch('subprocess.Popen')
+    @mock_s3
+    def test_execute_with_failing_transform_script(self, mock_Popen):
+        process = mock_Popen.return_value
+        process.stdout.readline.side_effect = []
+        process.wait.return_value = None
+        process.returncode = 42
+
+        bucket = "bucket"
+        input_key = "foo"
+        output_key = "bar"
+        bio = io.BytesIO(b"input")
+
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket=bucket)
+        conn.upload_fileobj(Bucket=bucket, Key=input_key, Fileobj=bio)
+
+        s3_url = "s3://{0}/{1}"
+        t = S3FileTransformOperator(
+            source_s3_key=s3_url.format(bucket, input_key),
+            dest_s3_key=s3_url.format(bucket, output_key),
+            transform_script=self.transform_script,
+            replace=True,
+            task_id="task_id")
+
+        with self.assertRaises(AirflowException) as e:
+            t.execute(None)
+
+        self.assertEqual('Transform script failed: 42', str(e.exception))
+
     @mock.patch('airflow.hooks.S3_hook.S3Hook.select_key', return_value="input")
     @mock_s3
     def test_execute_with_select_expression(self, mock_select_key):
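
The transform_script fixture used by these tests is created elsewhere in the
test case and is not shown in this excerpt. A minimal script satisfying the
operator's calling convention (source path as argv[1], destination path as
argv[2]) could look like this; it is purely illustrative:

#!/usr/bin/env python
# Illustrative transform script: copies the source to the destination,
# upper-casing each line. The operator invokes it as:
#   transform_script <source> <dest>
import sys

source, dest = sys.argv[1], sys.argv[2]

with open(source) as f_in, open(dest, 'w') as f_out:
    for line in f_in:
        f_out.write(line.upper())

# With this change, stdout and stderr both end up in the task log.
print("transformed {0} -> {1}".format(source, dest))
sys.stderr.write("diagnostics are visible in the log as well\n")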