[AIRFLOW-2913] Check bucket_key/bucket_name combination in S3KeySensor
When bucket_name is provided, and bucket_key is also provided as a full S3:// url, the full_url obtained eventually will be wrong. It will be like 's3://bucket_name/s3://bucket_name/object_key'. This should be avoided by adding checking and raise exception in such case. Closes #3762 from XD-DENG/patch-6
This commit is contained in:
Родитель
f2738b085e
Коммит
738d27edfe
|
@ -32,9 +32,11 @@ class S3KeySensor(BaseSensorOperator):
|
|||
a resource.
|
||||
|
||||
:param bucket_key: The key being waited on. Supports full s3:// style url
|
||||
or relative path from root level.
|
||||
or relative path from root level. When it's specified as a full s3://
|
||||
url, please leave bucket_name as `None`.
|
||||
:type bucket_key: str
|
||||
:param bucket_name: Name of the S3 bucket
|
||||
:param bucket_name: Name of the S3 bucket. Only needed when ``bucket_key``
|
||||
is not provided as a full s3:// url.
|
||||
:type bucket_name: str
|
||||
:param wildcard_match: whether the bucket_key should be interpreted as a
|
||||
Unix wildcard pattern
|
||||
|
@ -64,6 +66,12 @@ class S3KeySensor(BaseSensorOperator):
|
|||
bucket_key = parsed_url.path[1:]
|
||||
else:
|
||||
bucket_key = parsed_url.path
|
||||
else:
|
||||
parsed_url = urlparse(bucket_key)
|
||||
if parsed_url.scheme != '' or parsed_url.netloc != '':
|
||||
raise AirflowException('If bucket_name is provided, bucket_key' +
|
||||
' should be relative path from root' +
|
||||
' level, rather than a full s3:// url')
|
||||
self.bucket_name = bucket_name
|
||||
self.bucket_key = bucket_key
|
||||
self.wildcard_match = wildcard_match
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
from airflow.exceptions import AirflowException
|
||||
from airflow.sensors.s3_key_sensor import S3KeySensor
|
||||
|
||||
|
||||
class S3KeySensorTests(unittest.TestCase):
|
||||
|
||||
def test_bucket_name_None_and_bucket_key_as_relative_path(self):
|
||||
"""
|
||||
Test if exception is raised when bucket_name is None
|
||||
and bucket_key is provided as relative path rather than s3:// url.
|
||||
:return:
|
||||
"""
|
||||
with self.assertRaises(AirflowException):
|
||||
S3KeySensor(bucket_key="file_in_bucket")
|
||||
|
||||
def test_bucket_name_provided_and_bucket_key_is_s3_url(self):
|
||||
"""
|
||||
Test if exception is raised when bucket_name is provided
|
||||
while bucket_key is provided as a full s3:// url.
|
||||
:return:
|
||||
"""
|
||||
with self.assertRaises(AirflowException):
|
||||
S3KeySensor(bucket_key="s3://test_bucket/file",
|
||||
bucket_name='test_bucket')
|
Загрузка…
Ссылка в новой задаче