From a5df8a966b1202f3f80872a78f6093ea060cdb77 Mon Sep 17 00:00:00 2001 From: ionutgoldan Date: Fri, 2 Aug 2019 16:04:24 +0300 Subject: [PATCH] Bug 1346567 - Re enable Perfherder data cycler --- tests/conftest.py | 1 - tests/model/test_cycle_data.py | 88 +++++++--- .../model/management/commands/cycle_data.py | 156 ++++++++++++++---- treeherder/perf/exceptions.py | 6 + treeherder/perf/models.py | 99 ++++++++--- 5 files changed, 269 insertions(+), 81 deletions(-) create mode 100644 treeherder/perf/exceptions.py diff --git a/tests/conftest.py b/tests/conftest.py index b27854179..9bab2da8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -141,7 +141,6 @@ def test_repository(transactional_db): repository_group_id=1, description="", performance_alerts_enabled=True, - expire_performance_data=False ) return r diff --git a/tests/model/test_cycle_data.py b/tests/model/test_cycle_data.py index ff3b48a0d..78a473512 100644 --- a/tests/model/test_cycle_data.py +++ b/tests/model/test_cycle_data.py @@ -7,6 +7,8 @@ from django.core.management import call_command from tests import test_utils from tests.autoclassify.utils import (create_failure_lines, test_line) +from treeherder.model.management.commands.cycle_data import (MINIMUM_PERFHERDER_EXPIRE_INTERVAL, + PerfherderCycler) from treeherder.model.models import (FailureLine, Job, JobDetail, @@ -15,7 +17,9 @@ from treeherder.model.models import (FailureLine, JobType, Machine, Push) +from treeherder.perf.exceptions import MaxRuntimeExceeded from treeherder.perf.models import (PerformanceDatum, + PerformanceDatumManager, PerformanceSignature) from treeherder.services.elasticsearch import (all_documents, count_index, @@ -36,7 +40,7 @@ def test_cycle_all_data(test_repository, failure_classifications, sample_data, job.submit_time = cycle_date_ts job.save() - call_command('cycle_data', sleep_time=0, days=1) + call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1) # There should be no jobs or failure lines after cycling assert Job.objects.count() == 0 @@ -86,7 +90,7 @@ def test_cycle_all_but_one_job(test_repository, failure_classifications, sample_ id=job_not_deleted.id).count() num_job_logs_before = JobLog.objects.count() - call_command('cycle_data', sleep_time=0, days=1, debug=True) + call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, debug=True) assert Job.objects.count() == 1 assert JobLog.objects.count() == (num_job_logs_before - @@ -124,7 +128,7 @@ def test_cycle_all_data_in_chunks(test_repository, failure_classifications, samp if settings.ELASTICSEARCH_URL: assert count_index() > 0 - call_command('cycle_data', sleep_time=0, days=1, chunk_size=3) + call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3) # There should be no jobs after cycling assert Job.objects.count() == 0 @@ -151,7 +155,7 @@ def test_cycle_job_model_reference_data(test_repository, failure_classifications jt = JobType.objects.create(symbol='mu', name='mu') m = Machine.objects.create(name='machine_with_no_job') (jg_id, jt_id, m_id) = (jg.id, jt.id, m.id) - call_command('cycle_data', sleep_time=0, days=1, chunk_size=3) + call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3) # assert that reference data that should have been cycled, was cycled assert JobGroup.objects.filter(id=jg_id).count() == 0 @@ -164,7 +168,6 @@ def test_cycle_job_model_reference_data(test_repository, failure_classifications assert Machine.objects.filter(id__in=original_machine_ids).count() == len(original_machine_ids) -@pytest.mark.skip(reason="Perf data cycling temporarily disabled (bug 1346567)") def test_cycle_job_with_performance_data(test_repository, failure_classifications, test_job, mock_log_parser, test_perf_signature): @@ -181,7 +184,7 @@ def test_cycle_job_with_performance_data(test_repository, failure_classification push_timestamp=test_job.push.time, value=1.0) - call_command('cycle_data', sleep_time=0, days=1, chunk_size=3) + call_command('cycle_data', 'from:treeherder', sleep_time=0, days=1, chunk_size=3) # assert that the job got cycled assert Job.objects.count() == 0 @@ -191,15 +194,20 @@ def test_cycle_job_with_performance_data(test_repository, failure_classification assert p.job is None -@pytest.mark.skip(reason="Perf data cycling temporarily disabled (bug 1346567)") -@pytest.mark.parametrize("test_repository_expire_data", [False, True]) -def test_cycle_performance_data(test_repository, push_stored, - test_perf_signature, - test_repository_expire_data): - test_repository.expire_performance_data = test_repository_expire_data +@pytest.mark.parametrize('repository_name, command_options, subcommand_options, should_expire', + [('autoland', '--days=365', None, True), + ('mozilla-inbound', '--days=365', None, True), + ('mozilla-beta', '--days=365', None, True), + ('mozilla-central', '--days=365', None, True), + ('autoland', '--days=401', None, False), + ]) +def test_cycle_performance_data(test_repository, repository_name, push_stored, + test_perf_signature, command_options, subcommand_options, + should_expire): + test_repository.name = repository_name test_repository.save() - expired_timestamp = datetime.datetime.now() - datetime.timedelta(weeks=1) + expired_timestamp = datetime.datetime.now() - datetime.timedelta(days=400) test_perf_signature_2 = PerformanceSignature.objects.create( signature_hash='b'*40, @@ -220,7 +228,7 @@ def test_cycle_performance_data(test_repository, push_stored, push2.time = expired_timestamp push2.save() - # a performance datum that *should not* be deleted + # this shouldn't be deleted in any circumstance PerformanceDatum.objects.create( id=1, repository=test_repository, @@ -230,8 +238,7 @@ def test_cycle_performance_data(test_repository, push_stored, push_timestamp=push1.time, value=1.0) - # a performance datum that *should* be deleted (but only if the - # repository is marked as having expirable performance data) + # the performance datum that which we're targetting PerformanceDatum.objects.create( id=2, repository=test_repository, @@ -241,13 +248,48 @@ def test_cycle_performance_data(test_repository, push_stored, push_timestamp=push2.time, value=1.0) - call_command('cycle_data', sleep_time=0, days=1) + command = filter(lambda arg: arg is not None, + ['cycle_data', command_options, 'from:perfherder', subcommand_options]) + call_command(*list(command)) # test repository isn't a main one - if test_repository_expire_data: + if should_expire: assert list(PerformanceDatum.objects.values_list('id', flat=True)) == [1] - assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [ - test_perf_signature.id] + assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [test_perf_signature.id] else: - assert list(PerformanceDatum.objects.values_list('id', flat=True)) == [1, 2] - assert list(PerformanceSignature.objects.values_list('id', flat=True)) == [ - test_perf_signature.id, test_perf_signature_2.id] + assert PerformanceDatum.objects.count() == 2 + assert PerformanceSignature.objects.count() == 2 + + +def test_performance_cycler_quit_indicator(): + ten_minutes_ago = datetime.datetime.now() - datetime.timedelta(minutes=10) + max_one_second = datetime.timedelta(seconds=1) + + two_seconds_ago = datetime.datetime.now() - datetime.timedelta(seconds=2) + max_five_minutes = datetime.timedelta(minutes=5) + + with pytest.raises(MaxRuntimeExceeded): + PerformanceDatumManager._maybe_quit(started_at=ten_minutes_ago, + max_overall_runtime=max_one_second) + + try: + PerformanceDatumManager._maybe_quit(started_at=two_seconds_ago, + max_overall_runtime=max_five_minutes) + except MaxRuntimeExceeded: + pytest.fail('Performance cycling shouldn\'t have quit') + + +def test_performance_cycler_doesnt_delete_too_recent_data(): + down_to_last_year = MINIMUM_PERFHERDER_EXPIRE_INTERVAL + dangerously_recent = 40 + + with pytest.raises(ValueError): + PerfherderCycler(days=dangerously_recent, + chunk_size=1000, + sleep_time=0) + + try: + PerfherderCycler(days=down_to_last_year, + chunk_size=1000, + sleep_time=0) + except ValueError: + pytest.fail('Should be able to expire data older than one year') diff --git a/treeherder/model/management/commands/cycle_data.py b/treeherder/model/management/commands/cycle_data.py index 68973d2e4..d44e44a89 100644 --- a/treeherder/model/management/commands/cycle_data.py +++ b/treeherder/model/management/commands/cycle_data.py @@ -1,4 +1,5 @@ import datetime +import logging from django.core.management.base import BaseCommand @@ -9,15 +10,104 @@ from treeherder.model.models import (Job, Repository) from treeherder.perf.models import PerformanceDatum +logging.basicConfig(format='%(levelname)s:%(message)s') + +TREEHERDER = 'treeherder' +PERFHERDER = 'perfherder' +TREEHERDER_SUBCOMMAND = 'from:treeherder' +PERFHERDER_SUBCOMMAND = 'from:perfherder' +MINIMUM_PERFHERDER_EXPIRE_INTERVAL = 365 + + +class DataCycler: + source = '' + + def __init__(self, days, chunk_size, sleep_time, is_debug=None, + logger=None, **kwargs): + self.cycle_interval = datetime.timedelta(days=days) + self.chunk_size = chunk_size + self.sleep_time = sleep_time + self.is_debug = is_debug or False + self.logger = logger + + def cycle(self): + pass + + +class TreeherderCycler(DataCycler): + source = TREEHERDER.title() + + def cycle(self): + for repository in Repository.objects.all(): + self.logger.warning("Cycling repository: {0}".format(repository.name)) + rs_deleted = Job.objects.cycle_data(repository, + self.cycle_interval, + self.chunk_size, + self.sleep_time) + self.logger.warning("Deleted {} jobs from {}" + .format(rs_deleted, repository.name)) + + self.remove_leftovers() + + def remove_leftovers(self): + def extract_id_groups(id_names, used_dependencies): + id_groups = {id_name: set() for id_name in id_names} + + for dependency in used_dependencies: + for id_name in id_names: + id_groups[id_name].add(dependency[id_name]) + return [id_groups[name] for name in id_names] + + used_dependencies = (Job.objects + .values('job_type_id', 'job_group_id', 'machine_id') + .distinct()) + + (used_job_type_ids, + used_job_group_ids, + used_machine_ids) = extract_id_groups( + ['job_type_id', + 'job_group_id', + 'machine_id'], + used_dependencies) + + JobType.objects.exclude(id__in=used_job_type_ids).delete() + JobGroup.objects.exclude(id__in=used_job_group_ids).delete() + Machine.objects.exclude(id__in=used_machine_ids).delete() + + +class PerfherderCycler(DataCycler): + source = PERFHERDER.title() + max_runtime = datetime.timedelta(hours=23) + + def __init__(self, days, chunk_size, sleep_time, is_debug=None, + logger=None, **kwargs): + super().__init__(days, chunk_size, sleep_time, is_debug, logger) + if days < MINIMUM_PERFHERDER_EXPIRE_INTERVAL: + raise ValueError('Cannot remove performance data that is more recent than {} days' + .format(MINIMUM_PERFHERDER_EXPIRE_INTERVAL)) + + def cycle(self): + started_at = datetime.datetime.now() + + PerformanceDatum.objects.cycle_data(self.cycle_interval, + self.chunk_size, + self.logger, + started_at, + self.max_runtime) + class Command(BaseCommand): help = """Cycle data that exceeds the time constraint limit""" + CYCLER_CLASSES = { + TREEHERDER: TreeherderCycler, + PERFHERDER: PerfherderCycler, + } def add_arguments(self, parser): parser.add_argument( '--debug', action='store_true', - dest='debug', + dest='is_debug', default=False, help='Write debug messages to stdout' ) @@ -27,7 +117,8 @@ class Command(BaseCommand): dest='days', default=120, type=int, - help='Data cycle interval expressed in days' + help='Data cycle interval expressed in days. ' + 'Minimum {} days when expiring performance data.'.format(MINIMUM_PERFHERDER_EXPIRE_INTERVAL) ) parser.add_argument( '--chunk-size', @@ -44,45 +135,42 @@ class Command(BaseCommand): dest='sleep_time', default=0, type=int, - help='How many seconds to pause between each query' + help='How many seconds to pause between each query. Ignored when cycling performance data.' ) + subparsers = parser.add_subparsers( + description='Data producers from which to expire data', + dest='data_source') + subparsers.add_parser(TREEHERDER_SUBCOMMAND) # default subcommand even if not provided + + # Perfherder will have its own specifics + subparsers.add_parser(PERFHERDER_SUBCOMMAND) def handle(self, *args, **options): - self.is_debug = options['debug'] + logger = self.get_logger(options['is_debug']) - cycle_interval = datetime.timedelta(days=options['days']) + logger.warning("Cycle interval... {}".format(options['days'])) - self.debug("cycle interval... {}".format(cycle_interval)) + data_cycler = self.fabricate_data_cycler(options, logger) + logger.warning('Cycling {0} data...'.format(data_cycler.source)) + data_cycler.cycle() - for repository in Repository.objects.all(): - self.debug("Cycling repository: {0}".format(repository.name)) - rs_deleted = Job.objects.cycle_data(repository, - cycle_interval, - options['chunk_size'], - options['sleep_time']) - self.debug("Deleted {} jobs from {}".format(rs_deleted, - repository.name)) + def get_logger(self, is_debug): + logger = logging.getLogger('cycle_data') + logger.setLevel( + logging.WARNING if is_debug else logging.CRITICAL) + logger.propagate = False - # TODO: Fix the performance issues and re-enable: - # https://bugzilla.mozilla.org/show_bug.cgi?id=1346567#c10 - if False and repository.expire_performance_data: - PerformanceDatum.objects.cycle_data(repository, - cycle_interval, - options['chunk_size'], - options['sleep_time']) + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.WARNING) - self.cycle_non_job_data(options['chunk_size'], options['sleep_time']) + formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s|%(message)s') + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + return logger - def cycle_non_job_data(self, chunk_size, sleep_time): - used_job_type_ids = Job.objects.values('job_type_id').distinct() - JobType.objects.exclude(id__in=used_job_type_ids).delete() + def fabricate_data_cycler(self, options, logger): + data_source = options.pop('data_source') or TREEHERDER_SUBCOMMAND + data_source = data_source.split(':')[1] - used_job_group_ids = Job.objects.values('job_group_id').distinct() - JobGroup.objects.exclude(id__in=used_job_group_ids).delete() - - used_machine_ids = Job.objects.values('machine_id').distinct() - Machine.objects.exclude(id__in=used_machine_ids).delete() - - def debug(self, msg): - if self.is_debug: - self.stdout.write(msg) + cls = self.CYCLER_CLASSES[data_source] + return cls(logger=logger, **options) diff --git a/treeherder/perf/exceptions.py b/treeherder/perf/exceptions.py new file mode 100644 index 000000000..62770bb5c --- /dev/null +++ b/treeherder/perf/exceptions.py @@ -0,0 +1,6 @@ +class NoDataCyclingAtAll(Exception): + pass + + +class MaxRuntimeExceeded(Exception): + pass diff --git a/treeherder/perf/models.py b/treeherder/perf/models.py index 12b45d7b5..7b5edbdb9 100644 --- a/treeherder/perf/models.py +++ b/treeherder/perf/models.py @@ -1,5 +1,4 @@ import datetime -import time from django.contrib.auth.models import User from django.core.exceptions import ValidationError @@ -12,6 +11,8 @@ from treeherder.model.models import (Job, OptionCollection, Push, Repository) +from treeherder.perf.exceptions import (MaxRuntimeExceeded, + NoDataCyclingAtAll) SIGNATURE_HASH_LENGTH = 40 @@ -107,31 +108,83 @@ class PerformanceDatumManager(models.Manager): Convenience functions for operations on groups of performance datums """ - def cycle_data(self, repository, cycle_interval, chunk_size, sleep_time): + @classmethod + def _maybe_quit(cls, started_at, max_overall_runtime): + now = datetime.datetime.now() + elapsed_runtime = now - started_at + + if max_overall_runtime < elapsed_runtime: + raise MaxRuntimeExceeded('Max runtime for performance data cycling exceeded') + + def cycle_data(self, cycle_interval, chunk_size, + logger, started_at, max_overall_runtime): """Delete data older than cycle_interval, splitting the target data into chunks of chunk_size size.""" - max_timestamp = datetime.datetime.now() - cycle_interval - # seperate datums into chunks - while True: - perf_datums_to_cycle = list(self.filter( - repository=repository, - push_timestamp__lt=max_timestamp).values_list('id', flat=True)[:chunk_size]) - if not perf_datums_to_cycle: - # we're done! - break - self.filter(id__in=perf_datums_to_cycle).delete() - if sleep_time: - # Allow some time for other queries to get through - time.sleep(sleep_time) + try: + self._delete_in_chunks(max_timestamp, chunk_size, logger, + started_at, max_overall_runtime) - # also remove any signatures which are (no longer) associated with - # a job - for signature in PerformanceSignature.objects.filter( - repository=repository): - if not self.filter(signature=signature).exists(): - signature.delete() + # also remove any signatures which are (no longer) associated with + # a job + logger.warning('Removing performance signatures with missing jobs...') + for signature in PerformanceSignature.objects.all(): + self._maybe_quit(started_at, max_overall_runtime) + + if not self.filter( + repository_id=signature.repository_id, # leverages (repository, signature) compound index + signature_id=signature.id).exists(): + signature.delete() + except NoDataCyclingAtAll as ex: + logger.warning('Exception: {}'.format(ex)) + except MaxRuntimeExceeded as ex: + logger.warning(ex) + + def _delete_in_chunks(self, max_timestamp, chunk_size, logger, + started_at, max_overall_runtime): + from django.db import connection + any_succesful_attempt = False + + with connection.cursor() as cursor: + while True: + self._maybe_quit(started_at, max_overall_runtime) + + try: + ideal_chunk_size = self._compute_ideal_chunk_size(max_timestamp, chunk_size) + cursor.execute(''' + DELETE FROM `performance_datum` + WHERE push_timestamp < %s + LIMIT %s + ''', [max_timestamp, ideal_chunk_size]) + except Exception as ex: + logger.warning('Failed to delete performance data chunk, while running "{}" query ' + .format(cursor._last_executed)) + + if any_succesful_attempt is False: + raise NoDataCyclingAtAll from ex + break + else: + deleted_rows = cursor.rowcount + + if deleted_rows == 0: + break # finished removing all expired data + else: + any_succesful_attempt = True + logger.warning('Successfully deleted {} performance datum rows'.format(deleted_rows)) + + def _compute_ideal_chunk_size(self, max_timestamp, max_chunk_size): + ''' + max_chunk_size may be too big, causing + timeouts while attempting deletion; + doing basic database query, maybe a lower value is better + ''' + max_id = self.filter(push_timestamp__gt=max_timestamp + ).order_by('-id')[0].id + older_ids = self.filter(push_timestamp__lte=max_timestamp, id__lte=max_id + ).order_by('id')[:max_chunk_size] + + return len(older_ids) or max_chunk_size class PerformanceDatum(models.Model): @@ -156,9 +209,9 @@ class PerformanceDatum(models.Model): class Meta: db_table = 'performance_datum' index_together = [ - # this should speed up the typical "get a range of performance datums" query + # Speeds up the typical "get a range of performance datums" query ('repository', 'signature', 'push_timestamp'), - # this should speed up the compare view in treeherder (we only index on + # Speeds up the compare view in treeherder (we only index on # repository because we currently filter on it in the query) ('repository', 'signature', 'push')] unique_together = ('repository', 'job', 'push', 'signature')