# mozperftest-tools/high-value-tests/generate_high_value_tests.py

import argparse
import csv
import numpy as np
import os
import random
def highvalue_parser():
    """
    Build the argument parser for the high-value test generator.

    Returns:
        argparse.ArgumentParser: Parser accepting --input (required path
        to the regression CSV), --iterations (minimization rounds,
        default 100), and --view (plot regression spread).
    """
    parser = argparse.ArgumentParser(
        "This tool can be used to determine high-value tests from a "
        "CSV file produced by the Redash query in `sql_query.txt`."
    )
    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Path to the data that contains information on regressions.",
    )
    parser.add_argument(
        "--iterations",
        type=int,
        default=100,
        # Typo fixed: "minimzations" -> "minimizations".
        help="The number of minimizations to do while trying to find a minimal "
        "test set that maximizes the number of alerts caught. Defaults to 100.",
    )
    parser.add_argument(
        "--view",
        action="store_true",
        default=False,
        help="View how the regressions are spread across tests, showing the "
        "number of regressions each test caught, as well as the unique "
        "number of regressions the tests caught.",
    )
    return parser
def open_csv_data(path):
    """
    Read a CSV file and return all of its rows.

    Args:
        path (str): Location of the CSV file to read.

    Returns:
        list[list[str]]: Every row in file order, header included.
    """
    with open(path, "r") as source:
        return list(csv.reader(source))
def get_data_ind(data, fieldname):
    """
    Find the column index whose header entry contains the given field.

    Args:
        data (list[list[str]]): CSV rows; data[0] is the header row.
        fieldname (str): Substring to look for in the header entries.

    Returns:
        int | None: Index of the first matching column, or None if
        no header entry contains the field name.
    """
    matches = (col for col, header in enumerate(data[0]) if fieldname in header)
    return next(matches, None)
def get_suites_and_alerts(data):
    """
    Extract the suite names and alert summary IDs from the CSV rows.

    The two returned lists are parallel: entry i of each list comes
    from the same data row.

    Args:
        data (list[list[str]]): CSV rows, header first.

    Returns:
        tuple[list[str], list[str]]: (suites, summary_ids).
    """
    id_col = get_data_ind(data, "summary_id")
    suite_col = get_data_ind(data, "suite")
    suite_vals = [row[suite_col] for row in data[1:]]
    id_vals = [row[id_col] for row in data[1:]]
    return (suite_vals, id_vals)
def get_alert_matrix(data, suites=None, summary_ids=None, randomize=True):
    """
    Build a binary alert matrix from the CSV rows.

    Matrix rows are alerts (summary IDs) and columns are tests
    (suites); a 1 means that test reported that alert. Also returns
    the unique suites and alert IDs, whose list positions correspond
    to the matrix columns and rows respectively.

    Args:
        data (list[list[str]]): CSV rows, header first.
        suites (list[str] | None): Optional pre-extracted suite list,
            parallel to summary_ids.
        summary_ids (list[str] | None): Optional pre-extracted alert IDs.
        randomize (bool): Shuffle the unique suite/ID orderings so
            repeated calls explore different tie-breaking orders.

    Returns:
        tuple: (alert_mat, unique_suites, unique_ids).
    """
    # Recompute when EITHER list is missing so the two stay parallel.
    # (Previously this used `and`, so passing only one of the two lists
    # crashed below on `set(None)`.)
    if not suites or not summary_ids:
        suites, summary_ids = get_suites_and_alerts(data)

    unique_suites = list(set(suites))
    if randomize:
        random.shuffle(unique_suites)
    unique_suites_dict = {s: c for c, s in enumerate(unique_suites)}

    unique_ids = list(set(summary_ids))
    if randomize:
        random.shuffle(unique_ids)
    unique_ids_dict = {s: c for c, s in enumerate(unique_ids)}

    # Group the distinct tests that triggered each alert to make the
    # matrix easier to build.
    summaryid_ind = get_data_ind(data, "summary_id")
    suite_ind = get_data_ind(data, "suite")
    summary_ids_dict = {}
    for row in data[1:]:
        tests = summary_ids_dict.setdefault(row[summaryid_ind], {"tests": []})["tests"]
        test = row[suite_ind]
        if test not in tests:
            tests.append(test)

    # Build the binary matrix to analyze.
    alert_mat = np.zeros((len(unique_ids), len(unique_suites)))
    for alertid, alertinfo in summary_ids_dict.items():
        for test in alertinfo["tests"]:
            alert_mat[unique_ids_dict[alertid], unique_suites_dict[test]] = 1

    return alert_mat, unique_suites, unique_ids
