Bug 1204942 - First cut at autoclassify / intermittent orange detection.

This adds an autoclassify command and a detect_intermittents command.
The former is designed to take an incoming job with an error summary
and look for existing results marked as intermittent that are a close
match for the new result. At present only one matcher is implemented;
this requires an exact match in terms of test name, result and error
message. Matching is also constrained to be based on single lines; it
is anticipated that future iterations may add support for matching on
groups of lines.

The detect_intermittents command is designed to take a group of jobs
running on the same push and with the same build job (i.e. same
testsuite, same chunk, etc.) and look for new intermittents to add to
the database. This currently only looks for test failures where there
is at least one green job and one non-green job.

There is currently no UI for seeing matches or for adding new
prototypical intermittents as match candidates. There is also no
integration with Bugzilla; future development should add association
of frequent intermittents with bugs.
This commit is contained in:
James Graham 2015-08-17 18:01:39 +01:00
Родитель a073fbc9af
Коммит 7b6fa25402
21 изменённых файлов: 604 добавлений и 6 удалений

Просмотреть файл

@ -19,6 +19,6 @@ if [ ! -f $LOGFILE ]; then
fi
exec $NEWRELIC_ADMIN celery -A treeherder worker -c 3 \
-Q default,cycle_data,calculate_eta,fetch_bugs \
-Q default,cycle_data,calculate_eta,fetch_bugs,autoclassify,detect_intermittents \
-E --maxtasksperchild=500 \
--logfile=$LOGFILE -l INFO -n default.%h

Просмотреть файл

Просмотреть файл

@ -0,0 +1,38 @@
from django.core.management import call_command
from treeherder.model.models import Matcher, Repository
from treeherder.autoclassify.matchers import PreciseTestMatcher
from .utils import test_line, create_failure_lines
def test_classify_test_failure(activate_responses, jm, eleven_jobs_stored, initial_data,
                               failure_lines, classified_failures):
    """Run the autoclassify command and check which new lines get classified.

    Five failure lines are stored on job 2.  The first two are identical to
    the lines created by the ``failure_lines`` fixture and are expected to be
    linked to the matching entries of ``classified_failures``; the other three
    each differ in one field and must stay unclassified.
    """
    repository = Repository.objects.get(name=jm.project)
    job = jm.get_job(2)[0]

    line_variants = [
        (test_line, {}),
        (test_line, {"subtest": "subtest2"}),
        (test_line, {"status": "TIMEOUT"}),
        (test_line, {"expected": "ERROR"}),
        (test_line, {"message": "message2"}),
    ]
    test_failure_lines = create_failure_lines(repository, job["job_guid"], line_variants)

    # Poke some internal state so that we only use a single matcher for the test
    Matcher._matcher_funcs = {}
    Matcher.objects.register_matcher(PreciseTestMatcher)

    call_command('autoclassify', job['job_guid'], jm.project)

    # Reload every line so the assertions see what the command stored.
    for line in test_failure_lines:
        line.refresh_from_db()

    should_match = test_failure_lines[:2]
    should_not_match = test_failure_lines[2:]

    for line, classification in zip(should_match, classified_failures):
        linked_ids = [linked.id for linked in line.classified_failures.all()]
        assert linked_ids == [classification.id]

    for line in should_not_match:
        assert line.classified_failures.count() == 0

Просмотреть файл

