Dispose connections when running tasks with os.fork & CeleryExecutor (#13265)

Without this fix, when using CeleryExecutor with the default config (i.e. `AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=False`), tasks are run in a forked process and the parent's pooled database connections are shared with that process. This causes Celery tasks to hang indefinitely (tasks stay in the `queued` state) with the following error:

```
[2020-12-22 18:49:39,085: WARNING/ForkPoolWorker-2] Failed to log action with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
```

>It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process.

SQLAlchemy docs: https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
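For reference, the pattern those docs describe looks roughly like the sketch below (the connection URL and the child's query are placeholders, not part of this change): the forked child disposes of the engine it inherited before touching the database, so its first checkout opens a fresh connection instead of reusing a socket shared with the parent.

```python
import os

from sqlalchemy import create_engine, text

# Placeholder URL; any pooled Engine shared across os.fork() has the same problem.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/mydb")


def execute_in_fork() -> None:
    pid = os.fork()
    if pid:
        # Parent: wait for the child; the parent keeps its own pool untouched.
        os.waitpid(pid, 0)
        return

    # Child: drop the pooled connections inherited from the parent so the
    # next checkout opens a brand-new connection instead of a shared socket.
    engine.dispose()

    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

    os._exit(os.EX_OK)
```

The commit applies the same idea inside `_execute_in_fork`: `settings.engine` is disposed right after the fork, before the Airflow CLI parser runs the task.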
Kaxil Naik 2020-12-23 09:27:29 +00:00 committed by GitHub
Parent cb4755f8ab
Commit 7f8be97da4
1 changed file with 3 additions and 0 deletions


@@ -103,6 +103,9 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
     try:
         from airflow.cli.cli_parser import get_parser
 
+        settings.engine.pool.dispose()
+        settings.engine.dispose()
+
         parser = get_parser()
         # [1:] - remove "airflow" from the start of the command
         args = parser.parse_args(command_to_exec[1:])