From 4a74b3704e14946dadd26b33407bbb0630ea83c0 Mon Sep 17 00:00:00 2001
From: William Lachance
Date: Mon, 29 Jun 2015 15:03:44 -0400
Subject: [PATCH] Bug 1171707 - Fix locking when concurrently updating performance series

Before, if two celery jobs were updating the same series, one would
overwrite the other because the locking code did not actually work (it
just always unconditionally acquired a new lock without checking whether
anything else was holding it). This fixes that.
---
 tests/model/derived/test_jobs_model.py | 91 ++++++++++++++++++++++++++
 treeherder/model/derived/jobs.py       | 30 ++++++---
 treeherder/model/sql/generic.json      |  8 ++-
 treeherder/settings/base.py            |  3 +
 4 files changed, 122 insertions(+), 10 deletions(-)

diff --git a/tests/model/derived/test_jobs_model.py b/tests/model/derived/test_jobs_model.py
index 2030d3729..7d299346f 100644
--- a/tests/model/derived/test_jobs_model.py
+++ b/tests/model/derived/test_jobs_model.py
@@ -6,11 +6,14 @@ import time
 import json
 import pytest
 import copy
+import threading
 
+from django.conf import settings
 from django.core.management import call_command
 
 from treeherder.model.derived.base import DatasetNotFoundError
 from treeherder.model.derived import ArtifactsModel
+from treeherder.model.derived.jobs import JobsModel
 from tests.sample_data_generator import job_data, result_set
 from tests import test_utils
@@ -18,6 +21,20 @@ slow = pytest.mark.slow
 xfail = pytest.mark.xfail
 
 
+class FakePerfData(object):
+    SERIES = [{'geomean': 1, 'result_set_id': 1,
+               'push_timestamp': int(time.time())}]
+    TIME_INTERVAL = 86400
+    SIGNATURE = 'cheezburger'
+    SERIES_TYPE = 'talos_data'
+
+    @staticmethod
+    def get_fake_lock_string():
+        return 'sps_{}_{}_{}'.format(FakePerfData.TIME_INTERVAL,
+                                     FakePerfData.SERIES_TYPE,
+                                     FakePerfData.SIGNATURE)
+
+
 def test_unicode(jm):
     """Unicode representation of a ``JobModel`` is the project name."""
     assert unicode(jm) == unicode(jm.project)
@@ -447,6 +464,80 @@ def test_store_performance_artifact(
     assert performance_artifact_signatures == series_signatures
 
 
+def test_store_performance_series(jm, test_project):
+
+    # basic case: everything works as expected
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+    blob = json.loads(stored_series[0]['blob'])
+    assert len(blob) == 1
+    assert blob[0] == FakePerfData.SERIES[0]
+
+    jm.disconnect()
+
+
+def test_store_performance_series_timeout_recover(jm, test_project):
+    # timeout case 1: a lock is on our series, but it will expire
+
+    # use a thread to simulate locking and then unlocking the table
+    # FIXME: this is rather fragile and depends on the thread being
+    # run more or less immediately so that the lock is engaged
+    def _lock_unlock():
+        with JobsModel(test_project) as jm2:
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.get_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+            time.sleep(1)
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.release_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+    t = threading.Thread(target=_lock_unlock)
+    t.start()
+
+    # will fail at first due to lock, but we should recover and insert
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    t.join()
+    stored_series = jm.get_jobs_dhub().execute(
proc="jobs.selects.get_performance_series", + placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE]) + + blob = json.loads(stored_series[0]['blob']) + assert len(blob) == 1 + assert blob[0] == FakePerfData.SERIES[0] + + jm.disconnect() + + +def test_store_performance_series_timeout_fail(jm, test_project): + # timeout case 2: a lock is on our series, but it will not expire in time + + jm.get_jobs_dhub().execute( + proc='generic.locks.get_lock', + placeholders=[FakePerfData.get_fake_lock_string()]) + old_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT + settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 1 + # this should fail -- we'll timeout before we're able to insert + jm.store_performance_series(FakePerfData.TIME_INTERVAL, + FakePerfData.SERIES_TYPE, + FakePerfData.SIGNATURE, + FakePerfData.SERIES) + stored_series = jm.get_jobs_dhub().execute( + proc="jobs.selects.get_performance_series", + placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE]) + assert not stored_series + + settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = old_timeout + jm.disconnect() + + def test_remove_existing_jobs_single_existing(jm, sample_data, initial_data, refdata, mock_log_parser, sample_resultset): """Remove single existing job prior to loading""" diff --git a/treeherder/model/derived/jobs.py b/treeherder/model/derived/jobs.py index 7ce59bb50..775733d7a 100644 --- a/treeherder/model/derived/jobs.py +++ b/treeherder/model/derived/jobs.py @@ -2040,25 +2040,37 @@ into chunks of chunk_size size. Returns the number of result sets deleted""" def store_performance_series( self, t_range, series_type, signature, series_data): - lock_string = "sps_{0}_{1}_{2}".format( - t_range, series_type, signature) - - # Use MySQL GETLOCK function to gaurd against concurrent celery tasks + # Use MySQL GETLOCK function to guard against concurrent celery tasks # overwriting each other's blobs. The lock incorporates the time # interval and signature combination and is specific to a single # json blob. - lock = self.jobs_execute( - proc='generic.locks.get_lock', - debug_show=self.DEBUG, - placeholders=[lock_string, 60]) + lock_string = "sps_{0}_{1}_{2}".format( + t_range, series_type, signature) + lock_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT - if lock[0]['lock'] != 1: + # first, wait for lock to become free + started = time.time() + while time.time() < (started + lock_timeout): + is_lock_free = bool(self.jobs_execute( + proc='generic.locks.is_free_lock', + debug_show=self.DEBUG, + placeholders=[lock_string])[0]['lock']) + if is_lock_free: + break + time.sleep(0.1) + if not is_lock_free: logger.error( 'store_performance_series lock_string, ' '{0}, timed out!'.format(lock_string) ) return + # now, acquire the lock + self.jobs_execute( + proc='generic.locks.get_lock', + debug_show=self.DEBUG, + placeholders=[lock_string]) + try: now_timestamp = int(time.time()) diff --git a/treeherder/model/sql/generic.json b/treeherder/model/sql/generic.json index 009780942..9862b1b58 100644 --- a/treeherder/model/sql/generic.json +++ b/treeherder/model/sql/generic.json @@ -35,7 +35,13 @@ "locks": { "get_lock": { - "sql":"SELECT GET_LOCK(?, ?) AS 'lock'", + "sql":"SELECT GET_LOCK(?, 60) AS 'lock'", + + "host_type":"master_host" + }, + "is_free_lock": { + + "sql":"SELECT IS_FREE_LOCK(?) 
AS 'lock'", "host_type":"master_host" }, diff --git a/treeherder/settings/base.py b/treeherder/settings/base.py index 072f06ab3..c21fa8b4a 100644 --- a/treeherder/settings/base.py +++ b/treeherder/settings/base.py @@ -355,6 +355,9 @@ BZ_MAX_COMMENT_LENGTH = 40000 # like ftp.mozilla.org or hg.mozilla.org TREEHERDER_REQUESTS_TIMEOUT = 30 +# timeout for acquiring lock to update performance series +PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 60 + # Build the default pulse uri this is passed to kombu PULSE_URI = 'amqps://{}:{}@pulse.mozilla.org/'.format( os.environ.get('PULSE_USERNAME', 'guest'),