@ -0,0 +1,49 @@
from django.core.management import call_command
from treeherder.model.models import Matcher, Repository, ClassifiedFailure
from treeherder.autoclassify.matchers import PreciseTestMatcher
from treeherder.autoclassify.detectors import TestFailureDetector
from .utils import test_line, create_failure_lines
def test_detect_intermittents(activate_responses, jm, eleven_jobs_stored, initial_data,
                              failure_lines, classified_failures, retriggers):
    """Run detect_intermittents on a retrigger and check the new classifications.

    Four failure lines are stored on the retriggered job; the command is
    expected to create one new ClassifiedFailure per line, each linked through
    a single best match with score 1 that is attributed to the detector.
    """
    repository = Repository.objects.get(name=jm.project)
    retrigger = retriggers[0]

    line_variants = [
        (test_line, {"subtest": "subtest2"}),
        (test_line, {"status": "TIMEOUT"}),
        (test_line, {"expected": "ERROR"}),
        (test_line, {"message": "message2"}),
    ]
    test_failure_lines = create_failure_lines(repository,
                                              retrigger["job_guid"],
                                              line_variants)

    old_failure_ids = set(existing.id for existing in ClassifiedFailure.objects.all())

    # Poke some internal state so that we only use a single matcher for the test
    Matcher._matcher_funcs = {}
    Matcher.objects.register_matcher(PreciseTestMatcher)
    Matcher._detector_funcs = {}
    detector = Matcher.objects.register_detector(TestFailureDetector)

    call_command('detect_intermittents', retrigger['job_guid'], jm.project)

    assert ClassifiedFailure.objects.count() == len(old_failure_ids) + 4

    matches_seen = set()
    # Same set object as old_failure_ids: it grows as new classifications are seen.
    failure_ids_seen = old_failure_ids

    for line in test_failure_lines:
        line.refresh_from_db()
        failure_matches = line.matches.all()
        assert len(failure_matches) == 1

        match = failure_matches[0]
        assert match.classified_failure.id not in failure_ids_seen
        assert match not in matches_seen
        assert match.matcher == detector.db_object
        assert match.score == 1
        assert match.is_best

        matches_seen.add(match)
        failure_ids_seen.add(match.classified_failure.id)

Просмотреть файл

@ -0,0 +1,19 @@
from treeherder.model.models import FailureLine
# Baseline field values for a failing ``test_result`` failure line.  Callers
# pair it with a dict of per-line overrides (see create_failure_lines).
test_line = {"action": "test_result", "test": "test1", "subtest": "subtest1",
"status": "FAIL", "expected": "PASS", "message": "message1"}
def create_failure_lines(repository, job_guid, failure_line_list):
    """Create and save one FailureLine per entry of ``failure_line_list``.

    Each entry is a ``(base_data, updates)`` pair of dicts.  The field values
    start from ``job_guid``, ``repository`` and the entry's position (stored
    as ``line``); ``base_data`` is applied next and ``updates`` last, so later
    values override earlier ones.

    Returns the saved FailureLine objects in input order.
    """
    created = []
    for position, (base_data, updates) in enumerate(failure_line_list):
        fields = {"job_guid": job_guid, "repository": repository, "line": position}
        for overrides in (base_data, updates):
            fields.update(overrides)

        new_line = FailureLine(**fields)
        new_line.save()
        created.append(new_line)
    return created

Просмотреть файл

@ -437,3 +437,67 @@ def mock_error_summary(monkeypatch):
monkeypatch.setattr(error_summary, "get_error_summary", _get_error_summary)
return bs_obj
@pytest.fixture
def failure_lines(jm, eleven_jobs_stored, initial_data):
    """Store two failure lines on job 1 and return them.

    A repository group and a repository named after ``jm.project`` are
    created for the lines to reference.
    """
    from treeherder.model.models import RepositoryGroup, Repository
    from tests.autoclassify.utils import test_line, create_failure_lines

    job = jm.get_job(1)[0]

    group = RepositoryGroup.objects.create(name="repo_group")
    repository = Repository.objects.create(name=jm.project, repository_group=group)

    line_specs = [(test_line, {}),
                  (test_line, {"subtest": "subtest2"})]
    return create_failure_lines(repository, job["job_guid"], line_specs)
