Add response_filter parameter to SimpleHttpOperator (#9885)

This commit is contained in:
Dani Hodovic 2020-07-22 15:03:23 +03:00 коммит произвёл GitHub
Родитель c2db0dfeb1
Коммит ac93419d1d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 46 добавлений и 0 удалений

Просмотреть файл

@ -71,6 +71,15 @@ task_get_op = SimpleHttpOperator(
dag=dag,
)
# [END howto_operator_http_task_get_op]
# [START howto_operator_http_task_get_op_response_filter]
task_get_op = SimpleHttpOperator(
task_id='get_op_response_filter',
method='GET',
endpoint='get',
response_filter=lambda response: response.json()['nested']['property'],
dag=dag,
)
# [END howto_operator_http_task_get_op_response_filter]
# [START howto_operator_http_task_put_op]
task_put_op = SimpleHttpOperator(
task_id='put_op',

Просмотреть файл

@ -17,6 +17,8 @@
# under the License.
from typing import Any, Callable, Dict, Optional
import requests
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.http.hooks.http import HttpHook
@ -46,6 +48,9 @@ class SimpleHttpOperator(BaseOperator):
:param response_check: A check against the 'requests' response object.
Returns True for 'pass' and False otherwise.
:type response_check: A lambda or defined function.
:param response_filter: A function allowing you to manipulate the response
text. e.g response_filter=lambda response: json.loads(response.text)
:type response_filter: A lambda or defined function.
:param extra_options: Extra options for the 'requests' library, see the
'requests' documentation (options to modify timeout, ssl, etc.)
:type extra_options: A dictionary of options, where key is string and value
@ -65,6 +70,7 @@ class SimpleHttpOperator(BaseOperator):
data: Any = None,
headers: Optional[Dict[str, str]] = None,
response_check: Optional[Callable[..., Any]] = None,
response_filter: Optional[Callable[[requests.Response], Any]] = None,
extra_options: Optional[Dict[str, Any]] = None,
http_conn_id: str = 'http_default',
log_response: bool = False,
@ -76,6 +82,7 @@ class SimpleHttpOperator(BaseOperator):
self.headers = headers or {}
self.data = data or {}
self.response_check = response_check
self.response_filter = response_filter
self.extra_options = extra_options or {}
self.log_response = log_response
if kwargs.get('xcom_push') is not None:
@ -95,4 +102,6 @@ class SimpleHttpOperator(BaseOperator):
if self.response_check:
if not self.response_check(response):
raise AirflowException("Response check returned False.")
if self.response_filter:
return self.response_filter(response)
return response.text

Просмотреть файл

@ -59,6 +59,21 @@ Here we are calling a ``GET`` request and pass params to it. The task will succe
:start-after: [START howto_operator_http_task_get_op]
:end-before: [END howto_operator_http_task_get_op]
SimpleHttpOperator returns the response body as text by default. If you want to modify the response before passing
it on the next task downstream use ``response_filter``. This is useful if:
- the API you are consuming returns a large JSON payload and you're interested in a subset of the data
- the API returns data in xml or csv and you want to convert it to JSON
- you're interested in the headers of the response instead of the body
Below is an example of retrieving data from a REST API and only returning a nested property instead of the full
response body.
.. exampleinclude:: /../airflow/providers/http/example_dags/example_http.py
:language: python
:start-after: [START howto_operator_http_task_get_op_response_filter]
:end-before: [END howto_operator_http_task_get_op_response_filter]
In the third example we are performing a ``PUT`` operation to put / set data according to the data that is being
provided to the request.

Просмотреть файл

@ -79,3 +79,16 @@ class TestSimpleHttpOp(unittest.TestCase):
mock.call('invalid response')
]
mock_info.assert_has_calls(calls, any_order=True)
@requests_mock.mock()
def test_filters_response(self, m):
m.get('http://www.example.com', json={'value': 5})
operator = SimpleHttpOperator(
task_id='test_HTTP_op',
method='GET',
endpoint='/',
http_conn_id='HTTP_EXAMPLE',
response_filter=lambda response: response.json()
)
result = operator.execute(None)
assert result == {'value': 5}