250 строки
7.8 KiB
Python
250 строки
7.8 KiB
Python
import argparse
|
|
import csv
|
|
import numpy as np
|
|
import os
|
|
import random
|
|
|
|
|
|
def highvalue_parser():
    """
    Build the command-line parser for the high-value test generator.

    Returns:
        argparse.ArgumentParser: a parser exposing ``--input`` (required
        string path), ``--iterations`` (int, default 100) and ``--view``
        (boolean flag, default False).
    """
    # The summary text has to be passed as ``description``: the first
    # positional parameter of ArgumentParser is ``prog``, so passing the
    # text positionally replaces the program name in the usage line and
    # leaves the parser without a description.
    parser = argparse.ArgumentParser(
        description="This tool can be used to determine high-value tests from a "
        "CSV file produced by the Redash query in `sql_query.txt`."
    )
    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Path to the data that contains information on regressions.",
    )
    parser.add_argument(
        "--iterations",
        type=int,
        default=100,
        help="The number of minimizations to do while trying to find a minimal "
        "test set that maximizes the number of alerts caught. Defaults to 100.",
    )
    parser.add_argument(
        "--view",
        action="store_true",
        default=False,
        help="View how the regressions are spread across tests, showing the "
        "number of regressions each test caught, as well as the unique "
        "number of regressions the tests caught.",
    )
    return parser
|
|
|
|
|
|
def open_csv_data(path):
    """
    Read a CSV file and return all of its rows.

    Args:
        path: Location of the CSV file to read.

    Returns:
        list: one list of strings per CSV record, in file order. The
        header row (if the file has one) is returned as the first entry.
    """
    # newline="" hands line endings to the csv module untouched, as the
    # csv documentation requires; without it, line breaks embedded in
    # quoted fields are rewritten by universal-newline translation.
    with open(path, "r", newline="") as f:
        return list(csv.reader(f))
|
|
|
|
|
|
def get_data_ind(data, fieldname):
    """
    Return the column index of the requested field.

    The header row is ``data[0]``. A header equal to ``fieldname`` is
    preferred; if there is none, the first header that contains
    ``fieldname`` as a substring is used instead.

    Args:
        data: Sequence of rows whose first row holds the column names.
        fieldname: Name of the column to locate.

    Returns:
        int or None: the column index, or None when ``data`` is empty or
        no header matches.
    """
    if not data:
        return None
    header = data[0]

    # An exact match must win, otherwise a header that merely contains the
    # name (e.g. "suite_id" appearing before "suite") would be picked.
    exact = next((i for i, entry in enumerate(header) if entry == fieldname), None)
    if exact is not None:
        return exact

    # Substring fallback kept for backward compatibility with headers that
    # carry extra text around the field name.
    return next((i for i, entry in enumerate(header) if fieldname in entry), None)
|
|
|
|
|
|
def get_suites_and_alerts(data):
    """
    Extract the suite and the alert summary ID of every data row.

    The first row of ``data`` is treated as the header and skipped. The
    two returned lists are parallel: entry ``k`` of each list comes from
    the same data row.

    Returns:
        tuple: ``(suites, summary_ids)``.
    """
    suite_col = get_data_ind(data, "suite")
    id_col = get_data_ind(data, "summary_id")

    records = data[1:]
    suites = [record[suite_col] for record in records]
    summary_ids = [record[id_col] for record in records]

    return (suites, summary_ids)