@pytest.fixture
def classified_failures(request, jm, eleven_jobs_stored, initial_data, failure_lines):
    """Give each fixture failure line on job 1 its own ClassifiedFailure.

    Every line is linked to a fresh ClassifiedFailure through a FailureMatch
    with score 1.0 marked as best, attributed to a do-nothing detector that is
    registered for the occasion.  The matcher/detector registries on Matcher
    are reset when the test finishes.

    Returns the created ClassifiedFailure objects in line order.
    """
    from treeherder.model.models import ClassifiedFailure, FailureMatch, Matcher
    from treeherder.autoclassify import detectors

    job_1 = jm.get_job(1)[0]

    class TreeherderUnitTestDetector(detectors.Detector):
        def __call__(self, failure_lines):
            pass

    test_matcher = Matcher.objects.register_detector(TreeherderUnitTestDetector)

    def finalize():
        Matcher._detector_funcs = {}
        Matcher._matcher_funcs = {}
    request.addfinalizer(finalize)

    created = []
    for line in failure_lines:
        # Only lines that belong to job 1 get a classification.
        if line.job_guid != job_1["job_guid"]:
            continue

        classification = ClassifiedFailure()
        classification.save()

        link = FailureMatch(failure_line=line,
                            classified_failure=classification,
                            matcher=test_matcher.db_object,
                            score=1.0,
                            is_best=True)
        link.save()
        created.append(classification)

    return created
@pytest.fixture
def retriggers(jm, eleven_jobs_stored):
    """Duplicate job 2 under a new guid and return the copy in a one-item list.

    The database row is copied by the ``jobs_test.inserts.duplicate_job``
    proc; the returned dict is a copy of the original job dict with only
    ``job_guid`` replaced.
    """
    source_job = jm.get_job(2)[0]

    duplicate = source_job.copy()
    duplicate['job_guid'] = "f1c75261017c7c5ce3000931dce4c442fe0a1298"

    # Placeholder order: new guid first, then the guid of the row to copy.
    guid_pair = [duplicate['job_guid'], source_job['job_guid']]
    jm.execute(proc="jobs_test.inserts.duplicate_job", placeholders=guid_pair)

    return [duplicate]

Просмотреть файл

@ -94,5 +94,24 @@
"host_type":"master_host"
}
},
"inserts": {
"duplicate_job": {
"sql":"INSERT INTO job
(job_guid, signature, job_coalesced_to_guid, result_set_id, build_platform_id,
machine_platform_id, machine_id, device_id, option_collection_hash,
job_type_id, product_id, failure_classification_id, who, reason, result,
state, submit_timestamp, start_timestamp, end_timestamp, last_modified,
running_eta, tier, active_status)
SELECT
?, signature, job_coalesced_to_guid, result_set_id, build_platform_id,
machine_platform_id, machine_id, device_id, option_collection_hash,
job_type_id, product_id, failure_classification_id, who, reason, result,
state, submit_timestamp, start_timestamp, end_timestamp, last_modified,
running_eta, tier, active_status
FROM job
WHERE job_guid = ?",
"host_type":"master_host"
}
}
}

Просмотреть файл

Просмотреть файл

@ -0,0 +1,37 @@
import logging
from abc import ABCMeta, abstractmethod
from treeherder.model import models
logger = logging.getLogger(__name__)
class Detector(object):
    """Class that is called with a list of lines that correspond to
    unmatched, intermittent, failures from a specific job and that
    returns the indices of the subset of that list that should be
    added as new targets for failure classification.

    Subclasses implement ``__call__``.
    """
    # The docstring has to be the first statement in the class body;
    # placed after these assignments it would be a discarded expression
    # and ``Detector.__doc__`` would stay None.
    __metaclass__ = ABCMeta
    # Not read anywhere in this module; kept as part of the class interface.
    name = None

    def __init__(self, db_object):
        """Remember ``db_object``, the database record paired with this detector."""
        self.db_object = db_object

    @abstractmethod
    def __call__(self, failure_lines):
        """Return the indices into ``failure_lines`` that should become new
        classification targets."""
        pass
class TestFailureDetector(Detector):
    """Detector that picks out fully described ``test_result`` failure lines."""

    def __call__(self, failure_lines):
        """Return the positions of lines whose action is "test_result" and
        whose test, status and expected fields are all truthy."""
        return [position
                for position, line in enumerate(failure_lines)
                if (line.action == "test_result" and line.test and line.status
                    and line.expected)]
def register():
    """Register every detector class of this module with the Matcher manager."""
    detector_classes = [TestFailureDetector]
    for detector_cls in detector_classes:
        models.Matcher.objects.register_detector(detector_cls)

Просмотреть файл

Просмотреть файл

Просмотреть файл

