[AIRFLOW-2825] Fix S3ToHiveTransfer bug caused by case-sensitive file extension check

Because upper/lower case was not considered
in the file extension check, the S3ToHiveTransfer
operator could mistakenly conclude that a GZIP
file with the uppercase extension ".GZ" is not
a GZIP file and raise an exception.
This commit is contained in:
XD-DENG 2018-07-31 10:30:36 +08:00
Parent dfa7b26dda
Commit c7e54461c6
2 changed files: 16 additions and 6 deletions

View file

@ -153,7 +153,7 @@ class S3ToHiveTransfer(BaseOperator):
root, file_ext = os.path.splitext(s3_key_object.key)
if (self.select_expression and self.input_compressed and
file_ext != '.gz'):
file_ext.lower() != '.gz'):
raise AirflowException("GZIP is the only compression " +
"format Amazon S3 Select supports")

View file

@ -89,6 +89,11 @@ class S3ToHiveTransferTest(unittest.TestCase):
mode="wb") as f_gz_h:
self._set_fn(fn_gz, '.gz', True)
f_gz_h.writelines([header, line1, line2])
fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
with gzip.GzipFile(filename=fn_gz_upper,
mode="wb") as f_gz_upper_h:
self._set_fn(fn_gz_upper, '.GZ', True)
f_gz_upper_h.writelines([header, line1, line2])
fn_bz2 = self._get_fn('.txt', True) + '.bz2'
with bz2.BZ2File(filename=fn_bz2,
mode="wb") as f_bz2_h:
@ -105,6 +110,11 @@ class S3ToHiveTransferTest(unittest.TestCase):
mode="wb") as f_gz_nh:
self._set_fn(fn_gz, '.gz', False)
f_gz_nh.writelines([line1, line2])
fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
with gzip.GzipFile(filename=fn_gz_upper,
mode="wb") as f_gz_upper_nh:
self._set_fn(fn_gz_upper, '.GZ', False)
f_gz_upper_nh.writelines([line1, line2])
fn_bz2 = self._get_fn('.txt', False) + '.bz2'
with bz2.BZ2File(filename=fn_bz2,
mode="wb") as f_bz2_nh:
@ -143,7 +153,7 @@ class S3ToHiveTransferTest(unittest.TestCase):
# gz files contain mtime and filename in the header that
# causes filecmp to return False even if contents are identical
# Hence decompress to test for equality
if(ext == '.gz'):
if(ext.lower() == '.gz'):
with gzip.GzipFile(fn_1, 'rb') as f_1,\
NamedTemporaryFile(mode='wb') as f_txt_1,\
gzip.GzipFile(fn_2, 'rb') as f_2,\
@ -220,14 +230,14 @@ class S3ToHiveTransferTest(unittest.TestCase):
conn.create_bucket(Bucket='bucket')
# Testing txt, zip, bz2 files with and without header row
for (ext, has_header) in product(['.txt', '.gz', '.bz2'], [True, False]):
for (ext, has_header) in product(['.txt', '.gz', '.bz2', '.GZ'], [True, False]):
self.kwargs['headers'] = has_header
self.kwargs['check_headers'] = has_header
logging.info("Testing {0} format {1} header".
format(ext,
('with' if has_header else 'without'))
)
self.kwargs['input_compressed'] = ext != '.txt'
self.kwargs['input_compressed'] = ext.lower() != '.txt'
self.kwargs['s3_key'] = 's3://bucket/' + self.s3_key + ext
ip_fn = self._get_fn(ext, self.kwargs['headers'])
op_fn = self._get_fn(ext, False)
@ -260,8 +270,8 @@ class S3ToHiveTransferTest(unittest.TestCase):
# Only testing S3ToHiveTransfer calls S3Hook.select_key with
# the right parameters and its execute method succeeds here,
# since Moto doesn't support select_object_content as of 1.3.2.
for (ext, has_header) in product(['.txt', '.gz'], [True, False]):
input_compressed = ext != '.txt'
for (ext, has_header) in product(['.txt', '.gz', '.GZ'], [True, False]):
input_compressed = ext.lower() != '.txt'
key = self.s3_key + ext
self.kwargs['check_headers'] = False