|
|
|
|
|
|
def get_alert_matrix(data, suites=None, summary_ids=None, randomize=True):
    """
    Returns the data in matrix form. Rows are alerts,
    and columns are tests. It also returns the suites and
    alert IDs that were found and used to build the matrix.

    The values' indices in the unique_suites, and the unique_ids
    correspond to the column or row in the alert_mat returned.

    Args:
        data: Rows of the CSV; ``data[0]`` is the header row.
        suites: Optional suite names (duplicates allowed). When either
            this or ``summary_ids`` is missing or empty, both are read
            from ``data``.
        summary_ids: Optional alert summary IDs (duplicates allowed).
        randomize: Shuffle the column and row order when True.

    Returns:
        tuple: ``(alert_mat, unique_suites, unique_ids)`` where
        ``alert_mat[r, c]`` is 1 if the test ``unique_suites[c]`` appears
        in a data row for the alert ``unique_ids[r]``, and 0 otherwise.
    """
    # ``or`` rather than ``and``: with only one list supplied, the other
    # is still None and de-duplicating it below would raise.
    if not suites or not summary_ids:
        suites, summary_ids = get_suites_and_alerts(data)

    # dict.fromkeys de-duplicates while keeping first-seen order, so the
    # layout is reproducible when randomize is False. A set of strings
    # iterates in an order that changes between interpreter runs.
    unique_suites = list(dict.fromkeys(suites))
    unique_ids = list(dict.fromkeys(summary_ids))
    if randomize:
        random.shuffle(unique_suites)
        random.shuffle(unique_ids)

    suite_cols = {suite: col for col, suite in enumerate(unique_suites)}
    alert_rows = {alert_id: row for row, alert_id in enumerate(unique_ids)}

    summaryid_ind = get_data_ind(data, "summary_id")
    suite_ind = get_data_ind(data, "suite")

    # Build matrix to analyze: mark every (alert, test) pair seen in the
    # data. Repeated pairs simply set the same cell to 1 again.
    alert_mat = np.zeros((len(unique_ids), len(unique_suites)))
    for row in data[1:]:
        alert_mat[alert_rows[row[summaryid_ind]], suite_cols[row[suite_ind]]] = 1

    return alert_mat, unique_suites, unique_ids
|
|
|
|
|
|
def _greedy_test_cover(alert_mat):
    """
    Greedily pick tests (columns) until every alert (row) has been visited.

    Rows are processed in order. A row caught by exactly one test forces
    that test; a row already caught by a chosen test adds nothing;
    otherwise the candidate test that catches the most rows not yet
    processed is chosen (the first such test wins ties).

    Returns:
        tuple: ``(chosen_tests, caught_alerts)``, lists of column indices
        and row indices respectively.
    """
    chosen_tests = []
    caught_alerts = []
    for i in range(alert_mat.shape[0]):
        # Indexing a row already yields a 1-D array. It must not be passed
        # through np.squeeze: with a single test column that collapses the
        # row to a 0-d array, which cannot be iterated.
        candidates = np.flatnonzero(alert_mat[i, :] == 1).tolist()

        chosen = -1
        if len(candidates) == 1:
            # Only one test catches this alert, so it has to be kept.
            chosen = candidates[0]
        elif any(t in chosen_tests for t in candidates):
            # Check if it's already been caught
            caught_alerts.append(i)
            continue
        else:
            # Not caught; pick the test which maximizes the number of
            # alerts we catch (excluding those we already caught). The
            # set is built once per row rather than once per candidate.
            already_caught = set(caught_alerts)
            best_count = -1
            for j in candidates:
                hits = set(np.flatnonzero(alert_mat[:, j] == 1).tolist())
                remaining = len(hits - already_caught)
                if remaining > best_count:
                    best_count = remaining
                    chosen = j

        if chosen > -1 and chosen not in chosen_tests:
            chosen_tests.append(chosen)
        caught_alerts.append(i)

    return chosen_tests, caught_alerts


def _percentage(part, whole):
    """Return ``part`` as a percentage of ``whole``; 0.0 when ``whole`` is 0."""
    return 100 * (float(part) / whole) if whole else 0.0