@ -0,0 +1,75 @@
import logging
from collections import defaultdict
from django.core.management.base import BaseCommand, CommandError
from treeherder.autoclassify import matchers
from treeherder.model.models import FailureLine, Matcher, FailureMatch
logger = logging.getLogger(__name__)
# The minimum goodness of match we need to mark a particular match as the best match.
# match_errors passes this value to FailureLine.best_match as ``min_score``.
AUTOCLASSIFY_CUTOFF_RATIO = 0.8
# Initialisation needed to associate matcher functions with the matcher objects.
# This runs at import time, i.e. whenever this command module is loaded.
matchers.register()
class Command(BaseCommand):
    """Management command that runs failure matching for a single job.

    Usage: ``autoclassify <job_guid> <repository>``.
    """
    args = '<job_guid>, <repository>'
    help = 'Mark failures on a job.'

    def handle(self, *args, **options):
        """Validate the positional arguments and run match_errors.

        Raises CommandError unless exactly two positional arguments
        (job guid, repository name) are supplied.
        """
        if len(args) != 2:
            # The message used to claim 3 arguments were required although
            # the check and the usage string both expect exactly 2.
            raise CommandError('2 arguments required, %s given' % len(args))
        job_guid, repository = args
        match_errors(repository, job_guid)
def match_errors(repository, job_guid):
    """Run the registered matchers over a job's unmatched failure lines.

    Every match a matcher reports is stored as a FailureMatch on the failure
    line.  After each matcher the loop stops early if all_lines_matched()
    reports that every line is covered.  Finally, for each line that received
    a match, the best match above AUTOCLASSIFY_CUTOFF_RATIO (if any) is
    flagged with ``is_best``.
    """
    unmatched_failures = FailureLine.objects.unmatched_for_job(repository, job_guid)
    if not unmatched_failures:
        return

    all_matched = set()

    for matcher in Matcher.objects.registered_matchers():
        for found in matcher(unmatched_failures):
            line = found.failure_line
            stored_match = FailureMatch(score=found.score,
                                        matcher=matcher.db_object,
                                        classified_failure=found.classified_failure)
            line.matches.add(stored_match)
            line.save()
            logger.info("Matched failure %i with intermittent %i" %
                        (line.id, found.classified_failure.id))
            all_matched.add(line)

        # No need to consult further matchers once every line is covered.
        if all_lines_matched(unmatched_failures):
            break

    for line in all_matched:
        # TODO: store all matches
        best = line.best_match(AUTOCLASSIFY_CUTOFF_RATIO)
        if best:
            best.is_best = True
            best.save()
def all_lines_matched(failure_lines):
    """Tell whether every given failure line is fully matched.

    A line counts as matched when it has at least one stored FailureMatch and
    every one of its stored scores is >= 1.  Returns False as soon as a line
    fails that test, True otherwise.
    """
    scores_by_line = defaultdict(list)
    stored_matches = FailureMatch.objects.filter(
        failure_line__in=failure_lines).only('failure_line_id', 'score')
    for stored in stored_matches:
        scores_by_line[stored.failure_line_id].append(stored.score)

    for line in failure_lines:
        line_scores = scores_by_line[line.id]
        if not line_scores:
            return False
        if any(not score >= 1 for score in line_scores):
            return False
    return True

Просмотреть файл

@ -0,0 +1,76 @@
import logging
from django.core.management.base import BaseCommand, CommandError
from treeherder.autoclassify import detectors
from treeherder.model.derived import JobsModel
from treeherder.model.models import FailureLine, Matcher
from .autoclassify import match_errors
logger = logging.getLogger(__name__)
# Register the detector classes with the Matcher manager at import time so
# that Matcher.objects.registered_detectors() has something to yield.
detectors.register()
class Command(BaseCommand):
    """Management command that looks for new intermittents among a job's repeats.

    Usage: ``detect_intermittents <job_guid> <repository>``.
    """
    args = '<job_guid>, <repository>'
    help = 'Look for new intermittents in a job'

    def handle(self, *args, **options):
        """Fetch the repeats of the given job and pass them to add_new_intermittents.

        Raises CommandError unless exactly two positional arguments are given.
        """
        if len(args) != 2:
            raise CommandError('2 arguments required, %s given' % len(args))

        job_guid, repository = args

        with JobsModel(repository) as jobs_model:
            repeated_jobs = jobs_model.get_job_repeats(job_guid)
            add_new_intermittents(repository, repeated_jobs)
