diff --git a/tests/model/derived/test_jobs_model.py b/tests/model/derived/test_jobs_model.py
index 2030d3729..7d299346f 100644
--- a/tests/model/derived/test_jobs_model.py
+++ b/tests/model/derived/test_jobs_model.py
@@ -6,11 +6,14 @@ import time
 import json
 import pytest
 import copy
+import threading
 
+from django.conf import settings
 from django.core.management import call_command
 
 from treeherder.model.derived.base import DatasetNotFoundError
 from treeherder.model.derived import ArtifactsModel
+from treeherder.model.derived.jobs import JobsModel
 from tests.sample_data_generator import job_data, result_set
 from tests import test_utils
 
@@ -18,6 +21,20 @@ slow = pytest.mark.slow
 xfail = pytest.mark.xfail
 
 
+class FakePerfData(object):
+    SERIES = [{'geomean': 1, 'result_set_id': 1,
+               'push_timestamp': int(time.time())}]
+    TIME_INTERVAL = 86400
+    SIGNATURE = 'cheezburger'
+    SERIES_TYPE = 'talos_data'
+
+    @staticmethod
+    def get_fake_lock_string():
+        return 'sps_{}_{}_{}'.format(FakePerfData.TIME_INTERVAL,
+                                     FakePerfData.SERIES_TYPE,
+                                     FakePerfData.SIGNATURE)
+
+
 def test_unicode(jm):
     """Unicode representation of a ``JobModel`` is the project name."""
     assert unicode(jm) == unicode(jm.project)
@@ -447,6 +464,85 @@ def test_store_performance_artifact(
     assert performance_artifact_signatures == series_signatures
 
 
+def test_store_performance_series(jm, test_project):
+
+    # basic case: everything works as expected
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+    blob = json.loads(stored_series[0]['blob'])
+    assert len(blob) == 1
+    assert blob[0] == FakePerfData.SERIES[0]
+
+    jm.disconnect()
+
+
+def test_store_performance_series_timeout_recover(jm, test_project):
+    # timeout case 1: a lock is on our series, but it will expire
+
+    # use a thread to simulate locking and then unlocking the table; the
+    # event guarantees the lock is engaged before we attempt the insert
+    lock_engaged = threading.Event()
+
+    def _lock_unlock():
+        with JobsModel(test_project) as jm2:
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.get_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+            lock_engaged.set()
+            time.sleep(1)
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.release_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+    t = threading.Thread(target=_lock_unlock)
+    t.start()
+    assert lock_engaged.wait(10), 'helper thread never acquired the lock'
+
+    # will fail at first due to lock, but we should recover and insert
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    t.join()
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+
+    blob = json.loads(stored_series[0]['blob'])
+    assert len(blob) == 1
+    assert blob[0] == FakePerfData.SERIES[0]
+
+    jm.disconnect()
+
+
+def test_store_performance_series_timeout_fail(jm, test_project):
+    # timeout case 2: a lock is on our series, but it will not expire in time
+
+    jm.get_jobs_dhub().execute(
+        proc='generic.locks.get_lock',
+        placeholders=[FakePerfData.get_fake_lock_string()])
+    old_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT
+    settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 1
+    try:
+        # this should fail -- we'll timeout before we're able to insert
+        jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                    FakePerfData.SERIES_TYPE,
+                                    FakePerfData.SIGNATURE,
+                                    FakePerfData.SERIES)
+        stored_series = jm.get_jobs_dhub().execute(
+            proc="jobs.selects.get_performance_series",
+            placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+        assert not stored_series
+    finally:
+        # always restore the global setting, even if the check above fails
+        settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = old_timeout
+        jm.disconnect()
+
+
 def test_remove_existing_jobs_single_existing(jm, sample_data,
                                               initial_data, refdata, mock_log_parser, sample_resultset):
     """Remove single existing job prior to loading"""
diff --git a/treeherder/model/derived/jobs.py b/treeherder/model/derived/jobs.py
index 7ce59bb50..775733d7a 100644
--- a/treeherder/model/derived/jobs.py
+++ b/treeherder/model/derived/jobs.py
@@ -2040,25 +2040,47 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
     def store_performance_series(
             self, t_range, series_type, signature, series_data):
 
-        lock_string = "sps_{0}_{1}_{2}".format(
-            t_range, series_type, signature)
-
-        # Use MySQL GETLOCK function to gaurd against concurrent celery tasks
+        # Use MySQL GETLOCK function to guard against concurrent celery tasks
         # overwriting each other's blobs. The lock incorporates the time
         # interval and signature combination and is specific to a single
         # json blob.
-        lock = self.jobs_execute(
-            proc='generic.locks.get_lock',
-            debug_show=self.DEBUG,
-            placeholders=[lock_string, 60])
+        lock_string = "sps_{0}_{1}_{2}".format(
+            t_range, series_type, signature)
+        lock_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT
 
-        if lock[0]['lock'] != 1:
+        # first, wait for lock to become free; is_lock_free starts out
+        # False so a zero or negative timeout is reported as a timeout
+        # rather than raising a NameError below
+        is_lock_free = False
+        started = time.time()
+        while time.time() < (started + lock_timeout):
+            is_lock_free = bool(self.jobs_execute(
+                proc='generic.locks.is_free_lock',
+                debug_show=self.DEBUG,
+                placeholders=[lock_string])[0]['lock'])
+            if is_lock_free:
+                break
+            time.sleep(0.1)
+        if not is_lock_free:
             logger.error(
                 'store_performance_series lock_string, '
                 '{0}, timed out!'.format(lock_string)
             )
             return
 
+        # now, acquire the lock; another task may have taken it since
+        # the check above, so make sure GET_LOCK actually succeeded
+        lock = self.jobs_execute(
+            proc='generic.locks.get_lock',
+            debug_show=self.DEBUG,
+            placeholders=[lock_string])
+        if lock[0]['lock'] != 1:
+            logger.error(
+                'store_performance_series lock_string, '
+                '{0}, could not be acquired!'.format(lock_string)
+            )
+            return
+
         try:
             now_timestamp = int(time.time())
 
diff --git a/treeherder/model/sql/generic.json b/treeherder/model/sql/generic.json
index 009780942..9862b1b58 100644
--- a/treeherder/model/sql/generic.json
+++ b/treeherder/model/sql/generic.json
@@ -35,7 +35,13 @@
     "locks": {
         "get_lock": {
 
-            "sql":"SELECT GET_LOCK(?, ?) AS 'lock'",
+            "sql":"SELECT GET_LOCK(?, 60) AS 'lock'",
+
+            "host_type":"master_host"
+        },
+        "is_free_lock": {
+
+            "sql":"SELECT IS_FREE_LOCK(?) AS 'lock'",
 
             "host_type":"master_host"
         },
diff --git a/treeherder/settings/base.py b/treeherder/settings/base.py
index 072f06ab3..c21fa8b4a 100644
--- a/treeherder/settings/base.py
+++ b/treeherder/settings/base.py
@@ -355,6 +355,9 @@ BZ_MAX_COMMENT_LENGTH = 40000
 # like ftp.mozilla.org or hg.mozilla.org
 TREEHERDER_REQUESTS_TIMEOUT = 30
 
+# timeout for acquiring lock to update performance series
+PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 60
+
 # Build the default pulse uri this is passed to kombu
 PULSE_URI = 'amqps://{}:{}@pulse.mozilla.org/'.format(
     os.environ.get('PULSE_USERNAME', 'guest'),