Fix race in Celery tests by pre-creating result tables (#8909)

We noticed our Celery tests failing sometimes with

> (psycopg2.errors.UniqueViolation) duplicate key value violates unique
> constraint "pg_type_typname_nsp_index"
> DETAIL:  Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists

It appears this is a race condition in SQLAlchemy's "create_all()"
function, where it first checks which tables exist, builds up a list of
`CREATE TABLE` statements, then issues them. Thus if two Celery worker
processes start at the same time, they will both find that the table doesn't
yet exist, and both try to create it.
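
The check-then-create sequence described above can be sketched in a single process. This is only an illustration under stated assumptions: it uses the standard-library `sqlite3` module rather than SQLAlchemy or Postgres, the helper names `table_exists` and `simulate_race` are hypothetical, and the two "workers" are interleaved by hand instead of running as real Celery subprocesses. The exact error differs by database (Postgres reported a `UniqueViolation` on `pg_type`, as quoted above).

```python
import sqlite3

TABLE = "celery_tasksetmeta"


def table_exists(conn, name):
    """Return True if a table with this name is already present."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (name,),
    ).fetchone()
    return row is not None


def simulate_race():
    """Interleave two workers: both check first, then both try to create."""
    conn = sqlite3.connect(":memory:")

    # Step 1: both workers look for the table before either has created it.
    missing_for = {
        "A": not table_exists(conn, TABLE),
        "B": not table_exists(conn, TABLE),
    }

    # Step 2: each worker acts on its (now stale) check result.
    errors = []
    for worker, missing in missing_for.items():
        if missing:
            try:
                conn.execute(f"CREATE TABLE {TABLE} (id INTEGER PRIMARY KEY)")
            except sqlite3.OperationalError as exc:
                errors.append((worker, str(exc)))
    conn.close()
    return errors


if __name__ == "__main__":
    # Worker A succeeds; worker B fails because its check is out of date.
    print(simulate_race())
```

The second worker fails only because its existence check ran before the first worker's `CREATE TABLE`, which is the window the commit closes.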

This is _probably_ a bug in SQLA, but this should be an easy enough fix
here, to just ensure that the table exists before launching any Celery tasks.
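
As a rough sketch of why creating the table up front helps, the same hand-interleaved model can be run with a setup step placed before any worker performs its check. Again this assumes the standard-library `sqlite3` module as a stand-in for the real result backend, and `run_workers_after_setup` is a hypothetical name, not part of Airflow or Celery.

```python
import sqlite3

TABLE = "celery_tasksetmeta"


def table_exists(conn, name):
    """Return True if a table with this name is already present."""
    row = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (name,),
    ).fetchone()
    return row is not None


def run_workers_after_setup():
    """Create the table once, then let two workers do check-then-create."""
    conn = sqlite3.connect(":memory:")

    # Setup step: runs once, before any worker starts.
    conn.execute(f"CREATE TABLE {TABLE} (id INTEGER PRIMARY KEY)")

    # Both workers check; each now sees the table, so neither creates it.
    missing = [not table_exists(conn, TABLE) for _ in range(2)]
    for is_missing in missing:
        if is_missing:
            conn.execute(f"CREATE TABLE {TABLE} (id INTEGER PRIMARY KEY)")
    conn.close()
    return missing


if __name__ == "__main__":
    print(run_workers_after_setup())
```

Because every check happens after the table exists, no worker reaches the `CREATE TABLE` branch, so the ordering of the workers no longer matters.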
Ash Berlin-Taylor 2020-05-19 13:21:44 +01:00 committed by GitHub
Parent 375d1ca229
Commit bae5cc2f5c
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
1 file changed: 12 additions and 0 deletions

@@ -75,6 +75,17 @@ def _prepare_app(broker_url=None, execute=None):
     patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
     patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+    backend = test_app.backend
+    if hasattr(backend, 'ResultSession'):
+        # Pre-create the database tables now, otherwise SQLA via Celery has a
+        # race condition where one of the subprocesses can die with a "Table
+        # already exists" error, because SQLA checks which tables exist,
+        # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
+        # EXISTS
+        session = backend.ResultSession()
+        session.close()
     with patch_app, patch_execute:
         try:
             yield test_app
@@ -165,6 +176,7 @@ class TestCeleryExecutor(unittest.TestCase):
         self.assertEqual(1, len(executor.queued_tasks))
         self.assertEqual(executor.queued_tasks[key], value_tuple)
+    @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
         with _prepare_app(), self.assertLogs(celery_executor.log) as cm: