Bug 1171707 - Fix locking when concurrently updating performance series

Previously, if two celery jobs updated the same performance series concurrently, one would
overwrite the other because the locking code did not actually work: it always acquired a new
lock unconditionally, without checking whether another task already held it. This change makes
the update wait (up to a configurable timeout) for the lock to become free before acquiring it.
William Lachance 2015-06-29 15:03:44 -04:00
Parent 92c66e2c1a
Commit 4a74b3704e
4 changed files with 122 additions and 10 deletions
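
For reference, the pattern this change introduces (poll IS_FREE_LOCK() until the named lock is
free, then take it with GET_LOCK()) can be sketched outside of Treeherder's JobsModel/datasource
machinery. This is a minimal illustration only: the wait_and_get_lock helper, the MySQLdb
connection details, and the example lock string are assumptions made for the sketch, not part of
this commit, which routes everything through the generic.locks.* procs shown in the diffs below.

import time

import MySQLdb  # assumed DB-API driver; the real code goes through Treeherder's datasource layer


def wait_and_get_lock(cursor, lock_string, lock_timeout=60):
    """Poll IS_FREE_LOCK() until the named lock is free, then take it with GET_LOCK().

    Returns True if the lock was acquired, False if we gave up after lock_timeout seconds.
    """
    started = time.time()
    is_lock_free = False
    while time.time() < (started + lock_timeout):
        cursor.execute("SELECT IS_FREE_LOCK(%s)", (lock_string,))
        is_lock_free = bool(cursor.fetchone()[0])
        if is_lock_free:
            break
        time.sleep(0.1)  # same polling interval the commit uses
    if not is_lock_free:
        return False
    # GET_LOCK itself still waits up to 60 seconds, which papers over the small
    # race between the IS_FREE_LOCK check and the acquisition.
    cursor.execute("SELECT GET_LOCK(%s, 60)", (lock_string,))
    return cursor.fetchone()[0] == 1


if __name__ == '__main__':
    # hypothetical connection parameters -- not part of the commit
    conn = MySQLdb.connect(host='localhost', user='treeherder', passwd='...', db='treeherder')
    cursor = conn.cursor()
    lock_string = 'sps_86400_talos_data_cheezburger'  # mirrors FakePerfData's lock string
    if wait_and_get_lock(cursor, lock_string):
        try:
            pass  # ... update the series blob here ...
        finally:
            cursor.execute("SELECT RELEASE_LOCK(%s)", (lock_string,))
    conn.close()

In the actual change, the 60-second GET_LOCK wait stays hard-coded in SQL (generic.json) while the
polling timeout comes from the new PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT setting, so the two can be
tuned independently.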


@@ -6,11 +6,14 @@ import time
 import json
 import pytest
 import copy
+import threading

+from django.conf import settings
 from django.core.management import call_command

 from treeherder.model.derived.base import DatasetNotFoundError
 from treeherder.model.derived import ArtifactsModel
+from treeherder.model.derived.jobs import JobsModel
 from tests.sample_data_generator import job_data, result_set
 from tests import test_utils
@@ -18,6 +21,20 @@ slow = pytest.mark.slow
 xfail = pytest.mark.xfail


+class FakePerfData(object):
+    SERIES = [{'geomean': 1, 'result_set_id': 1,
+               'push_timestamp': int(time.time())}]
+    TIME_INTERVAL = 86400
+    SIGNATURE = 'cheezburger'
+    SERIES_TYPE = 'talos_data'
+
+    @staticmethod
+    def get_fake_lock_string():
+        return 'sps_{}_{}_{}'.format(FakePerfData.TIME_INTERVAL,
+                                     FakePerfData.SERIES_TYPE,
+                                     FakePerfData.SIGNATURE)
+
+
 def test_unicode(jm):
     """Unicode representation of a ``JobModel`` is the project name."""
     assert unicode(jm) == unicode(jm.project)
@@ -447,6 +464,80 @@ def test_store_performance_artifact(
     assert performance_artifact_signatures == series_signatures


+def test_store_performance_series(jm, test_project):
+    # basic case: everything works as expected
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+    blob = json.loads(stored_series[0]['blob'])
+    assert len(blob) == 1
+    assert blob[0] == FakePerfData.SERIES[0]
+
+    jm.disconnect()
+
+
+def test_store_performance_series_timeout_recover(jm, test_project):
+    # timeout case 1: a lock is on our series, but it will expire
+
+    # use a thread to simulate locking and then unlocking the table
+    # FIXME: this is rather fragile and depends on the thread being
+    # run more or less immediately so that the lock is engaged
+    def _lock_unlock():
+        with JobsModel(test_project) as jm2:
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.get_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+            time.sleep(1)
+            jm2.get_jobs_dhub().execute(
+                proc='generic.locks.release_lock',
+                placeholders=[FakePerfData.get_fake_lock_string()])
+
+    t = threading.Thread(target=_lock_unlock)
+    t.start()
+
+    # will fail at first due to lock, but we should recover and insert
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    t.join()
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+    blob = json.loads(stored_series[0]['blob'])
+    assert len(blob) == 1
+    assert blob[0] == FakePerfData.SERIES[0]
+
+    jm.disconnect()
+
+
+def test_store_performance_series_timeout_fail(jm, test_project):
+    # timeout case 2: a lock is on our series, but it will not expire in time
+    jm.get_jobs_dhub().execute(
+        proc='generic.locks.get_lock',
+        placeholders=[FakePerfData.get_fake_lock_string()])
+    old_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT
+    settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 1
+
+    # this should fail -- we'll timeout before we're able to insert
+    jm.store_performance_series(FakePerfData.TIME_INTERVAL,
+                                FakePerfData.SERIES_TYPE,
+                                FakePerfData.SIGNATURE,
+                                FakePerfData.SERIES)
+    stored_series = jm.get_jobs_dhub().execute(
+        proc="jobs.selects.get_performance_series",
+        placeholders=[FakePerfData.TIME_INTERVAL, FakePerfData.SIGNATURE])
+    assert not stored_series
+
+    settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = old_timeout
+    jm.disconnect()
+
+
 def test_remove_existing_jobs_single_existing(jm, sample_data, initial_data, refdata,
                                               mock_log_parser, sample_resultset):
     """Remove single existing job prior to loading"""


@@ -2040,25 +2040,37 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
     def store_performance_series(
             self, t_range, series_type, signature, series_data):

-        lock_string = "sps_{0}_{1}_{2}".format(
-            t_range, series_type, signature)
-
-        # Use MySQL GETLOCK function to gaurd against concurrent celery tasks
-        lock = self.jobs_execute(
-            proc='generic.locks.get_lock',
-            debug_show=self.DEBUG,
-            placeholders=[lock_string, 60])
-
-        if lock[0]['lock'] != 1:
+        # Use MySQL GETLOCK function to guard against concurrent celery tasks
+        # overwriting each other's blobs. The lock incorporates the time
+        # interval and signature combination and is specific to a single
+        # json blob.
+        lock_string = "sps_{0}_{1}_{2}".format(
+            t_range, series_type, signature)
+        lock_timeout = settings.PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT
+
+        # first, wait for lock to become free
+        started = time.time()
+        while time.time() < (started + lock_timeout):
+            is_lock_free = bool(self.jobs_execute(
+                proc='generic.locks.is_free_lock',
+                debug_show=self.DEBUG,
+                placeholders=[lock_string])[0]['lock'])
+            if is_lock_free:
+                break
+            time.sleep(0.1)
+
+        if not is_lock_free:
             logger.error(
                 'store_performance_series lock_string, '
                 '{0}, timed out!'.format(lock_string)
             )
             return

+        # now, acquire the lock
+        self.jobs_execute(
+            proc='generic.locks.get_lock',
+            debug_show=self.DEBUG,
+            placeholders=[lock_string])
+
         try:
             now_timestamp = int(time.time())

@@ -35,7 +35,13 @@
     "locks": {
         "get_lock": {
-            "sql":"SELECT GET_LOCK(?, ?) AS 'lock'",
+            "sql":"SELECT GET_LOCK(?, 60) AS 'lock'",
             "host_type":"master_host"
         },
+        "is_free_lock": {
+            "sql":"SELECT IS_FREE_LOCK(?) AS 'lock'",
+            "host_type":"master_host"
+        },


@@ -355,6 +355,9 @@ BZ_MAX_COMMENT_LENGTH = 40000
 # like ftp.mozilla.org or hg.mozilla.org
 TREEHERDER_REQUESTS_TIMEOUT = 30

+# timeout for acquiring lock to update performance series
+PERFHERDER_UPDATE_SERIES_LOCK_TIMEOUT = 60
+
 # Build the default pulse uri this is passed to kombu
 PULSE_URI = 'amqps://{}:{}@pulse.mozilla.org/'.format(
     os.environ.get('PULSE_USERNAME', 'guest'),