def add_new_intermittents(repository, jobs):
    """Create classifications for newly detected intermittents and rematch.

    ``jobs`` is a sequence of job dicts carrying at least ``job_guid`` and
    ``result``.  Nothing is done unless there are at least two jobs and at
    least one of them has the result "success".

    For every job that has failure lines, each registered detector is shown
    the lines that are still unclassified; the lines it selects get a new
    classification.  If a job produced any new classification, match_errors
    is rerun on all the other jobs of the set.
    """
    # The approach here is currently to look for new intermittents to add, one at a time
    # and then rerun the matching on other jobs
    # TODO: limit the possible matches to those that have just been added
    if len(jobs) <= 1:
        logger.info("Too few jobs in the current set")
        return

    # For now conservatively assume that we can only mark new intermittents if
    # one run in the current set fully passes
    if not any(job["result"] == "success" for job in jobs):
        logger.info("No successful jobs to compare against")
        return

    failures_by_job = FailureLine.objects.for_jobs(*jobs)

    for job in jobs:
        job_guid = job["job_guid"]
        logger.debug("Looking for new intermittents from job %s", job_guid)
        if job_guid not in failures_by_job:
            logger.debug("Job has no failures")
            continue

        # The job's failure lines do not depend on the detector, so look
        # them up once per job rather than once per detector.
        job_failures = failures_by_job[job_guid]
        new_matches = set()

        for detector in Matcher.objects.registered_detectors():
            unmatched_lines = [item for item in job_failures
                               if not item.classified_failures.count() and
                               item.id not in new_matches]

            logger.debug("Unmatched lines %r", unmatched_lines)
            if not unmatched_lines:
                # Nothing left for this detector to look at.
                continue
            logger.debug("Found %i unmatched lines", len(unmatched_lines))

            # The detector answers with indices into the list it was given.
            for index in detector(unmatched_lines):
                failure = unmatched_lines[index]
                failure.create_new_classification(detector.db_object)
                new_matches.add(failure.id)

        if new_matches:
            for rematch_job in jobs:
                if rematch_job == job:
                    continue
                logger.debug("Trying rematch on job %s", rematch_job["job_guid"])
                match_errors(repository, rematch_job["job_guid"])

Просмотреть файл

@ -0,0 +1,30 @@
from django.core.management.base import BaseCommand, make_option
from treeherder.model.models import Matcher
class Command(BaseCommand):
    """Management command that lists, adds or removes Matcher rows by name.

    With neither ``--add`` nor ``--remove`` the names of all existing
    matchers are written to the command's stdout, one per line.
    """
    help = 'Add new matchers or list existing ones'

    option_list = BaseCommand.option_list + (
        make_option('--add',
                    action='store',
                    default=None,
                    help="Add matcher with the specified name"),
        make_option('--remove',
                    action='store',
                    default=None,
                    help="Remove matcher with the specified name")
    )

    def handle(self, *args, **options):
        """Carry out the listing, addition and/or removal requested by ``options``."""
        if not (options["add"] or options["remove"]):
            for item in Matcher.objects.all():
                # self.stdout replaces the Python 2 only ``print`` statement.
                self.stdout.write(item.name)

        if options["add"]:
            # Matcher.name is unique; get_or_create makes adding an existing
            # name a no-op instead of a database integrity error.
            Matcher.objects.get_or_create(name=options["add"])

        if options["remove"]:
            Matcher.objects.filter(name=options["remove"]).delete()

Просмотреть файл