def get_minimal_testset(data, iterations=100):
    """
    Find a (near-)minimal set of tests that still catches all known
    regressions.

    Runs a greedy minimization `iterations` times — each pass shuffles
    the suite/alert ordering via get_alert_matrix — and keeps the
    smallest test set found. Prints a summary of the best result.

    Args:
        data (list[list[str]]): CSV rows, header first.
        iterations (int): Number of minimization passes. Defaults to 100.

    Returns:
        dict: Keys "total_caught" (percent of alerts caught),
        "total_tests_left" (percent of tests kept), "tests"
        (chosen suite names), and "rejected_tests".
    """
    suites, summary_ids = get_suites_and_alerts(data)
    best_suites = None
    best_ids = None
    minimal_testset = None
    maximal_alerts = []
    for _ in range(iterations):
        alert_mat, suites, summary_ids = get_alert_matrix(
            data, suites=suites, summary_ids=summary_ids
        )

        ## Algorithm for minimization starts here
        allchosentests = []
        caught_alerts = []
        for i in range(alert_mat.shape[0]):
            # Pick a test for each row (alert).
            chosentest = -1
            row = np.squeeze(alert_mat[i, :])
            alltests = [c for c, j in enumerate(row) if j == 1]
            if len(alltests) == 1:
                # Only one test can catch this alert; it must be chosen.
                chosentest = alltests[0]
            else:
                # Check if the alert is already caught by a chosen test.
                if any(t in allchosentests for t in alltests):
                    caught_alerts.append(i)
                    continue
                # Not caught; pick the test that maximizes the number of
                # alerts caught (excluding those we already caught).
                max_col = -1
                max_alerts = -1
                for j in alltests:
                    col = alert_mat[:, j]
                    rowinds = [c for c, a in enumerate(col) if a == 1]
                    rowinds = list(set(rowinds) - set(caught_alerts))
                    if len(rowinds) > max_alerts:
                        max_alerts = len(rowinds)
                        max_col = j
                chosentest = max_col
            if chosentest not in allchosentests and chosentest > -1:
                allchosentests.append(chosentest)
                caught_alerts.append(i)

        ## Keep this round's result if it used fewer tests.
        # `is None` (not truthiness) so that an empty best set is not
        # re-overwritten by every later iteration.
        if minimal_testset is None or len(allchosentests) < len(minimal_testset):
            best_suites = suites.copy()
            best_ids = summary_ids.copy()
            minimal_testset = allchosentests
            maximal_alerts = caught_alerts

    rejected_inds = list(set(range(len(best_suites))) - set(minimal_testset))
    info = {
        "total_caught": 100 * (float(len(maximal_alerts)) / len(best_ids)),
        "total_tests_left": 100 * (float(len(minimal_testset)) / len(best_suites)),
        "tests": [best_suites[j] for j in minimal_testset],
        "rejected_tests": [best_suites[j] for j in rejected_inds],
    }
    print(
        "Total alerts caught: %s (%s/%s)"
        % (info["total_caught"], len(maximal_alerts), len(best_ids))
    )
    print(
        "Percentage of total tests left: %s (%s/%s)\n"
        % (info["total_tests_left"], len(minimal_testset), len(best_suites))
    )
    print("Chosen tests: %s\n" % info["tests"])
    print("Rejected tests: %s" % info["rejected_tests"])
    return info
def view_histogram(data):
    """
    Plot a horizontal bar chart of how regressions spread across tests.

    Blue bars count every alert a test caught; red bars count alerts
    that only that single test caught.

    Args:
        data (list[list[str]]): CSV rows, header first.
    """
    # Imported lazily so the rest of the tool works without matplotlib.
    from matplotlib import pyplot as plt

    alert_mat, suites, summary_ids = get_alert_matrix(data)
    y_positions = np.arange(len(suites))

    # Total alerts caught per test (column sums of the binary matrix).
    per_test_totals = [
        np.sum(np.squeeze(alert_mat[:, col])) for col, _ in enumerate(suites)
    ]

    # Alerts caught by exactly one test: credit that single test.
    alerts_per_row = np.sum(alert_mat, axis=1)
    unique_hits = [0 for _ in suites]
    for row_ind, row_total in enumerate(alerts_per_row):
        if row_total == 1:
            sole_tests = [col for col, v in enumerate(alert_mat[row_ind, :]) if v == 1]
            unique_hits[sole_tests[0]] += 1

    plt.figure()
    plt.suptitle(
        "Number of regressions/improvements, excluding duplicate entries\n"
        "Red is number of times only that test caught the regression/improvement\n"
        "Blue is number of times that test caught a regression/improvement"
    )
    plt.barh(y_positions, per_test_totals)
    plt.barh(y_positions, unique_hits, color="red")
    plt.yticks(y_positions, suites)
    plt.title("Regressions")
    plt.show()
def main():
    """Entry point: parse arguments, load the CSV, run the minimization."""
    args = highvalue_parser().parse_args()
    rows = open_csv_data(args.input)
    get_minimal_testset(rows, args.iterations)
    if args.view:
        view_histogram(rows)


if __name__ == "__main__":
    main()