Add a unit test to make sure all update_measure calls are scheduled
This commit is contained in:
Родитель
414a1b88f4
Коммит
c301aac3db
|
@ -1,4 +1,4 @@
|
|||
[pytest]
|
||||
norecursedirs = .git .* static
|
||||
addopts = -rsxX --showlocals --tb=native --nomigrations --flake8 --cov-report term --cov-report xml --cov missioncontrol
|
||||
DJANGO_SETTINGS_MODULE = missioncontrol.settings
|
||||
DJANGO_SETTINGS_MODULE = tests.settings
|
||||
|
|
14
setup.cfg
14
setup.cfg
|
@ -1,3 +1,15 @@
|
|||
[flake8]
|
||||
max-line-length=100
|
||||
exclude=missioncontrol/*/migrations/*
|
||||
exclude=
|
||||
# ignore the migrations since they are created faulty by default
|
||||
missioncontrol/*/migrations/*,
|
||||
# No use in checking the Node modules
|
||||
node_modules/*/*/*,
|
||||
# No need to traverse our git directory
|
||||
.git,
|
||||
# There's no value in checking cache directories
|
||||
__pycache__,
|
||||
# test settings uses a hacky import * import which we normally
|
||||
# want to discourage but is convenient for this one specific
|
||||
# purpose
|
||||
tests/settings.py
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
from missioncontrol.settings import *
|
||||
|
||||
# this makes celery calls synchronous, useful for unit testing
|
||||
CELERY_ALWAYS_EAGER = True
|
||||
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
|
|
@ -3,8 +3,11 @@ import datetime
|
|||
import pytest
|
||||
from freezegun import freeze_time
|
||||
from dateutil.tz import tzutc
|
||||
from unittest.mock import (call, patch)
|
||||
|
||||
from missioncontrol.base.models import Datum
|
||||
from missioncontrol.base.models import (Channel,
|
||||
Datum,
|
||||
Measure)
|
||||
from missioncontrol.etl.date import datetime_to_utc
|
||||
|
||||
|
||||
|
@ -88,3 +91,18 @@ def test_get_measure_summary(fake_measure_data, prepopulated_version_cache):
|
|||
'version': None
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def test_all_measure_update_tasks_scheduled(initial_data, *args):
|
||||
# this test is a bit tautological, but at least exercises the function
|
||||
expected_calls = []
|
||||
for channel_name in Channel.objects.values_list('name', flat=True):
|
||||
for (measure_name, platform_name) in Measure.objects.exclude(
|
||||
platform=None).values_list('name', 'platform__name'):
|
||||
expected_calls.append(call(args=[platform_name, channel_name,
|
||||
measure_name]))
|
||||
|
||||
from missioncontrol.etl.tasks import update_measures
|
||||
with patch('missioncontrol.etl.measure.update_measure.apply_async') as mock_task:
|
||||
update_measures()
|
||||
mock_task.assert_has_calls(expected_calls, any_order=True)
|
||||
|
|
Загрузка…
Ссылка в новой задаче