@ -0,0 +1,61 @@
import logging
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from treeherder.model import models
from treeherder.model.models import FailureMatch
logger = logging.getLogger(__name__)
# Record returned by matchers: the failure line that matched, the classified
# failure it matched with, and the score of the match.
Match = namedtuple('Match', ['failure_line', 'classified_failure', 'score'])
class Matcher(object):
    """Class that is called with a list of unmatched failure lines
    from a specific job, and returns a list of Match tuples
    containing the failure_line that matched, the failure it
    matched with, and the score, which is a number in the range
    0-1 with 1 being a perfect match and 0 being the worst possible
    match.

    Subclasses implement ``__call__``.
    """
    # The docstring has to be the first statement in the class body;
    # placed after this assignment it would be a discarded expression
    # and ``Matcher.__doc__`` would stay None.
    __metaclass__ = ABCMeta

    def __init__(self, db_object):
        """Remember ``db_object``, the database record paired with this matcher."""
        self.db_object = db_object

    @abstractmethod
    def __call__(self, failure_lines):
        """Return a list of Match tuples for the given failure lines."""
        pass
class PreciseTestMatcher(Matcher):
    """Matcher that pairs a failure line with an existing match whose failure
    line has the same test, subtest, status, expected status and message."""

    def __call__(self, failure_lines):
        """Return one Match per ``test_result`` line that has an identical,
        already matched line stored for a different job.

        Among several candidates the one with the highest score wins, ties
        going to the most recently modified classified failure.  The returned
        Match reuses the score of the winning candidate.
        """
        found = []
        for failure_line in failure_lines:
            logger.debug("Looking for test match in failure %d" % failure_line.id)

            if failure_line.action != "test_result":
                continue

            same_failure = FailureMatch.objects.filter(
                failure_line__action="test_result",
                failure_line__test=failure_line.test,
                failure_line__subtest=failure_line.subtest,
                failure_line__status=failure_line.status,
                failure_line__expected=failure_line.expected,
                failure_line__message=failure_line.message)
            # Matches stored for this very job are not candidates.
            other_jobs = same_failure.exclude(failure_line__job_guid=failure_line.job_guid)
            ranked = other_jobs.order_by("-score", "-classified_failure__modified")

            best_match = ranked.first()
            if best_match:
                found.append(Match(failure_line,
                                   best_match.classified_failure,
                                   best_match.score))
        return found
def register():
    """Register every matcher class of this module with the Matcher manager."""
    matcher_classes = [PreciseTestMatcher]
    for matcher_cls in matcher_classes:
        models.Matcher.objects.register_matcher(matcher_cls)

Просмотреть файл

@ -0,0 +1,30 @@
import logging
from celery import task
from django.core.management import call_command
from treeherder import celery_app
logger = logging.getLogger(__name__)
@task(name='autoclassify', max_retries=10)
def autoclassify(project, job_guid):
    """Run the autoclassify command for a job, then queue intermittent detection.

    On any exception the task is retried, with a countdown that grows by 60
    seconds for every retry already made.
    """
    try:
        logger.info('Running autoclassify')
        call_command('autoclassify', job_guid, project)
        celery_app.send_task('detect-intermittents',
                             [project, job_guid],
                             routing_key='detect_intermittents')
    except Exception as e:
        # ``except ... as`` is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form it replaces.
        autoclassify.retry(exc=e, countdown=(1 + autoclassify.request.retries) * 60)
@task(name='detect-intermittents', max_retries=10)
def detect_intermittents(project, job_guid):
    """Run the detect_intermittents command for a job of an enabled project.

    Only "mozilla-inbound" is enabled at present; for any other project the
    task just logs and returns.  On any exception the task is retried, with a
    countdown that grows by 60 seconds for every retry already made.
    """
    try:
        logger.info('Running detect intermittents')
        # TODO: Make this list configurable
        if project == "mozilla-inbound":
            call_command('detect_intermittents', job_guid, project)
    except Exception as e:
        # ``except ... as`` is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form it replaces.
        detect_intermittents.retry(exc=e, countdown=(1 + detect_intermittents.request.retries) * 60)

Просмотреть файл

