Bug 1268484 - Add elastic-search based matcher for test failure lines (#1488)

Add support for matching test failures where the test, subtest, status,
and expected status are all exact matches, but the message is not an
exact match. The matching uses ElasticSearch and is initially optimised
for cases where the messages differ only in numeric values since this is
a relatively common case.
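
As a rough illustration of the numeric-value case, here is a plain-Python sketch of the pattern tokenizer this commit adds in treeherder/model/search.py (an approximation for illustration, not code from the commit):

import re

# Hex numbers, decimal digits, and non-word characters all act as
# separators, so only the alphabetic tokens survive tokenization.
SEPARATORS = re.compile(r"0x[0-9a-fA-F]+|[\W0-9]+?")

def tokenize(message):
    return [token for token in SEPARATORS.split(message) if token]

assert tokenize("message 1.2") == tokenize("message 1.3") == ["message"]
assert tokenize("leaked 0x1F bytes") == tokenize("leaked 0x2A bytes")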

This commit also adds ElasticSearch to the Travis environment.
jgraham 2016-06-29 11:25:38 +01:00 committed by GitHub
Parent de914a1cd2
Commit 52607c2a57
21 changed files with 654 additions and 46 deletions

View file

@ -1,8 +1,9 @@
env:
global:
- BROKER_URL='amqp://guest:guest@localhost:5672//'
- DATABASE_URL='mysql://root@localhost/test_treeherder'
- DATABASE_URL_RO='mysql://root@localhost/test_treeherder'
- BROKER_URL='amqp://guest:guest@localhost:5672//'
- ELASTICSEARCH_URL='http://127.0.0.1:9200'
- TREEHERDER_DJANGO_SECRET_KEY='secretkey-of-at-50-characters-to-pass-check-deploy'
matrix:
include:
@ -84,6 +85,9 @@ matrix:
- rabbitmq
- memcached
before_install:
- curl -o ~/elasticsearch-2.3.3.deb https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/deb/elasticsearch/2.3.3/elasticsearch-2.3.3.deb && sudo dpkg -i --force-confnew ~/elasticsearch-2.3.3.deb
- sudo service elasticsearch restart
- while ! curl localhost:9200 &>/dev/null; do sleep 1; done
- echo -e '\n[mysqld]\nsql_mode="STRICT_ALL_TABLES"\n' | sudo tee -a /etc/mysql/my.cnf
- sudo service mysql restart
# Create a clean virtualenv rather than using the one given to us,
@ -118,6 +122,9 @@ matrix:
- rabbitmq
- memcached
before_install:
- curl -o ~/elasticsearch-2.3.3.deb https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/deb/elasticsearch/2.3.3/elasticsearch-2.3.3.deb && sudo dpkg -i --force-confnew ~/elasticsearch-2.3.3.deb
- sudo service elasticsearch restart
- while ! curl localhost:9200 &>/dev/null; do sleep 1; done
- echo -e '\n[mysqld]\nsql_mode="STRICT_ALL_TABLES"\n' | sudo tee -a /etc/mysql/my.cnf
- sudo service mysql restart
# Create a clean virtualenv rather than using the one given to us,
@ -152,6 +159,9 @@ matrix:
- rabbitmq
- memcached
before_install:
- curl -o ~/elasticsearch-2.3.3.deb https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/deb/elasticsearch/2.3.3/elasticsearch-2.3.3.deb && sudo dpkg -i --force-confnew ~/elasticsearch-2.3.3.deb
- sudo service elasticsearch restart
- while ! curl localhost:9200 &>/dev/null; do sleep 1; done
- echo -e '\n[mysqld]\nsql_mode="STRICT_ALL_TABLES"\n' | sudo tee -a /etc/mysql/my.cnf
- sudo service mysql restart
# Create a clean virtualenv rather than using the one given to us,

View file

@ -31,9 +31,11 @@ line {"etc-hosts":
file {"/etc/profile.d/treeherder.sh":
content => "
export ENABLE_LOCAL_SETTINGS_FILE='True'
export BROKER_URL='amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@localhost:5672/${RABBITMQ_VHOST}'
export DATABASE_URL='mysql://${DB_USER}:${DB_PASS}@localhost/treeherder'
export DATABASE_URL_RO='mysql://${DB_USER}:${DB_PASS}@localhost/treeherder'
export BROKER_URL='amqp://${RABBITMQ_USER}:${RABBITMQ_PASSWORD}@localhost:5672/${RABBITMQ_VHOST}'
export ELASTICSEARCH_URL='http://localhost:9200'
export TREEHERDER_DEBUG='True'
export ENABLE_DEBUG_TOOLBAR='True'
export TREEHERDER_DJANGO_SECRET_KEY='${DJANGO_SECRET_KEY}'

View file

@ -89,3 +89,12 @@ djangorestframework-filters==0.8.0 --hash=sha256:c4d77bc01af20cc7551f704f35db5aa
pylibmc==1.5.1 --hash=sha256:ecba261859c3e1ba3365389cb4f4dfffb7e02120a9f57a288cacf2f42c45cdd6
django-pylibmc==0.6.1 --hash=sha256:9cffdee703aaf9ebc029d9dbdee8abdd0723564b95e4b2ac59e4a668b8e58f93
elasticsearch==2.3.0 --hash=sha256:6f184507c151bf8b093b86c0b7cd576a1d730acee03e8213cae367f196ad4c5c
elasticsearch-dsl==2.0.0 --hash=sha256:d19a055b2c4f5b537dc4e3d8f3d7d8392d9c090f8790d71697e587550e9f812a
# required by elasticsearch
urllib3==1.16 --hash=sha256:8a46ee9b6b4487ba994e97f9e5eab48513c9b3ebdddc630ee9a899e041147695
certifi==2016.2.28 --hash=sha256:75c33d546e0a732a4606749cbadcd81929f30d8b814061ca93cde49933dbb860

View file

@ -6,6 +6,7 @@ from django.core.management import call_command
from treeherder.autoclassify.detectors import (ManualDetector,
TestFailureDetector)
from treeherder.autoclassify.matchers import (CrashSignatureMatcher,
ElasticSearchTestMatcher,
PreciseTestMatcher,
time_window)
from treeherder.model.models import (ClassifiedFailure,
@ -222,6 +223,55 @@ def test_classify_skip_ignore(activate_responses, jm, test_project, test_reposit
assert item.classified_failures.count() == 0
def test_classify_es(activate_responses, jm, test_project, test_repository,
eleven_jobs_stored, failure_lines, classified_failures):
job = jm.get_job(2)[0]
test_failure_lines = create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {}),
(test_line, {"message": "message2"}),
(test_line, {"message": "message 1.2"}),
(test_line, {"message": "message 0x1F"}),
(test_line, {"subtest": "subtest3"}),
(test_line, {"status": "TIMEOUT"}),
(test_line, {"expected": "ERROR"})])
autoclassify(jm, job, test_failure_lines, [ElasticSearchTestMatcher])
expected_classified = test_failure_lines[:4]
expected_unclassified = test_failure_lines[4:]
for actual in expected_classified:
assert [item.id for item in actual.classified_failures.all()] == [classified_failures[0].id]
for item in expected_unclassified:
assert item.classified_failures.count() == 0
def test_classify_multiple(activate_responses, jm, test_project, test_repository,
eleven_jobs_stored, failure_lines, classified_failures):
job = jm.get_job(2)[0]
test_failure_lines = create_failure_lines(test_repository,
job["job_guid"],
[(test_line, {}),
(test_line, {"message": "message 1.2"})])
expected_classified_precise = [test_failure_lines[0]]
expected_classified_fuzzy = [test_failure_lines[1]]
autoclassify(jm, job, test_failure_lines, [PreciseTestMatcher, ElasticSearchTestMatcher])
for actual, expected in zip(expected_classified_precise, classified_failures):
assert [item.id for item in actual.classified_failures.all()] == [expected.id]
assert all(match.matcher.id == 1 for match in actual.matches.all())
for actual, expected in zip(expected_classified_fuzzy, classified_failures):
assert [item.id for item in actual.classified_failures.all()] == [expected.id]
assert all(match.matcher.id == 2 for match in actual.matches.all())
def test_classify_crash(activate_responses, jm, test_project, test_repository,
eleven_jobs_stored, test_matcher):
job_ref = jm.get_job(1)[0]

View file

@ -6,6 +6,7 @@ from mozlog.formatters.tbplformatter import TbplFormatter
from treeherder.model.derived.artifacts import ArtifactsModel
from treeherder.model.models import (FailureLine,
MatcherManager)
from treeherder.model.search import refresh_all
test_line = {"action": "test_result", "test": "test1", "subtest": "subtest1",
"status": "FAIL", "expected": "PASS", "message": "message1"}
@ -23,8 +24,11 @@ def create_failure_lines(repository, job_guid, failure_line_list):
data.update(updates)
failure_line = FailureLine(**data)
failure_line.save()
failure_line.elastic_search_insert()
failure_lines.append(failure_line)
refresh_all()
return failure_lines

View file

@ -51,6 +51,16 @@ def increment_cache_key_prefix():
cache.key_prefix = "t{0}".format(key_prefix_counter)
@pytest.fixture
def elasticsearch(request):
from treeherder.model.search import connection, doctypes, refresh_all
for item in doctypes():
connection.indices.delete(item._doc_type.index, ignore=404)
refresh_all()
item.init()
@pytest.fixture
def jobs_ds(request, transactional_db):
from treeherder.model.models import Datasource
@ -346,7 +356,7 @@ def mock_error_summary(monkeypatch):
@pytest.fixture
def failure_lines(jm, test_repository, eleven_jobs_stored):
def failure_lines(jm, test_repository, eleven_jobs_stored, elasticsearch):
from tests.autoclassify.utils import test_line, create_failure_lines
test_repository.save()
@ -392,6 +402,7 @@ def test_matcher(request):
def classified_failures(request, jm, eleven_jobs_stored, failure_lines,
test_matcher, failure_classifications):
from treeherder.model.models import ClassifiedFailure
from treeherder.model.search import refresh_all
job_1 = jm.get_job(1)[0]
@ -405,6 +416,7 @@ def classified_failures(request, jm, eleven_jobs_stored, failure_lines,
mark_best=True)
classified_failures.append(classified_failure)
refresh_all()
return classified_failures

View file

@ -9,6 +9,7 @@ from treeherder.log_parser.failureline import (char_to_codepoint_ucs2,
from treeherder.model.models import (FailureLine,
Job,
JobLog)
from treeherder.model.search import TestFailureLine
from ..sampledata import SampleData
@ -163,3 +164,29 @@ def test_store_error_summary_duplicate(activate_responses, test_repository, jm,
"line": 2}])
assert FailureLine.objects.count() == 2
def test_store_error_summary_elastic_search(activate_responses, test_repository,
jm, eleven_jobs_stored, elasticsearch):
log_path = SampleData().get_log_path("plain-chunked_errorsummary.log")
log_url = 'http://my-log.mozilla.org'
with open(log_path) as log_handler:
responses.add(responses.GET, log_url,
body=log_handler.read(), status=200)
job = Job.objects.get(guid=jm.get_job(1)[0]['job_guid'])
log_obj = JobLog.objects.create(job=job, name="errorsummary_json", url=log_url)
store_failure_lines(jm.project, job.guid, log_obj)
assert FailureLine.objects.count() == 1
failure = FailureLine.objects.get(pk=1)
es_line = TestFailureLine.get(1, routing=failure.test)
for prop in ["test", "subtest", "status", "expected"]:
assert getattr(es_line, prop) == getattr(failure, prop)
assert es_line.best_classification is None
assert es_line.best_is_verified is False

View file

@ -22,6 +22,8 @@ from treeherder.model.models import (FailureClassification,
JobType,
Machine,
TaskSetMeta)
from treeherder.model.search import (TestFailureLine,
refresh_all)
slow = pytest.mark.slow
xfail = pytest.mark.xfail
@ -435,6 +437,8 @@ def test_cycle_all_data(jm, sample_data,
call_command('cycle_data', sleep_time=0, days=1)
refresh_all()
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
assert len(jobs_after) == len(jobs_before) - len(jobs_to_be_deleted)
@ -446,10 +450,13 @@ def test_cycle_all_data(jm, sample_data,
assert JobDetail.objects.count() == 0
assert JobLog.objects.count() == 0
# There should be nothing in elastic search after cycling
assert TestFailureLine.search().params(search_type="count").execute().hits.total == 0
def test_cycle_one_job(jm, sample_data,
sample_resultset, test_repository, mock_log_parser,
failure_lines):
elasticsearch, failure_lines):
"""
Test cycling one job in a group of jobs to confirm there are no
unexpected deletions
@ -497,6 +504,7 @@ def test_cycle_one_job(jm, sample_data,
job_logs_before = JobLog.objects.count()
call_command('cycle_data', sleep_time=0, days=1, debug=True)
refresh_all()
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
@ -519,6 +527,8 @@ def test_cycle_one_job(jm, sample_data,
assert (set(item.id for item in object_type.objects.all()) ==
set(item.id for item in objects))
assert set(int(item.meta.id) for item in TestFailureLine.search().execute()) == set(item.id for item in extra_objects["failure_lines"][1])
def test_cycle_all_data_in_chunks(jm, sample_data,
sample_resultset, test_repository, mock_log_parser):
@ -549,7 +559,10 @@ def test_cycle_all_data_in_chunks(jm, sample_data,
jobs_before = jm.execute(proc="jobs_test.selects.jobs")
assert TestFailureLine.search().params(search_type="count").execute().hits.total > 0
call_command('cycle_data', sleep_time=0, days=1, chunk_size=3)
refresh_all()
jobs_after = jm.execute(proc="jobs_test.selects.jobs")
@ -560,6 +573,7 @@ def test_cycle_all_data_in_chunks(jm, sample_data,
assert Job.objects.count() == 0
assert FailureLine.objects.count() == 0
assert JobDetail.objects.count() == 0
assert TestFailureLine.search().params(search_type="count").execute().hits.total == 0
def test_cycle_task_set_meta(jm):

View file

@ -14,3 +14,9 @@ PULSE_EXCHANGE_NAMESPACE = 'test'
# Set a fake api key for testing bug filing
BZ_API_KEY = "12345helloworld"
BZ_API_URL = "https://thisisnotbugzilla.org"
# ELASTIC SEARCH
# Prefix indices used in tests to avoid clobbering data
ELASTIC_SEARCH.update({
"index_prefix": "test",
})

View file

@ -10,6 +10,7 @@ from treeherder.model.derived import ArtifactsModel
from treeherder.model.models import (FailureLine,
Matcher,
MatcherManager)
from treeherder.model.search import TestFailureLine
def test_get_failure_line(webapp, eleven_jobs_stored, jm, failure_lines):
@ -57,6 +58,10 @@ def test_update_failure_line_verify(eleven_jobs_stored, jm, failure_lines,
assert failure_line.best_classification == classified_failures[0]
assert failure_line.best_is_verified
es_line = TestFailureLine.get(failure_line.id, routing=failure_line.test)
assert es_line.best_classification == classified_failures[0].id
assert es_line.best_is_verified
def test_update_failure_line_replace(eleven_jobs_stored, jm, failure_lines,
classified_failures, test_user):

View file

@ -98,19 +98,3 @@ def update_db(jm, job_id, matches, all_matched):
if all_matched:
jm.update_after_autoclassification(job_id)
def all_lines_matched(failure_lines):
failure_score_dict = defaultdict(list)
query = FailureMatch.objects.filter(
failure_line__in=failure_lines).only('failure_line_id', 'score')
for failure_match in query:
failure_score_dict[failure_match.failure_line_id].append(failure_match.score)
for failure_line in failure_lines:
scores = failure_score_dict[failure_line.id]
if not scores or not all(score >= 1 for score in scores):
return False
return True

View file

@ -0,0 +1,109 @@
import json
import logging
import time
from collections import defaultdict
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand
from treeherder.autoclassify import matchers
from treeherder.etl.common import fetch_text
from treeherder.model.models import FailureLine
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Mark failures on a job.'
option_list = BaseCommand.option_list + (
make_option('--min-id',
action='store',
type=int,
default=None,
help='Minimum id of failure line to use'),
make_option('--num-lines',
action='store',
type=int,
default=1000,
help='Number of failure lines to process'),
make_option('--profile',
action='store',
type=str,
default=None,
help='Enable profiling and write output to this file'),
make_option('--ref-data',
action='store',
type=str,
default=None,
help='json file to compare results to'))
def handle(self, *args, **options):
if options["min_id"] is None:
options["min_id"] = (FailureLine.objects
.filter(action="test_result")
.exclude(message=None)
.exclude(message="")
.order_by("-id")
.values_list("id", flat=True)[options["num_lines"]])
failure_lines = (FailureLine.objects
.filter(id__gt=options["min_id"],
action="test_result")
.exclude(message=None)
.exclude(message="")
.order_by("id")[:options["num_lines"]])
self.stderr.write("Using min id %d" % options["min_id"])
self.stderr.write("Got %d lines" % len(failure_lines))
t0 = time.time()
fetch_text(settings.ELASTIC_SEARCH["url"])
self.stderr.write("Simple GET took %dms" % ((time.time() - t0) * 1000))
failure_lines_by_job = defaultdict(list)
for line in failure_lines:
failure_lines_by_job[line.job_guid].append(line)
matcher = matchers.ElasticSearchTestMatcher(None)
all_matches = {}
if options["profile"]:
import cProfile
prof = cProfile.Profile()
prof.enable()
total_lines = 0
t0 = time.time()
for job_guid, failure_lines in failure_lines_by_job.iteritems():
total_lines += len(failure_lines)
matches = matcher(failure_lines)
all_matches[job_guid] = matches
duration = 1000 * (time.time() - t0)
self.stderr.write("Total lines %d" % total_lines)
self.stderr.write("Total lines in matcher %d" % matcher.lines)
self.stderr.write("Called ElasticSearch %i times" % matcher.calls)
self.stderr.write("Took %dms" % duration)
if options["profile"]:
prof.disable()
prof.dump_stats(options["profile"])
json_data = {}
for key, values in all_matches.iteritems():
json_values = [[item[0].id, item[1].id, item[2]] for item in values]
json_data[key] = json_values
json_string = json.dumps(json_data)
if options["ref_data"]:
with open(options["ref_data"]) as f:
ref_data = json.load(f)
this_data = json.loads(json_string)
if this_data == ref_data:
self.stderr.write("Output matches refdata")
else:
self.stderr.write("Output does not match refdata")
self.stdout.write(json_string)

View file

@ -5,13 +5,19 @@ from abc import (ABCMeta,
from collections import namedtuple
from datetime import (datetime,
timedelta)
from difflib import SequenceMatcher
from django.conf import settings
from django.db.models import Q
from elasticsearch_dsl.query import Match as ESMatch
from treeherder.autoclassify.management.commands.autoclassify import AUTOCLASSIFY_GOOD_ENOUGH_RATIO
from treeherder.model.models import (FailureLine,
from treeherder.model.models import (ClassifiedFailure,
FailureLine,
FailureMatch,
MatcherManager)
from treeherder.model.search import (TestFailureLine,
es_connected)
logger = logging.getLogger(__name__)
@ -110,6 +116,53 @@ class PreciseTestMatcher(Matcher):
return rv
class ElasticSearchTestMatcher(Matcher):
"""Matcher that looks for existing failures with identical tests, and error
message that is a good match when non-alphabetic tokens have been removed."""
def __init__(self, *args, **kwargs):
Matcher.__init__(self, *args, **kwargs)
self.lines = 0
self.calls = 0
@es_connected(default=[])
def __call__(self, failure_lines):
rv = []
self.lines += len(failure_lines)
for failure_line in failure_lines:
if failure_line.action != "test_result" or not failure_line.message:
logger.debug("Skipped elasticsearch matching")
continue
match = ESMatch(message={"query": failure_line.message[:1024],
"type": "phrase"})
search = (TestFailureLine.search()
.filter("term", test=failure_line.test)
.filter("term", status=failure_line.status)
.filter("term", expected=failure_line.expected)
.filter("exists", field="best_classification")
.query(match))
if failure_line.subtest:
search = search.filter("term", subtest=failure_line.subtest)
try:
self.calls += 1
resp = search.execute()
except:
logger.error("Elastic search lookup failed: %s %s %s %s %s",
failure_line.test, failure_line.subtest, failure_line.status,
failure_line.expected, failure_line.message)
raise
scorer = MatchScorer(failure_line.message)
matches = [(item, item.message) for item in resp]
best_match = scorer.best_match(matches)
if best_match:
logger.debug("Matched using elastic search test matcher")
rv.append(Match(failure_line,
ClassifiedFailure.objects.get(
id=best_match[1].best_classification),
best_match[0]))
return rv
class CrashSignatureMatcher(Matcher):
"""Matcher that looks for crashes with identical signature"""
@ -148,6 +201,34 @@ class CrashSignatureMatcher(Matcher):
return rv
class MatchScorer(object):
"""Simple scorer for similarity of strings based on python's difflib
SequenceMatcher"""
def __init__(self, target):
""":param target: The string to which candidate strings will be
compared"""
self.matcher = SequenceMatcher(lambda x: x == " ")
self.matcher.set_seq2(target)
def best_match(self, matches):
"""Return the most similar string to the target string from a list
of candidates, along with a score indicating the goodness of the match.
:param matches: A list of candidate matches
:returns: A tuple of (score, best_match)"""
best_match = None
for match, message in matches:
self.matcher.set_seq1(message)
ratio = self.matcher.quick_ratio()
if best_match is None or ratio >= best_match[0]:
new_ratio = self.matcher.ratio()
if best_match is None or new_ratio > best_match[0]:
best_match = (new_ratio, match)
return best_match
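# Hypothetical usage of MatchScorer (illustration only, not part of this commit):
#   scorer = MatchScorer("message 1.2")
#   score, best = scorer.best_match([(match_a, "message 1.3"),
#                                    (match_b, "unrelated error")])
#   # best is match_a with a score close to 1.0; returns None for no candidates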
def register():
for obj in [PreciseTestMatcher, CrashSignatureMatcher]:
for obj_name in settings.AUTOCLASSIFY_MATCHERS:
obj = globals()[obj_name]
MatcherManager.register_matcher(obj)

View file

@ -393,6 +393,9 @@ SILENCED_SYSTEM_CHECKS = [
# Enable integration between autoclassifier and jobs
AUTOCLASSIFY_JOBS = env.bool("AUTOCLASSIFY_JOBS", default=False)
# Ordered list of matcher classes to use during autoclassification
AUTOCLASSIFY_MATCHERS = ["PreciseTestMatcher", "CrashSignatureMatcher",
"ElasticSearchTestMatcher"]
# timeout for requests to external sources
# like ftp.mozilla.org or hg.mozilla.org
@ -543,3 +546,9 @@ HAWK_CREDENTIALS_LOOKUP = 'treeherder.webapp.api.auth.hawk_lookup'
# This is the client ID used by the internal data ingestion service.
ETL_CLIENT_ID = 'treeherder-etl'
# Configuration for elasticsearch backend
ELASTIC_SEARCH = {
"url": env.str('ELASTICSEARCH_URL', default=""),
"index_prefix": ""
}

View file

@ -13,6 +13,8 @@ from treeherder.etl.common import fetch_text
from treeherder.model.models import (FailureLine,
JobLog,
Repository)
from treeherder.model.search import (TestFailureLine,
bulk_insert)
logger = logging.getLogger(__name__)
@ -59,7 +61,7 @@ def write_failure_lines(repository, job_guid, job_log, log_iter):
retry = False
with transaction.atomic():
try:
create(repository, job_guid, job_log, log_list)
failure_lines = create(repository, job_guid, job_log, log_list)
except OperationalError as e:
logger.warning("Got OperationalError inserting failure_line")
# Retry iff this error is the "incorrect String Value" error
@ -82,18 +84,29 @@ def write_failure_lines(repository, job_guid, job_log, log_iter):
if retry:
with transaction.atomic():
log_list = list(transformer(log_list))
create(repository, job_guid, job_log, log_list)
failure_lines = create(repository, job_guid, job_log, log_list)
create_es(failure_lines)
def create(repository, job_guid, job_log, log_list):
FailureLine.objects.bulk_create(
[FailureLine(repository=repository, job_guid=job_guid, job_log=job_log,
**failure_line)
for failure_line in log_list]
)
failure_lines = [
FailureLine.objects.create(repository=repository, job_guid=job_guid, job_log=job_log,
**failure_line)
for failure_line in log_list]
job_log.status = JobLog.PARSED
job_log.save()
return failure_lines
def create_es(failure_lines):
# Store the failure lines in elastic_search
es_lines = []
for failure_line in failure_lines:
es_line = TestFailureLine.from_model(failure_line)
if es_line:
es_lines.append(es_line)
bulk_insert(es_lines)
def replace_astral(log_list):

View file

@ -35,6 +35,8 @@ from treeherder.model.models import (BuildPlatform,
ReferenceDataSignatures,
Repository,
TextLogSummaryLine)
from treeherder.model.search import bulk_delete as es_delete
from treeherder.model.search import TestFailureLine
from treeherder.model.tasks import (populate_error_summary,
publish_job_action,
publish_resultset_action)
@ -748,7 +750,10 @@ into chunks of chunk_size size. Returns the number of result sets deleted"""
# Remove ORM entries for these jobs (objects referring to Job, like
# JobDetail and JobLog, are cycled automatically via ON DELETE
# CASCADE)
orm_delete(FailureLine, FailureLine.objects.filter(job_guid__in=job_guid_list),
failure_line_query = FailureLine.objects.filter(job_guid__in=job_guid_list)
es_delete_data = failure_line_query.values_list("id", "test")
es_delete(TestFailureLine, es_delete_data)
orm_delete(FailureLine, failure_line_query,
chunk_size, sleep_time)
orm_delete(Job, Job.objects.filter(guid__in=job_guid_list),
chunk_size, sleep_time)

View file

@ -0,0 +1,80 @@
import time
from optparse import make_option
from django.core.management.base import BaseCommand
from elasticsearch_dsl import Search
from treeherder.model.models import FailureLine
from treeherder.model.search import (TestFailureLine,
bulk_insert,
connection)
class Command(BaseCommand):
help = """Populate ElasticSearch with data from the DB failure_line table.
This script must be run when ElasticSearch is first set up, to ensure that
existing data is considered for matching failure lines."""
option_list = BaseCommand.option_list + (
make_option('--recreate',
action='store_true',
help="Delete and recreate index"),
make_option('--chunk-size',
action='store',
type='int',
default=10000,
help='Chunk size to use for select/insert'),
make_option('--sleep',
action='store',
type='int',
default=1,
help='Seconds to sleep between batches'),
)
def handle(self, *args, **options):
min_id = FailureLine.objects.order_by('id').values_list("id", flat=True)[0] - 1
chunk_size = options['chunk_size']
if options["recreate"]:
connection.indices.delete(TestFailureLine._doc_type.index, ignore=404)
TestFailureLine.init()
else:
if connection.indices.exists(TestFailureLine._doc_type.index):
self.stderr.write("Index already exists; can't perform import")
return
while True:
rows = (FailureLine.objects
.filter(id__gt=min_id)
.order_by('id')
.values("id", "job_guid", "action", "test", "subtest",
"status", "expected", "message", "best_classification_id",
"best_is_verified"))[:chunk_size]
if not rows:
break
es_lines = []
for item in rows:
es_line = failure_line_from_value(item)
if es_line:
es_lines.append(es_line)
self.stdout.write("Inserting %i rows" % len(es_lines))
bulk_insert(es_lines)
min_id = rows[len(rows) - 1]["id"]
time.sleep(options['sleep'])
s = Search(doc_type=TestFailureLine).params(search_type="count")
self.stdout.write("Index contains %i documents" % s.execute().hits.total)
def failure_line_from_value(line):
if line["action"] == "test_result":
rv = TestFailureLine(job_guid=line["job_guid"],
test=line["test"],
subtest=line["subtest"],
status=line["status"],
expected=line["expected"],
message=line["message"],
best_classification=line["best_classification_id"],
best_is_verified=line["best_is_verified"])
rv.meta.id = line["id"]
return rv

View file

@ -26,6 +26,7 @@ from treeherder import path
from .fields import (BigAutoField,
FlexibleForeignKey)
from .search import TestFailureLine
# the cache key is specific to the database name we're pulling the data from
SOURCES_CACHE_KEY = "treeherder-datasources"
@ -760,6 +761,7 @@ class FailureLine(models.Model):
if mark_best:
self.best_classification = classification
self.save(update_fields=['best_classification'])
self.elastic_search_insert()
return classification, new_link
def mark_best_classification_verified(self, classification):
@ -770,6 +772,7 @@ class FailureLine(models.Model):
self.best_classification = classification
self.best_is_verified = True
self.save()
self.elastic_search_insert()
def _serialized_components(self):
if self.action == "test_result":
@ -818,6 +821,12 @@ class FailureLine(models.Model):
classification, _ = self.set_classification(manual_detector)
self.mark_best_classification_verified(classification)
def elastic_search_insert(self):
es_line = TestFailureLine.from_model(self)
if es_line:
es_line.save()
return es_line
class ClassifiedFailure(models.Model):
id = BigAutoField(primary_key=True)

treeherder/model/search.py (new file, 191 lines)
View file

@ -0,0 +1,191 @@
import logging
from functools import wraps
import certifi
from django.conf import settings
from elasticsearch.helpers import bulk
from elasticsearch_dsl import (Boolean,
DocType,
Index,
Integer,
String,
analyzer,
tokenizer)
from elasticsearch_dsl.connections import connections
logger = logging.getLogger(__name__)
# Tokenizer that splits on tokens matching a hex number,
# a decimal number, or anything non-alphanumeric.
message_tokenizer = tokenizer('message_tokenizer',
'pattern',
pattern=r"0x[0-9a-fA-F]+|[\W0-9]+?")
message_analyzer = analyzer('message_analyzer',
type="custom",
tokenizer=message_tokenizer,
filters=[])
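# Illustrative effect (assumed from the tokenizer pattern): "message 1.2" and
# "message 1.3" both analyze to the single token "message", so a phrase query
# on the message field treats strings differing only in numbers as identical.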
class RoutedDocType(DocType):
_routing_key = None
@property
def routing(self):
return getattr(self, self._routing_key)
@classmethod
def get(cls, id, **kwargs):
if "routing" not in kwargs:
raise TypeError("Must supply 'routing' parameter to get")
return super(RoutedDocType, cls).get(id, **kwargs)
def save(self, **kwargs):
if "routing" not in kwargs:
kwargs["routing"] = self.routing
return super(RoutedDocType, self).save(**kwargs)
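# Assumed rationale: routing by test name keeps all documents for a given test
# on the same shard; _init() below also marks routing as required in the mapping.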
def index(name):
if settings.ELASTIC_SEARCH["index_prefix"]:
name = "%s-%s" % (settings.ELASTIC_SEARCH["index_prefix"], name)
return Index(name)
test_failure_line = index("test-failure-line")
test_failure_line.settings(number_of_shards=10)
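# With the test settings' index_prefix of "test", this index is created as
# "test-test-failure-line", keeping test data away from any production index.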
@test_failure_line.doc_type
class TestFailureLine(RoutedDocType):
"""DocType representing a test with an unexpected result
and an error message"""
_routing_key = "test"
job_guid = String(required=True, index='not_analyzed')
test = String(required=True, index='not_analyzed')
subtest = String(required=True, index='not_analyzed')
status = String(required=True, index='not_analyzed')
expected = String(required=True, index='not_analyzed')
best_classification = Integer(index='not_analyzed')
best_is_verified = Boolean(index='not_analyzed')
message = String(analyzer=message_analyzer)
@classmethod
def from_model(cls, line):
"""Create a TestFailureLine object from a FailureLine model instance."""
if line.action == "test_result":
rv = cls(job_guid=line.job_guid,
test=line.test,
subtest=line.subtest,
status=line.status,
expected=line.expected,
message=line.message,
best_classification=(line.best_classification.id
if line.best_classification else None),
best_is_verified=line.best_is_verified)
rv.meta.id = line.id
return rv
def es_connected(default=None):
"""Decorator that runs the decorated function only if we have an
elasticsearch connection, and otherwise returns a default value.
:param default: The default value to return in the absence of a
connection"""
logged_warning = [False]
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
if connection is None:
if not logged_warning[0]:
logger.warning(
"Tried to use elasticsearch with %s, but no connection found.",
func.__name__)
logged_warning[0] = True
return default
return func(*args, **kwargs)
return inner
return decorator
@es_connected()
def bulk_insert(items):
"""Insert multiple items into ElasticSearch.
:param items: An iterable containing items that are
instances of subclasses of elasticsearch_dsl.DocType"""
bulk_data = []
for item in items:
data = item.to_dict(include_meta=True)
data["_routing"] = item.routing
bulk_data.append(data)
try:
return bulk(connection, bulk_data)
except Exception as e:
logger.error(e)
raise
@es_connected()
def bulk_delete(cls, ids_routing):
"""Delete multiple items from elasticsearch by document id
:param cls: The DocType subclass of the items being deleted.
:param ids_routing: Iterable of (document ids, routing key) to delete."""
actions = []
for (id, routing) in ids_routing:
actions.append({
'_op_type': 'delete',
'_index': cls._doc_type.index,
'_type': cls._doc_type.name,
'_id': id,
'_routing': routing})
bulk(connection, actions)
def refresh_all():
"""Refresh all elasticsearch indicies. This is only intended for
test use, so that inserted documents are updated immediately and
tests are not random"""
if connection is None:
logger.error("Must have an elastic search connection")
raise ValueError("Tried to elasticsearch with no connection specified")
logger.info("Refreshing all es indices")
return connection.indices.refresh()
def doctypes():
"""List of all DocType subclasses"""
return [item for item in globals().values()
if type(item) == type(DocType) and
issubclass(item, DocType) and
item._doc_type.index]
def _init():
es_url = settings.ELASTIC_SEARCH["url"]
if not es_url:
return
connection = connections.create_connection(
hosts=[es_url],
verify_certs=es_url.startswith("https"),
ca_certs=certifi.where(),
timeout=20)
# Create any indices that are missing
indices = connection.indices.get("*")
for item in doctypes():
if item._doc_type.index not in indices:
item.init()
connection.indices.put_mapping(doc_type=item._doc_type.name,
index=item._doc_type.index,
body={"_routing": {"required": True}})
return connection
connection = _init()

View file

@ -46,3 +46,4 @@ def orm_delete(model, queryset, chunk_size, sleep_time):
if sleep_time:
# Allow some time for other queries to get through
time.sleep(sleep_time)
return delete_ids

View file

@ -11,9 +11,7 @@ from rest_framework.status import (HTTP_200_OK,
from treeherder.model.derived import JobsModel
from treeherder.model.models import (ClassifiedFailure,
FailureLine,
FailureMatch,
Matcher)
FailureLine)
from treeherder.webapp.api import serializers
from treeherder.webapp.api.utils import as_dict
@ -74,18 +72,7 @@ class FailureLineViewSet(mixins.RetrieveModelMixin, viewsets.GenericViewSet):
by_project[failure_line.repository.name].append(failure_line.job_guid)
failure_line.best_classification = classification
failure_line.best_is_verified = True
failure_line.save()
if (classification is not None and
classification not in failure_line.classified_failures.all()):
manual_detector = Matcher.objects.get(name="ManualDetector")
match = FailureMatch(failure_line=failure_line,
classified_failure=classification,
matcher=manual_detector,
score=1.0)
match.save()
failure_line.mark_best_classification_verified(classification)
for project, job_guids in by_project.iteritems():
with JobsModel(project) as jm: