From 7f8be97da4ae6ce57579ad3ce055cb0569d16309 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Wed, 23 Dec 2020 09:27:29 +0000
Subject: [PATCH] Dispose connections when running tasks with os.fork &
 CeleryExecutor (#13265)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Without this fix, when using CeleryExecutor with the default config (i.e.
`AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=False`), tasks are run
in a fork and the pooled connections are shared with the forked process.
This causes Celery tasks to hang indefinitely (tasks stay in the queued
state) with the following error:

```
[2020-12-22 18:49:39,085: WARNING/ForkPoolWorker-2] Failed to log action with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
```

> It’s critical that when using a connection pool, and by extension when
> using an Engine created via create_engine(), that the pooled connections
> are not shared to a forked process.

SQLAlchemy docs: https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
---
 airflow/executors/celery_executor.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index ad5c76e5bf..8bbaed15ae 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -103,6 +103,9 @@ def _execute_in_fork(command_to_exec: CommandType) -> None:
     try:
         from airflow.cli.cli_parser import get_parser
 
+        settings.engine.pool.dispose()
+        settings.engine.dispose()
+
         parser = get_parser()
         # [1:] - remove "airflow" from the start of the command
         args = parser.parse_args(command_to_exec[1:])
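
For context, here is a minimal standalone sketch (not part of the patch) of the pattern the change applies, following the SQLAlchemy pooling docs linked above: the forked child disposes of the engine it inherited, so SQLAlchemy opens fresh connections instead of two processes sharing the parent's pooled sockets. The connection URL, function name, and query are hypothetical placeholders; in Airflow the engine in question lives in `airflow.settings` and the dispose calls sit inside `_execute_in_fork`, as shown in the diff.

```python
import os

from sqlalchemy import create_engine, text

# Hypothetical DSN used only for this sketch.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/mydb")


def run_in_fork() -> None:
    pid = os.fork()
    if pid == 0:
        # Child process: drop the pooled connections inherited from the
        # parent so fresh ones are created on first use, rather than both
        # processes talking over the same underlying socket.
        engine.dispose()
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        os._exit(0)
    # Parent process: wait for the child to finish.
    os.waitpid(pid, 0)


if __name__ == "__main__":
    run_in_fork()
```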