@ -3,6 +3,7 @@ import logging
from celery import task
from django.core.management import call_command
from treeherder import celery_app
from treeherder.log_parser.utils import (extract_json_log_artifacts,
extract_text_log_artifacts, is_parsed,
post_log_artifacts)
@ -57,5 +58,8 @@ def store_error_summary(project, job_log_url, job_guid):
try:
logger.info('Running store_error_summary')
call_command('store_error_summary', job_log_url, job_guid, project)
celery_app.send_task('autoclassify',
[project, job_guid],
routing_key='autoclassify')
except Exception, e:
store_error_summary.retry(exc=e, countdown=(1 + store_error_summary.request.retries) * 60)

Просмотреть файл

@ -1934,6 +1934,13 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
resultset_status_dict['coalesced'] = num_coalesced
return resultset_status_dict
def get_job_repeats(self, ref_job_guid):
    """Return what the ``jobs.selects.get_job_retriggers`` proc yields for
    the job identified by ``ref_job_guid``."""
    return self.execute(
        proc='jobs.selects.get_job_retriggers',
        placeholders=[ref_job_guid],
        debug_show=self.DEBUG)
class JobDataError(ValueError):
    """ValueError subclass for job data problems (raise sites are not part
    of this excerpt)."""

Просмотреть файл

@ -2,7 +2,7 @@ from __future__ import unicode_literals
import os
import uuid
from collections import defaultdict
from collections import defaultdict, OrderedDict
from warnings import filterwarnings, resetwarnings
from datasource.bases.BaseHub import BaseHub
@ -518,8 +518,23 @@ class ReferenceDataSignatures(models.Model):
db_table = 'reference_data_signatures'
class FailureLine(models.Model):
class FailureLineManager(models.Manager):
    """Manager offering job-oriented lookups of failure lines."""

    def unmatched_for_job(self, repository, job_guid):
        """Return the failure lines of the given job, in the repository with
        the given name, that have no classified failure attached."""
        criteria = {
            "job_guid": job_guid,
            "repository__name": repository,
            "classified_failures": None,
        }
        return FailureLine.objects.filter(**criteria)

    def for_jobs(self, *jobs):
        """Group the failure lines of the given job dicts by job guid.

        Returns a ``defaultdict(list)`` that maps each job guid with at least
        one failure line to the list of its lines.
        """
        wanted_guids = [job["job_guid"] for job in jobs]
        grouped = defaultdict(list)
        for line in FailureLine.objects.filter(job_guid__in=wanted_guids):
            grouped[line.job_guid].append(line)
        return grouped
class FailureLine(models.Model):
STATUS_LIST = ('PASS', 'FAIL', 'OK', 'ERROR', 'TIMEOUT', 'CRASH', 'ASSERT', 'SKIP', 'NOTRUN')
# Truncated is a special action that we use to indicate that the list of failure lines
# was truncated according to settings.FAILURE_LINES_CUTOFF.
@ -548,6 +563,7 @@ class FailureLine(models.Model):
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
objects = FailureLineManager()
# TODO: add indexes once we know which queries will be typically executed
class Meta:
@ -556,11 +572,31 @@ class FailureLine(models.Model):
('job_guid', 'line')
)
def best_match(self, min_score=0):
    """Return this line's top-ranked FailureMatch, or None.

    Matches are ranked by descending score, then by most recently modified
    classified failure.  The top match is returned only when its score is
    strictly greater than ``min_score``.
    """
    ranked = FailureMatch.objects.filter(failure_line_id=self.id).order_by(
        "-score",
        "-classified_failure__modified")
    top = ranked.first()
    if not top or not top.score > min_score:
        return None
    return top
def create_new_classification(self, matcher):
    """Create a new ClassifiedFailure and link this line to it.

    The link is a FailureMatch attributed to ``matcher`` with score 1 and
    ``is_best`` set.  Nothing is returned.
    """
    classification = ClassifiedFailure()
    classification.save()

    link_fields = dict(failure_line=self,
                       classified_failure=classification,
                       matcher=matcher,
                       score=1,
                       is_best=True)
    FailureMatch(**link_fields).save()