def get_minimal_testset(data, iterations=100):
    """
    Search for a small set of tests that still catches all known
    regressions, and print a summary of the result.

    Each iteration builds a freshly ordered alert matrix and runs a greedy
    selection on it; the smallest test set seen over all iterations is
    kept. The result is the best set found, not a proven minimum.

    Args:
        data: Rows of the CSV; ``data[0]`` is the header row.
        iterations: Number of selection rounds to run (at least 1).

    Returns:
        dict: ``total_caught`` and ``total_tests_left`` (percentages),
        ``tests`` (chosen suite names) and ``rejected_tests`` (the
        remaining suite names).

    Raises:
        ValueError: if ``iterations`` is smaller than 1.
    """
    if iterations < 1:
        # Without a single round there is no result to report.
        raise ValueError("iterations must be at least 1, got %s" % iterations)

    suites, summary_ids = get_suites_and_alerts(data)

    best_suites = None
    best_ids = None
    minimal_testset = None
    maximal_alerts = []
    for _ in range(iterations):
        alert_mat, suites, summary_ids = get_alert_matrix(
            data, suites=suites, summary_ids=summary_ids
        )

        ## Algorithm for minimization starts here
        chosen_tests, caught_alerts = _greedy_test_cover(alert_mat)

        ## Check if this round of minimization worked any better.
        # Compare with None explicitly so that a legitimately empty
        # selection is not mistaken for "no result yet".
        if minimal_testset is None or len(chosen_tests) < len(minimal_testset):
            # The indices in chosen_tests refer to this round's ordering,
            # so the matching suite/ID lists are stored with them.
            best_suites = suites.copy()
            best_ids = summary_ids.copy()
            minimal_testset = chosen_tests
            maximal_alerts = caught_alerts

    kept = set(minimal_testset)
    rejected_inds = [j for j in range(len(best_suites)) if j not in kept]
    info = {
        "total_caught": _percentage(len(maximal_alerts), len(best_ids)),
        "total_tests_left": _percentage(len(minimal_testset), len(best_suites)),
        "tests": [best_suites[j] for j in minimal_testset],
        "rejected_tests": [best_suites[j] for j in rejected_inds],
    }

    print(
        "Total alerts caught: %s (%s/%s)"
        % (info["total_caught"], len(maximal_alerts), len(best_ids))
    )
    print(
        "Percentage of total tests left: %s (%s/%s)\n"
        % (info["total_tests_left"], len(minimal_testset), len(best_suites))
    )
    print("Chosen tests: %s\n" % info["tests"])
    print("Rejected tests: %s" % info["rejected_tests"])

    return info
|
|
|
|
|
|
def view_histogram(data):
    """
    Show a horizontal bar chart of how alerts are spread across tests.

    For every test two bars are drawn: the number of alerts it caught,
    and (in red) the number of alerts that no other test caught.
    """
    from matplotlib import pyplot as plt

    alert_mat, suites, _ = get_alert_matrix(data)

    positions = np.arange(len(suites))

    # Alerts caught per test: the sum of each matrix column.
    suites_counts = [np.sum(alert_mat[:, col]) for col in range(len(suites))]

    # Alerts caught by exactly one test are credited to that test.
    uni_counts = [0] * len(suites)
    tests_per_alert = np.sum(alert_mat, axis=1)
    for alert_row, n_tests in zip(alert_mat, tests_per_alert):
        if n_tests != 1:
            continue
        catching = [col for col, flag in enumerate(alert_row) if flag == 1]
        uni_counts[catching[0]] += 1

    plt.figure()
    plt.suptitle(
        "Number of regressions/improvements, excluding duplicate entries\n"
        "Red is number of times only that test caught the regression/improvement\n"
        "Blue is number of times that test caught a regression/improvement"
    )
    plt.barh(positions, suites_counts)
    plt.barh(positions, uni_counts, color="red")
    plt.yticks(positions, suites)
    plt.title("Regressions")
    plt.show()
|
|
|
|
|
|
def main():
    """
    Command-line entry point: load the CSV named by ``--input``, run the
    test-set minimization, and show the histogram when ``--view`` is set.
    """
    options = highvalue_parser().parse_args()
    regression_rows = open_csv_data(options.input)

    get_minimal_testset(regression_rows, options.iterations)

    if not options.view:
        return
    view_histogram(regression_rows)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|