Make push_data_label and push_data_group normal DBs instead of a JSON containing a very large list

This should reduce memory usage while generating push data results, as we no longer have to hold
all of them in memory and then dump them into a single large JSON file.

Reading the resulting file should also be faster.
Marco Castelluccio 2020-05-06 12:37:22 +02:00
Parent 5773baba14
Commit 8347cd4405
4 changed files with 85 additions and 79 deletions
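
A note on the pattern behind this change: instead of accumulating every push result in a list and serializing it with one json.dump call, each record is written out as soon as it is produced, so peak memory stays bounded by a single record. A minimal sketch of the before/after in plain Python (generic code, not bugbug's actual db module):

import json

# Before: the whole result set lives in memory at once, plus the
# serialized JSON string produced by json.dump.
def dump_all(results, path):
    push_data = list(results)
    with open(path, "w") as f:
        json.dump(push_data, f)

# After: one record per line (JSON Lines style), written as results
# arrive from a generator; nothing is accumulated.
def dump_streaming(results, path):
    with open(path, "w") as f:
        for result in results:
            f.write(json.dumps(result) + "\n")

Reading back is symmetric: a JSON Lines file can be consumed line by line, which is also why the resulting file should be faster to load than one giant JSON array.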

View file

@@ -12,8 +12,6 @@ import struct
from bugbug import db, repository
from bugbug.utils import ExpQueue, LMDBDict
PUSH_DATA_URL = "https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_test_scheduling_history_push_data.latest/artifacts/public/push_data_{granularity}.json.zst"
TEST_LABEL_SCHEDULING_DB = "data/test_label_scheduling_history.pickle"
PAST_FAILURES_LABEL_DB = "past_failures_label.lmdb.tar.zst"
FAILING_TOGETHER_LABEL_DB = "failing_together_label.lmdb.tar.zst"
@@ -23,6 +21,12 @@ db.register(
13,
[PAST_FAILURES_LABEL_DB, FAILING_TOGETHER_LABEL_DB],
)
PUSH_DATA_LABEL_DB = "data/push_data_label.json"
db.register(
PUSH_DATA_LABEL_DB,
"https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_test_scheduling_history_push_data.latest/artifacts/public/push_data_label.json.zst",
1,
)
TEST_GROUP_SCHEDULING_DB = "data/test_group_scheduling_history.pickle"
PAST_FAILURES_GROUP_DB = "past_failures_group.lmdb.tar.zst"
@@ -33,6 +37,12 @@ db.register(
18,
[PAST_FAILURES_GROUP_DB, TOUCHED_TOGETHER_DB],
)
PUSH_DATA_GROUP_DB = "data/push_data_group.json"
db.register(
PUSH_DATA_GROUP_DB,
"https://community-tc.services.mozilla.com/api/index/v1/task/project.relman.bugbug.data_test_scheduling_history_push_data.latest/artifacts/public/push_data_group.json.zst",
1,
)
HISTORICAL_TIMESPAN = 4500
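
With PUSH_DATA_LABEL_DB and PUSH_DATA_GROUP_DB registered above (local path, artifact URL, schema version 1), consumers can rely on the db helpers instead of hand-rolled download/decompress/parse logic. A usage sketch based only on the calls visible in this commit; the four-element record shape comes from the retriever's PushResult type:

from bugbug import db, test_scheduling

# Fetch the registered artifact; db.download returns a truthy value on
# success, which is why callers in this commit assert on it.
assert db.download(test_scheduling.PUSH_DATA_LABEL_DB)

# db.read yields one stored record at a time, so callers can stream...
for revs, tasks, possible_regressions, likely_regressions in db.read(
    test_scheduling.PUSH_DATA_LABEL_DB
):
    print(revs[0], len(tasks))

# ...or materialize everything when len() or random access is needed.
push_data = list(db.read(test_scheduling.PUSH_DATA_LABEL_DB))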

View file

@@ -211,10 +211,16 @@ tasks:
artifacts:
public/push_data_label.json.zst:
path: /push_data_label.json.zst
path: /data/push_data_label.json.zst
type: file
public/push_data_label.json.version:
path: /data/push_data_label.json.version
type: file
public/push_data_group.json.zst:
path: /push_data_group.json.zst
path: /data/push_data_group.json.zst
type: file
public/push_data_group.json.version:
path: /data/push_data_group.json.version
type: file
features:
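
The task now publishes a small push_data_*.json.version artifact next to each .zst archive. The db.register calls in the first file declare these databases with schema version 1, so a consumer can presumably compare versions and skip an artifact written with an incompatible schema before downloading the large archive. A hypothetical sketch of such a check (this is not bugbug's actual implementation, and the URL handling is an assumption):

import urllib.request

def is_schema_compatible(version_url: str, supported_version: int) -> bool:
    # Hypothetical: fetch the tiny ".version" artifact and compare it
    # against the version that was passed to db.register.
    with urllib.request.urlopen(version_url) as response:
        remote_version = int(response.read().decode("ascii").strip())
    return remote_version == supported_version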

View file

@@ -3,21 +3,13 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import json
from bugbug import test_scheduling
from bugbug.utils import download_check_etag, zstd_decompress
from bugbug import db, test_scheduling
def count(is_first_task, is_second_task):
updated = download_check_etag(
test_scheduling.PUSH_DATA_URL.format(granularity="label")
)
if updated:
zstd_decompress("push_data_label.json")
assert db.download(test_scheduling.PUSH_DATA_LABEL_DB)
with open("push_data_label.json", "r") as f:
push_data = json.load(f)
push_data = list(db.read(test_scheduling.PUSH_DATA_LABEL_DB))
print(f"Analyzing {len(push_data)} pushes...")

View file

@@ -7,13 +7,13 @@ import argparse
import collections
import concurrent.futures
import itertools
import json
import math
import os
import struct
import traceback
from datetime import datetime
from logging import INFO, basicConfig, getLogger
from typing import Dict, Generator, List, NewType, Tuple
import adr
import dateutil.parser
@@ -22,12 +22,11 @@ from dateutil.relativedelta import relativedelta
from tqdm import tqdm
from bugbug import commit_features, db, repository, test_scheduling
from bugbug.utils import (
create_tar_zst,
download_check_etag,
zstd_compress,
zstd_decompress,
)
from bugbug.utils import create_tar_zst, zstd_compress
Revision = NewType("Revision", str)
TaskName = NewType("TaskName", str)
PushResult = Tuple[List[Revision], List[TaskName], List[TaskName], List[TaskName]]
basicConfig(level=INFO)
logger = getLogger(__name__)
@@ -80,7 +79,7 @@ def rename_tasks(tasks):
class Retriever(object):
def generate_push_data(self, runnable):
def generate_push_data(self, granularity: str) -> None:
# We keep in the cache the fact that we failed to analyze a push for 10
# days, so if we re-run often we don't retry the same pushes many times.
MISSING_CACHE_RETENTION = 10 * 24 * 60
@@ -88,8 +87,8 @@ class Retriever(object):
# We'll use the past TRAINING_MONTHS months only for training the model,
# but we use half TRAINING_MONTHS months more than that to calculate the
# failure statistics.
from_months = TRAINING_MONTHS[runnable] + math.floor(
TRAINING_MONTHS[runnable] / 2
from_months = TRAINING_MONTHS[granularity] + math.floor(
TRAINING_MONTHS[granularity] / 2
)
# We use the actual date instead of 'today-X' aliases to avoid adr caching
@@ -103,13 +102,15 @@ class Retriever(object):
branch="autoland",
)
num_cached = 0
if granularity == "label":
push_data_db = test_scheduling.PUSH_DATA_LABEL_DB
elif granularity == "group":
push_data_db = test_scheduling.PUSH_DATA_GROUP_DB
push_data = []
cache = {}
cache: Dict[mozci.push.Push, Tuple[PushResult, int]] = {}
def cache_key(push):
return f"push_data.{runnable}.{push.rev}"
def cache_key(push: mozci.push.Push) -> str:
return f"push_data.{granularity}.{push.rev}"
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_push = {
@@ -147,64 +148,58 @@ class Retriever(object):
cache[push] = None
to_regenerate += 1"""
for push in tqdm(pushes):
key = cache_key(push)
def generate() -> Generator[PushResult, None, None]:
num_cached = 0
if cache[push] is not None:
num_cached += 1
cached = cache[push]
if cached:
value, mozci_version = cached
push_data.append(value)
else:
logger.info(f"Analyzing {push.rev} at the {runnable} level...")
for push in tqdm(pushes):
key = cache_key(push)
try:
if runnable == "label":
runnables = push.task_labels
elif runnable == "group":
runnables = push.group_summaries.keys()
if cache[push] is not None:
num_cached += 1
cached = cache[push]
if cached:
value, mozci_version = cached
yield value
else:
logger.info(f"Analyzing {push.rev} at the {granularity} level...")
value = [
push.revs,
list(runnables),
list(push.get_possible_regressions(runnable)),
list(push.get_likely_regressions(runnable)),
]
push_data.append(value)
adr.config.cache.put(
key, (value, MOZCI_VERSION), adr.config["cache"]["retention"]
)
except adr.errors.MissingDataError:
logger.warning(
f"Tasks for push {push.rev} can't be found on ActiveData"
)
adr.config.cache.put(key, (), MISSING_CACHE_RETENTION)
except Exception:
traceback.print_exc()
adr.config.cache.put(key, (), MISSING_CACHE_RETENTION)
try:
if granularity == "label":
runnables = push.task_labels
elif granularity == "group":
runnables = push.group_summaries.keys()
logger.info(f"{num_cached} pushes were already cached out of {len(pushes)}")
value = (
push.revs,
list(runnables),
list(push.get_possible_regressions(granularity)),
list(push.get_likely_regressions(granularity)),
)
adr.config.cache.put(
key,
(value, MOZCI_VERSION),
adr.config["cache"]["retention"],
)
yield value
except adr.errors.MissingDataError:
logger.warning(
f"Tasks for push {push.rev} can't be found on ActiveData"
)
adr.config.cache.put(key, (), MISSING_CACHE_RETENTION)
except Exception:
traceback.print_exc()
adr.config.cache.put(key, (), MISSING_CACHE_RETENTION)
with open(f"push_data_{runnable}.json", "w") as f:
json.dump(push_data, f)
logger.info(f"{num_cached} pushes were already cached out of {len(pushes)}")
zstd_compress(f"push_data_{runnable}.json")
db.write(push_data_db, generate())
zstd_compress(push_data_db)
def retrieve_push_data(self):
def retrieve_push_data(self) -> None:
self.generate_push_data("label")
self.generate_push_data("group")
def generate_test_scheduling_history(self, granularity):
push_data_path = f"push_data_{granularity}.json"
updated = download_check_etag(
test_scheduling.PUSH_DATA_URL.format(granularity=granularity)
)
if updated:
zstd_decompress(push_data_path)
os.remove(f"{push_data_path}.zst")
assert os.path.exists(push_data_path), "Decompressed push data file exists"
# Get the commits DB.
assert db.download(repository.COMMITS_DB)
@@ -213,6 +208,7 @@ class Retriever(object):
)
if granularity == "label":
push_data_db = test_scheduling.PUSH_DATA_LABEL_DB
test_scheduling_db = test_scheduling.TEST_LABEL_SCHEDULING_DB
past_failures_db = os.path.join(
"data", test_scheduling.PAST_FAILURES_LABEL_DB
@@ -221,6 +217,7 @@ class Retriever(object):
"data", test_scheduling.FAILING_TOGETHER_LABEL_DB
)
elif granularity == "group":
push_data_db = test_scheduling.PUSH_DATA_GROUP_DB
test_scheduling_db = test_scheduling.TEST_GROUP_SCHEDULING_DB
past_failures_db = os.path.join(
"data", test_scheduling.PAST_FAILURES_GROUP_DB
@@ -229,6 +226,8 @@ class Retriever(object):
"data", test_scheduling.TOUCHED_TOGETHER_DB
)
assert db.download(push_data_db)
db.download(test_scheduling_db, support_files_too=True)
last_node = None
@@ -345,8 +344,7 @@ class Retriever(object):
commit_map[commit_data["node"]] = commit_data
with open(push_data_path, "r") as f:
push_data = json.load(f)
push_data = list(db.read(push_data_db))
logger.info(f"push data nodes: {len(push_data)}")