class ClassifiedFailure(models.Model):
id = BigAutoField(primary_key=True)
failure_lines = models.ManyToManyField(FailureLine, through='FailureMatch',
related_name='intermittent_failures')
related_name='classified_failures')
bug_number = models.PositiveIntegerField(blank=True, null=True)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
@ -571,16 +607,53 @@ class ClassifiedFailure(models.Model):
db_table = 'classified_failure'
class MatcherManager(models.Manager):
    """Manager that also keeps the registries of matcher and detector
    instances stored on the Matcher class."""

    def register_matcher(self, cls):
        """Register ``cls`` as a matcher and return its instance."""
        return self._register(cls, Matcher._matcher_funcs)

    def register_detector(self, cls):
        """Register ``cls`` as a detector and return its instance."""
        return self._register(cls, Matcher._detector_funcs)

    def _register(self, cls, dest):
        """Return the instance stored in ``dest`` under the class name,
        creating it first if necessary.

        A new instance is built from the Matcher row carrying the class name;
        that row is fetched or created on demand.
        """
        key = cls.__name__
        try:
            return dest[key]
        except KeyError:
            pass

        db_row, _created = Matcher.objects.get_or_create(name=key)
        instance = dest[key] = cls(db_row)
        return instance

    def registered_matchers(self):
        """Yield the registered matcher instances."""
        for registered in Matcher._matcher_funcs.values():
            yield registered

    def registered_detectors(self):
        """Yield the registered detector instances."""
        for registered in Matcher._detector_funcs.values():
            yield registered
class Matcher(models.Model):
    """Database record naming a matcher or detector implementation.

    The class also holds the registries that map an implementation's class
    name to its instance; MatcherManager fills them in.
    """
    name = models.CharField(max_length=50, unique=True)

    # Registries keyed by implementation class name; class-level, so shared
    # by every Matcher instance.
    _detector_funcs = OrderedDict()
    _matcher_funcs = OrderedDict()

    objects = MatcherManager()

    class Meta:
        db_table = 'matcher'

    def match(self, *args, **kwargs):
        """Call the matcher registered under this row's name.

        All arguments are passed through and the matcher's result is
        returned.  Raises ValueError if no matcher is registered under
        ``self.name``.
        """
        try:
            # Look the callable up by name; calling the registry dict itself
            # would raise TypeError.
            matcher_func = self._matcher_funcs[self.name]
        except KeyError:
            raise ValueError("No matcher registered under the name %r" % self.name)
        return matcher_func(*args, **kwargs)
class FailureMatch(models.Model):
id = BigAutoField(primary_key=True)
failure_line = FlexibleForeignKey(FailureLine)
failure_line = FlexibleForeignKey(FailureLine, related_name="matches")
classified_failure = FlexibleForeignKey(ClassifiedFailure)
matcher = models.ForeignKey(Matcher)
score = models.DecimalField(max_digits=3, decimal_places=2, blank=True, null=True)

Просмотреть файл

@ -689,6 +689,19 @@
group by state, result
",
"host_type": "read_host"
},
"get_job_retriggers":{
"sql":"SELECT
job.job_guid,
job.result
FROM job
JOIN job as ref_job ON
job.signature = ref_job.signature AND
job.result_set_id = ref_job.result_set_id
WHERE
job.state = 'completed' AND
ref_job.job_guid = ?",
"host_type": "read_host"
}
}
}

Просмотреть файл

@ -114,7 +114,8 @@ INSTALLED_APPS = [
'treeherder.etl',
'treeherder.workers',
'treeherder.embed',
'treeherder.perf'
'treeherder.perf',
'treeherder.autoclassify',
]
LOCAL_APPS = []
@ -162,6 +163,8 @@ CELERY_QUEUES = (
Queue('log_parser_hp', Exchange('default'), routing_key='parse_log.high_priority'),
Queue('log_parser_json', Exchange('default'), routing_key='parse_log.json'),
Queue('store_error_summary', Exchange('default'), routing_key='store_error_summary'),
Queue('autoclassify', Exchange('default'), routing_key='autoclassify'),
Queue('detect_intermittents', Exchange('default'), routing_key='detect_intermittents'),
# Queue for mirroring the failure classification activity to Bugzilla/Elasticsearch
Queue('classification_mirroring', Exchange('default'), routing_key='classification_mirroring'),
Queue('error_summary', Exchange('default'), routing_key='error_summary'),