Mirror of https://github.com/mozilla/treeherder.git

Bug 1346567 - Re-enable Perfherder data cycler

Parent: 369dbbb3fa
Commit: a5df8a966b
@@ -141,7 +141,6 @@ def test_repository(transactional_db):
         repository_group_id=1,
         description="",
         performance_alerts_enabled=True,
-        expire_performance_data=False
     )
     return r
 
@@ -7,6 +7,8 @@ from django.core.management import call_command
 from tests import test_utils
 from tests.autoclassify.utils import (create_failure_lines,
                                       test_line)
+from treeherder.model.management.commands.cycle_data import (MINIMUM_PERFHERDER_EXPIRE_INTERVAL,
+                                                             PerfherderCycler)
 from treeherder.model.models import (FailureLine,
                                      Job,
                                      JobDetail,
@@ -15,7 +17,9 @@ from treeherder.model.models import (FailureLine,
                                      JobType,
                                      Machine,
                                      Push)
+from treeherder.perf.exceptions import MaxRuntimeExceeded
 from treeherder.perf.models import (PerformanceDatum,
+                                    PerformanceDatumManager,
                                     PerformanceSignature)
 from treeherder.services.elasticsearch import (all_documents,
                                                count_index,
@@ -36,7 +40,7 @@ def test_cycle_all_data(test_repository, failure_classifications, sample_data,
         job.submit_time = cycle_date_ts
         job.save()
 
-    call_command('cycle_data', sleep_time=0, days=1)
+    call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1)
 
     # There should be no jobs or failure lines after cycling
     assert Job.objects.count() == 0
@@ -86,7 +90,7 @@ def test_cycle_all_but_one_job(test_repository, failure_classifications, sample_
         id=job_not_deleted.id).count()
     num_job_logs_before = JobLog.objects.count()
 
-    call_command('cycle_data', sleep_time=0, days=1, debug=True)
+    call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, debug=True)
 
     assert Job.objects.count() == 1
     assert JobLog.objects.count() == (num_job_logs_before -
@@ -124,7 +128,7 @@ def test_cycle_all_data_in_chunks(test_repository, failure_classifications, samp
     if settings.ELASTICSEARCH_URL:
         assert count_index() > 0
 
-    call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
+    call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3)
 
     # There should be no jobs after cycling
     assert Job.objects.count() == 0
@@ -151,7 +155,7 @@ def test_cycle_job_model_reference_data(test_repository, failure_classifications
     jt = JobType.objects.create(symbol='mu', name='mu')
     m = Machine.objects.create(name='machine_with_no_job')
     (jg_id, jt_id, m_id) = (jg.id, jt.id, m.id)
-    call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
+    call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3)
 
     # assert that reference data that should have been cycled, was cycled
     assert JobGroup.objects.filter(id=jg_id).count() == 0
@@ -164,7 +168,6 @@ def test_cycle_job_model_reference_data(test_repository, failure_classifications
     assert Machine.objects.filter(id__in=original_machine_ids).count() == len(original_machine_ids)
 
 
-@pytest.mark.skip(reason="Perf data cycling temporarily disabled (bug 1346567)")
 def test_cycle_job_with_performance_data(test_repository, failure_classifications,
                                          test_job, mock_log_parser,
                                          test_perf_signature):
@@ -181,7 +184,7 @@ def test_cycle_job_with_performance_data(test_repository, failure_classification
         push_timestamp=test_job.push.time,
         value=1.0)
 
-    call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
+    call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3)
 
     # assert that the job got cycled
     assert Job.objects.count() == 0
@@ -191,15 +194,20 @@ def test_cycle_job_with_performance_data(test_repository, failure_classification
         assert p.job is None
 
 
-@pytest.mark.skip(reason="Perf data cycling temporarily disabled (bug 1346567)")
-@pytest.mark.parametrize("test_repository_expire_data", [False, True])
-def test_cycle_performance_data(test_repository, push_stored,
-                                test_perf_signature,
-                                test_repository_expire_data):
-    test_repository.expire_performance_data = test_repository_expire_data
+@pytest.mark.parametrize('repository_name, command_options, subcommand_options, should_expire',
+                         [('autoland', '--days=365', None, True),
+                          ('mozilla-inbound', '--days=365', None, True),
+                          ('mozilla-beta', '--days=365', None, True),
+                          ('mozilla-central', '--days=365', None, True),
+                          ('autoland', '--days=401', None, False),
+                          ])
+def test_cycle_performance_data(test_repository, repository_name, push_stored,
+                                test_perf_signature, command_options, subcommand_options,
+                                should_expire):
+    test_repository.name = repository_name
     test_repository.save()
 
-    expired_timestamp = datetime.datetime.now() - datetime.timedelta(weeks=1)
+    expired_timestamp = datetime.datetime.now() - datetime.timedelta(days=400)
 
     test_perf_signature_2 = PerformanceSignature.objects.create(
         signature_hash='b'*40,
@@ -220,7 +228,7 @@ def test_cycle_performance_data(test_repository, push_stored,
     push2.time = expired_timestamp
     push2.save()
 
-    # a performance datum that *should not* be deleted
+    # this shouldn't be deleted in any circumstance
     PerformanceDatum.objects.create(
         id=1,
         repository=test_repository,
@@ -230,8 +238,7 @@ def test_cycle_performance_data(test_repository, push_stored,
         push_timestamp=push1.time,
         value=1.0)
 
-    # a performance datum that *should* be deleted (but only if the
-    # repository is marked as having expirable performance data)
+    # the performance datum which we're targeting
     PerformanceDatum.objects.create(
         id=2,
         repository=test_repository,
@@ -241,13 +248,48 @@ def test_cycle_performance_data(test_repository, push_stored,
         push_timestamp=push2.time,
         value=1.0)
 
-    call_command('cycle_data', sleep_time=0, days=1)
+    command = filter(lambda arg: arg is not None,
+                     ['cycle_data', command_options, 'from:perfherder', subcommand_options])
+    call_command(*list(command))  # test repository isn't a main one
 
-    if test_repository_expire_data:
+    if should_expire:
         assert list(PerformanceDatum.objects.values_list('id', flat=True)) == [1]
-        assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [
-            test_perf_signature.id]
+        assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [test_perf_signature.id]
     else:
-        assert list(PerformanceDatum.objects.values_list('id', flat=True)) == [1, 2]
-        assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [
-            test_perf_signature.id, test_perf_signature_2.id]
+        assert PerformanceDatum.objects.count() == 2
+        assert PerformanceSignature.objects.count() == 2
+
+
+def test_performance_cycler_quit_indicator():
+    ten_minutes_ago = datetime.datetime.now() - datetime.timedelta(minutes=10)
+    max_one_second = datetime.timedelta(seconds=1)
+
+    two_seconds_ago = datetime.datetime.now() - datetime.timedelta(seconds=2)
+    max_five_minutes = datetime.timedelta(minutes=5)
+
+    with pytest.raises(MaxRuntimeExceeded):
+        PerformanceDatumManager._maybe_quit(started_at=ten_minutes_ago,
+                                            max_overall_runtime=max_one_second)
+
+    try:
+        PerformanceDatumManager._maybe_quit(started_at=two_seconds_ago,
+                                            max_overall_runtime=max_five_minutes)
+    except MaxRuntimeExceeded:
+        pytest.fail('Performance cycling shouldn\'t have quit')
+
+
+def test_performance_cycler_doesnt_delete_too_recent_data():
+    down_to_last_year = MINIMUM_PERFHERDER_EXPIRE_INTERVAL
+    dangerously_recent = 40
+
+    with pytest.raises(ValueError):
+        PerfherderCycler(days=dangerously_recent,
+                         chunk_size=1000,
+                         sleep_time=0)
+
+    try:
+        PerfherderCycler(days=down_to_last_year,
+                         chunk_size=1000,
+                         sleep_time=0)
+    except ValueError:
+        pytest.fail('Should be able to expire data older than one year')
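The `filter(...)` idiom in the test above simply drops the `None` placeholders before the argument list reaches `call_command`; a quick illustration in plain Python, using values taken from the parametrize table:

    >>> command_options, subcommand_options = '--days=365', None
    >>> list(filter(lambda arg: arg is not None,
    ...             ['cycle_data', command_options, 'from:perfherder', subcommand_options]))
    ['cycle_data', '--days=365', 'from:perfherder']

This lets each parametrize row omit the optional subcommand options without building the argv by hand.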
@@ -1,4 +1,5 @@
 import datetime
+import logging
 
 from django.core.management.base import BaseCommand
 
@@ -9,15 +10,104 @@ from treeherder.model.models import (Job,
                                      Repository)
 from treeherder.perf.models import PerformanceDatum
 
+logging.basicConfig(format='%(levelname)s:%(message)s')
+
+TREEHERDER = 'treeherder'
+PERFHERDER = 'perfherder'
+TREEHERDER_SUBCOMMAND = 'from:treeherder'
+PERFHERDER_SUBCOMMAND = 'from:perfherder'
+MINIMUM_PERFHERDER_EXPIRE_INTERVAL = 365
+
+
+class DataCycler:
+    source = ''
+
+    def __init__(self, days, chunk_size, sleep_time, is_debug=None,
+                 logger=None, **kwargs):
+        self.cycle_interval = datetime.timedelta(days=days)
+        self.chunk_size = chunk_size
+        self.sleep_time = sleep_time
+        self.is_debug = is_debug or False
+        self.logger = logger
+
+    def cycle(self):
+        pass
+
+
+class TreeherderCycler(DataCycler):
+    source = TREEHERDER.title()
+
+    def cycle(self):
+        for repository in Repository.objects.all():
+            self.logger.warning("Cycling repository: {0}".format(repository.name))
+            rs_deleted = Job.objects.cycle_data(repository,
+                                                self.cycle_interval,
+                                                self.chunk_size,
+                                                self.sleep_time)
+            self.logger.warning("Deleted {} jobs from {}"
+                                .format(rs_deleted, repository.name))
+
+        self.remove_leftovers()
+
+    def remove_leftovers(self):
+        def extract_id_groups(id_names, used_dependencies):
+            id_groups = {id_name: set() for id_name in id_names}
+
+            for dependency in used_dependencies:
+                for id_name in id_names:
+                    id_groups[id_name].add(dependency[id_name])
+            return [id_groups[name] for name in id_names]
+
+        used_dependencies = (Job.objects
+                             .values('job_type_id', 'job_group_id', 'machine_id')
+                             .distinct())
+
+        (used_job_type_ids,
+         used_job_group_ids,
+         used_machine_ids) = extract_id_groups(
+            ['job_type_id',
+             'job_group_id',
+             'machine_id'],
+            used_dependencies)
+
+        JobType.objects.exclude(id__in=used_job_type_ids).delete()
+        JobGroup.objects.exclude(id__in=used_job_group_ids).delete()
+        Machine.objects.exclude(id__in=used_machine_ids).delete()
+
+
+class PerfherderCycler(DataCycler):
+    source = PERFHERDER.title()
+    max_runtime = datetime.timedelta(hours=23)
+
+    def __init__(self, days, chunk_size, sleep_time, is_debug=None,
+                 logger=None, **kwargs):
+        super().__init__(days, chunk_size, sleep_time, is_debug, logger)
+        if days < MINIMUM_PERFHERDER_EXPIRE_INTERVAL:
+            raise ValueError('Cannot remove performance data that is more recent than {} days'
+                             .format(MINIMUM_PERFHERDER_EXPIRE_INTERVAL))
+
+    def cycle(self):
+        started_at = datetime.datetime.now()
+
+        PerformanceDatum.objects.cycle_data(self.cycle_interval,
+                                            self.chunk_size,
+                                            self.logger,
+                                            started_at,
+                                            self.max_runtime)
+
+
 class Command(BaseCommand):
     help = """Cycle data that exceeds the time constraint limit"""
+    CYCLER_CLASSES = {
+        TREEHERDER: TreeherderCycler,
+        PERFHERDER: PerfherderCycler,
+    }
 
     def add_arguments(self, parser):
         parser.add_argument(
             '--debug',
             action='store_true',
-            dest='debug',
+            dest='is_debug',
             default=False,
             help='Write debug messages to stdout'
         )
@@ -27,7 +117,8 @@ class Command(BaseCommand):
             dest='days',
             default=120,
             type=int,
-            help='Data cycle interval expressed in days'
+            help='Data cycle interval expressed in days. '
+                 'Minimum {} days when expiring performance data.'.format(MINIMUM_PERFHERDER_EXPIRE_INTERVAL)
         )
         parser.add_argument(
             '--chunk-size',
@@ -44,45 +135,42 @@ class Command(BaseCommand):
             dest='sleep_time',
             default=0,
             type=int,
-            help='How many seconds to pause between each query'
+            help='How many seconds to pause between each query. Ignored when cycling performance data.'
         )
+        subparsers = parser.add_subparsers(
+            description='Data producers from which to expire data',
+            dest='data_source')
+        subparsers.add_parser(TREEHERDER_SUBCOMMAND)  # default subcommand even if not provided
+
+        # Perfherder will have its own specifics
+        subparsers.add_parser(PERFHERDER_SUBCOMMAND)
 
     def handle(self, *args, **options):
-        self.is_debug = options['debug']
+        logger = self.get_logger(options['is_debug'])
 
-        cycle_interval = datetime.timedelta(days=options['days'])
+        logger.warning("Cycle interval... {}".format(options['days']))
 
-        self.debug("cycle interval... {}".format(cycle_interval))
+        data_cycler = self.fabricate_data_cycler(options, logger)
+        logger.warning('Cycling {0} data...'.format(data_cycler.source))
+        data_cycler.cycle()
 
-        for repository in Repository.objects.all():
-            self.debug("Cycling repository: {0}".format(repository.name))
-            rs_deleted = Job.objects.cycle_data(repository,
-                                                cycle_interval,
-                                                options['chunk_size'],
-                                                options['sleep_time'])
-            self.debug("Deleted {} jobs from {}".format(rs_deleted,
-                                                        repository.name))
+    def get_logger(self, is_debug):
+        logger = logging.getLogger('cycle_data')
+        logger.setLevel(
+            logging.WARNING if is_debug else logging.CRITICAL)
+        logger.propagate = False
 
-        # TODO: Fix the performance issues and re-enable:
-        # https://bugzilla.mozilla.org/show_bug.cgi?id=1346567#c10
-        if False and repository.expire_performance_data:
-            PerformanceDatum.objects.cycle_data(repository,
-                                                cycle_interval,
-                                                options['chunk_size'],
-                                                options['sleep_time'])
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.WARNING)
 
-        self.cycle_non_job_data(options['chunk_size'], options['sleep_time'])
+        formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s|%(message)s')
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+        return logger
 
-    def cycle_non_job_data(self, chunk_size, sleep_time):
-        used_job_type_ids = Job.objects.values('job_type_id').distinct()
-        JobType.objects.exclude(id__in=used_job_type_ids).delete()
+    def fabricate_data_cycler(self, options, logger):
+        data_source = options.pop('data_source') or TREEHERDER_SUBCOMMAND
+        data_source = data_source.split(':')[1]
 
-        used_job_group_ids = Job.objects.values('job_group_id').distinct()
-        JobGroup.objects.exclude(id__in=used_job_group_ids).delete()
-
-        used_machine_ids = Job.objects.values('machine_id').distinct()
-        Machine.objects.exclude(id__in=used_machine_ids).delete()
-
-    def debug(self, msg):
-        if self.is_debug:
-            self.stdout.write(msg)
+        cls = self.CYCLER_CLASSES[data_source]
+        return cls(logger=logger, **options)
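With the subparsers in place, the command gains a subcommand-style interface. A sketch of equivalent invocations via Django's `call_command`, mirroring how the tests above drive it (the option values here are illustrative, not defaults):

    from django.core.management import call_command

    # Expire Treeherder job data; 'from:treeherder' is assumed when no subcommand is given
    call_command('cycle_data', 'from:treeherder', days=120, chunk_size=100, sleep_time=2)

    # Expire Perfherder data; PerfherderCycler raises ValueError for days < 365
    call_command('cycle_data', 'from:perfherder', days=365, chunk_size=1000)

Because `fabricate_data_cycler` substitutes TREEHERDER_SUBCOMMAND when `data_source` is unset, existing invocations without a subcommand keep cycling Treeherder data as before.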
@@ -0,0 +1,6 @@
+class NoDataCyclingAtAll(Exception):
+    pass
+
+
+class MaxRuntimeExceeded(Exception):
+    pass
@@ -1,5 +1,4 @@
 import datetime
-import time
 
 from django.contrib.auth.models import User
 from django.core.exceptions import ValidationError
@@ -12,6 +11,8 @@ from treeherder.model.models import (Job,
                                      OptionCollection,
                                      Push,
                                      Repository)
+from treeherder.perf.exceptions import (MaxRuntimeExceeded,
+                                        NoDataCyclingAtAll)
 
 SIGNATURE_HASH_LENGTH = 40
 
@@ -107,31 +108,83 @@ class PerformanceDatumManager(models.Manager):
     Convenience functions for operations on groups of performance datums
     """
 
-    def cycle_data(self, repository, cycle_interval, chunk_size, sleep_time):
+    @classmethod
+    def _maybe_quit(cls, started_at, max_overall_runtime):
+        now = datetime.datetime.now()
+        elapsed_runtime = now - started_at
+
+        if max_overall_runtime < elapsed_runtime:
+            raise MaxRuntimeExceeded('Max runtime for performance data cycling exceeded')
+
+    def cycle_data(self, cycle_interval, chunk_size,
+                   logger, started_at, max_overall_runtime):
         """Delete data older than cycle_interval, splitting the target data
         into chunks of chunk_size size."""
+
         max_timestamp = datetime.datetime.now() - cycle_interval
 
-        # seperate datums into chunks
-        while True:
-            perf_datums_to_cycle = list(self.filter(
-                repository=repository,
-                push_timestamp__lt=max_timestamp).values_list('id', flat=True)[:chunk_size])
-            if not perf_datums_to_cycle:
-                # we're done!
-                break
-            self.filter(id__in=perf_datums_to_cycle).delete()
-            if sleep_time:
-                # Allow some time for other queries to get through
-                time.sleep(sleep_time)
+        try:
+            self._delete_in_chunks(max_timestamp, chunk_size, logger,
+                                   started_at, max_overall_runtime)
 
-        # also remove any signatures which are (no longer) associated with
-        # a job
-        for signature in PerformanceSignature.objects.filter(
-                repository=repository):
-            if not self.filter(signature=signature).exists():
-                signature.delete()
+            # also remove any signatures which are (no longer) associated with
+            # a job
+            logger.warning('Removing performance signatures with missing jobs...')
+            for signature in PerformanceSignature.objects.all():
+                self._maybe_quit(started_at, max_overall_runtime)
+
+                if not self.filter(
+                        repository_id=signature.repository_id,  # leverages (repository, signature) compound index
+                        signature_id=signature.id).exists():
+                    signature.delete()
+        except NoDataCyclingAtAll as ex:
+            logger.warning('Exception: {}'.format(ex))
+        except MaxRuntimeExceeded as ex:
+            logger.warning(ex)
+
+    def _delete_in_chunks(self, max_timestamp, chunk_size, logger,
+                          started_at, max_overall_runtime):
+        from django.db import connection
+        any_successful_attempt = False
+
+        with connection.cursor() as cursor:
+            while True:
+                self._maybe_quit(started_at, max_overall_runtime)
+
+                try:
+                    ideal_chunk_size = self._compute_ideal_chunk_size(max_timestamp, chunk_size)
+                    cursor.execute('''
+                        DELETE FROM `performance_datum`
+                        WHERE push_timestamp < %s
+                        LIMIT %s
+                        ''', [max_timestamp, ideal_chunk_size])
+                except Exception as ex:
+                    logger.warning('Failed to delete performance data chunk while running the "{}" query'
+                                   .format(cursor._last_executed))
+
+                    if any_successful_attempt is False:
+                        raise NoDataCyclingAtAll from ex
+                    break
+                else:
+                    deleted_rows = cursor.rowcount
+
+                    if deleted_rows == 0:
+                        break  # finished removing all expired data
+                    else:
+                        any_successful_attempt = True
+                        logger.warning('Successfully deleted {} performance datum rows'.format(deleted_rows))
+
+    def _compute_ideal_chunk_size(self, max_timestamp, max_chunk_size):
+        '''
+        max_chunk_size may be too big, causing timeouts while
+        attempting deletion; a cheap query counts how many expirable
+        rows actually remain (up to max_chunk_size), so the DELETE's
+        LIMIT can be lowered accordingly.
+        '''
+        max_id = self.filter(push_timestamp__gt=max_timestamp).order_by('-id')[0].id
+        older_ids = self.filter(push_timestamp__lte=max_timestamp,
+                                id__lte=max_id).order_by('id')[:max_chunk_size]
+
+        return len(older_ids) or max_chunk_size
+
 
 class PerformanceDatum(models.Model):
@@ -156,9 +209,9 @@ class PerformanceDatum(models.Model):
     class Meta:
         db_table = 'performance_datum'
         index_together = [
-            # this should speed up the typical "get a range of performance datums" query
+            # Speeds up the typical "get a range of performance datums" query
            ('repository', 'signature', 'push_timestamp'),
-            # this should speed up the compare view in treeherder (we only index on
+            # Speeds up the compare view in treeherder (we only index on
             # repository because we currently filter on it in the query)
             ('repository', 'signature', 'push')]
         unique_together = ('repository', 'job', 'push', 'signature')
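The chunked deletion above relies on MySQL's `DELETE ... LIMIT` support to bound each statement. A minimal standalone sketch of the same pattern (illustrative only, assuming a DB-API cursor on a MySQL-style backend):

    def delete_in_chunks(cursor, max_timestamp, chunk_size):
        # Delete bounded batches until no expired rows remain,
        # keeping each statement (and its locks) small.
        while True:
            cursor.execute(
                'DELETE FROM performance_datum'
                ' WHERE push_timestamp < %s LIMIT %s',
                [max_timestamp, chunk_size])
            if cursor.rowcount == 0:
                break

Bounding each DELETE keeps individual statements short, which is what lets `_maybe_quit` enforce the 23-hour `max_runtime` budget between batches instead of being stuck inside one long-running query.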