Add option to enable TCP keepalive for communication with Kubernetes API (#11406)

* Add option to enable TCP keepalive mechanism for communication with Kubernetes API

* Add keepalive default options to default_airflow.cfg

* Add reference to PR

* Quote parameters names in configuration

* Add problematic words to spelling_wordlist.txt
This commit is contained in:
Michał Misiewicz 2020-10-12 17:19:20 +02:00 коммит произвёл GitHub
Родитель 32f2a45819
Коммит da565c9019
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 85 добавлений и 0 удалений

Просмотреть файл

@ -2027,6 +2027,39 @@
type: string
example: '{"grace_period_seconds": 10}'
default: ""
- name: enable_tcp_keepalive
description: |
Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely
when idle connection is time-outed on services like cloud load balancers or firewalls.
version_added: ~
type: boolean
example: ~
default: "False"
- name: tcp_keep_idle
description: |
When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has
been idle for `tcp_keep_idle` seconds.
version_added: ~
type: int
example: ~
default: "120"
- name: tcp_keep_intvl
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds.
version_added: ~
type: int
example: ~
default: "30"
- name: tcp_keep_cnt
description: |
When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before
a connection is considered to be broken.
version_added: ~
type: int
example: ~
default: "6"
- name: smart_sensor
description: ~
options:

Просмотреть файл

@ -982,6 +982,23 @@ kube_client_request_args =
# Example: delete_option_kwargs = {{"grace_period_seconds": 10}}
delete_option_kwargs =
# Enables TCP keepalive mechanism. This prevents Kubernetes API requests to hang indefinitely
# when idle connection is time-outed on services like cloud load balancers or firewalls.
enable_tcp_keepalive = False
# When the `enable_tcp_keepalive` option is enabled, TCP probes a connection that has
# been idle for `tcp_keep_idle` seconds.
tcp_keep_idle = 120
# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
# to a keepalive probe, TCP retransmits the probe after `tcp_keep_intvl` seconds.
tcp_keep_intvl = 30
# When the `enable_tcp_keepalive` option is enabled, if Kubernetes API does not respond
# to a keepalive probe, TCP retransmits the probe `tcp_keep_cnt number` of times before
# a connection is considered to be broken.
tcp_keep_cnt = 6
[smart_sensor]
# When `use_smart_sensor` is True, Airflow redirects multiple qualified sensor tasks to
# smart sensor task.

Просмотреть файл

@ -65,6 +65,33 @@ except ImportError as e:
_import_err = e
def _enable_tcp_keepalive() -> None:
"""
This function enables TCP keepalive mechanism. This prevents urllib3 connection
to hang indefinitely when idle connection is time-outed on services like cloud
load balancers or firewalls.
See https://github.com/apache/airflow/pull/11406 for detailed explanation.
Please ping @michalmisiewicz or @dimberman in the PR if you want to modify this function.
"""
import socket
from urllib3.connection import HTTPConnection, HTTPSConnection
tcp_keep_idle = conf.get('kubernetes', 'tcp_keep_idle', fallback=120)
tcp_keep_intvl = conf.get('kubernetes', 'tcp_keep_intvl', fallback=30)
tcp_keep_cnt = conf.get('kubernetes', 'tcp_keep_cnt', fallback=6)
socket_options = [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keep_idle),
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keep_intvl),
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keep_cnt),
]
HTTPSConnection.default_socket_options = HTTPSConnection.default_socket_options + socket_options
HTTPConnection.default_socket_options = HTTPConnection.default_socket_options + socket_options
def get_kube_client(in_cluster: bool = conf.getboolean('kubernetes', 'in_cluster'),
cluster_context: Optional[str] = None,
config_file: Optional[str] = None) -> client.CoreV1Api:
@ -89,5 +116,8 @@ def get_kube_client(in_cluster: bool = conf.getboolean('kubernetes', 'in_cluster
if config_file is None:
config_file = conf.get('kubernetes', 'config_file', fallback=None)
if conf.getboolean('kubernetes', 'enable_tcp_keepalive', fallback=False):
_enable_tcp_keepalive()
client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
return _get_client_with_patched_configuration(client_conf)

Просмотреть файл

@ -472,6 +472,7 @@ backreference
backtick
backticks
balancer
balancers
baseOperator
basedn
basestring
@ -538,6 +539,7 @@ cmdline
cmds
cname
cnf
cnt
codebase
codecov
colour
@ -829,6 +831,7 @@ integrations
interdependencies
internalIpOnly
ints
intvl
io
ip
isfile
@ -1103,6 +1106,7 @@ resetdb
resourceVersion
resumable
resultset
retransmits
rfc
ricard
rideable
@ -1236,6 +1240,7 @@ tagValue
tao
taskinstance
tblproperties
tcp
templatable
templateable
templated