Use dict, list, tuple instead of typing (#2640)

Except in one file where mypy complains

Fixes #2634
Kunam Balaram Reddy 2021-12-01 21:57:14 +05:30 committed by GitHub
Parent 8cc5a8db88
Commit cbaf0de92e
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
29 changed files: 165 additions and 195 deletions
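For context, the change applies PEP 585: the built-in container types dict, list, and tuple are subscripted directly, instead of importing the Dict, List, and Tuple aliases from typing. A minimal before/after sketch of the pattern (illustrative only; the tally function is hypothetical, not taken from this diff):

    # Before: generic aliases imported from typing
    # from typing import Dict, List
    # def tally(items: List[str]) -> Dict[str, int]: ...

    # After: built-in types used as generics (PEP 585, Python 3.9+)
    def tally(items: list[str]) -> dict[str, int]:
        # Count how many times each item occurs.
        counts: dict[str, int] = {}
        for item in items:
            counts[item] = counts.get(item, 0) + 1
        return counts

Names without a plain built-in counterpart (Optional, Callable, Iterable, NewType, and so on) are still imported from typing, which matches the import lines left in place throughout the diff.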

View file

@@ -7,7 +7,7 @@ import collections
import csv
import re
from datetime import datetime
from typing import Dict, Iterable, Iterator, List, NewType, Optional
from typing import Iterable, Iterator, NewType, Optional
import tenacity
from dateutil.relativedelta import relativedelta
@@ -181,7 +181,7 @@ def get_ids_between(date_from, date_to=None, security=False, resolution=None):
return get_ids(params)
def download_bugs(bug_ids: Iterable[int], security: bool = False) -> List[BugDict]:
def download_bugs(bug_ids: Iterable[int], security: bool = False) -> list[BugDict]:
old_bug_count = 0
new_bug_ids_set = set(int(bug_id) for bug_id in bug_ids)
for bug in get_bugs(include_invalid=True):
@@ -201,7 +201,7 @@ def download_bugs(bug_ids: Iterable[int], security: bool = False) -> List[BugDic
stop=tenacity.stop_after_attempt(7),
wait=tenacity.wait_exponential(multiplier=1, min=16, max=64),
)
def get_chunk(chunk: List[int]) -> List[BugDict]:
def get_chunk(chunk: list[int]) -> list[BugDict]:
new_bugs = get(chunk)
if not security:
@@ -225,8 +225,8 @@ def download_bugs(bug_ids: Iterable[int], security: bool = False) -> List[BugDic
def _find_linked(
bug_map: Dict[int, BugDict], bug: BugDict, link_type: str
) -> List[int]:
bug_map: dict[int, BugDict], bug: BugDict, link_type: str
) -> list[int]:
return sum(
(
_find_linked(bug_map, bug_map[b], link_type)
@@ -237,11 +237,11 @@ def _find_linked(
)
def find_blocked_by(bug_map: Dict[int, BugDict], bug: BugDict) -> List[int]:
def find_blocked_by(bug_map: dict[int, BugDict], bug: BugDict) -> list[int]:
return _find_linked(bug_map, bug, "blocks")
def find_blocking(bug_map: Dict[int, BugDict], bug: BugDict) -> List[int]:
def find_blocking(bug_map: dict[int, BugDict], bug: BugDict) -> list[int]:
return _find_linked(bug_map, bug, "depends_on")
@@ -286,7 +286,7 @@ def count_bugs(bug_query_params):
return count
def get_product_component_count(months: int = 12) -> Dict[str, int]:
def get_product_component_count(months: int = 12) -> dict[str, int]:
"""Returns a dictionary where keys are full components (in the form of
`{product}::{component}`) and the value of the number of bugs for the
given full components. Full component with 0 bugs are returned.
@@ -333,7 +333,7 @@ def get_product_component_count(months: int = 12) -> Dict[str, int]:
return bugs_number
def get_component_team_mapping() -> Dict[str, Dict[str, str]]:
def get_component_team_mapping() -> dict[str, dict[str, str]]:
r = utils.get_session("bugzilla").get(
"https://bugzilla.mozilla.org/rest/product",
params={
@@ -344,7 +344,7 @@ def get_component_team_mapping() -> Dict[str, Dict[str, str]]:
)
r.raise_for_status()
mapping: Dict[str, Dict[str, str]] = collections.defaultdict(dict)
mapping: dict[str, dict[str, str]] = collections.defaultdict(dict)
for product in r.json()["products"]:
for component in product["components"]:
mapping[product["name"]][component["name"]] = component["team_name"]
@@ -352,7 +352,7 @@ def get_component_team_mapping() -> Dict[str, Dict[str, str]]:
return mapping
def get_groups_users(group_names: List[str]) -> List[str]:
def get_groups_users(group_names: list[str]) -> list[str]:
r = utils.get_session("bugzilla").get(
"https://bugzilla.mozilla.org/rest/group",
params={
@@ -370,7 +370,7 @@ def get_groups_users(group_names: List[str]) -> List[str]:
]
def get_revision_ids(bug: BugDict) -> List[int]:
def get_revision_ids(bug: BugDict) -> list[int]:
revision_ids = []
for attachment in bug["attachments"]:

View file

@@ -4,7 +4,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
from typing import Callable, Iterator, List, NewType, Tuple
from typing import Callable, Iterator, NewType
from ratelimit import limits, sleep_and_retry
@@ -67,7 +67,7 @@ class Github:
def fetch_issues(
self, url: str, retrieve_events: bool, params: dict = None
) -> Tuple[List[IssueDict], dict]:
) -> tuple[list[IssueDict], dict]:
self.api_limit()
headers = {"Authorization": "token {}".format(self.get_token())}
response = get_session("github").get(url, params=params, headers=headers)
@@ -93,7 +93,7 @@ class Github:
count = sum(1 for _ in issues)
return int(count / PER_PAGE) + 1
def fetch_issues_updated_since_timestamp(self, since: str) -> List[IssueDict]:
def fetch_issues_updated_since_timestamp(self, since: str) -> list[IssueDict]:
# Fetches changed and new issues since a specified timestamp
url = "https://api.github.com/repos/{}/{}/issues".format(self.owner, self.repo)

View file

@@ -5,7 +5,7 @@
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Tuple
from typing import Any
import matplotlib
import numpy as np
@@ -148,9 +148,9 @@ class Model:
self.entire_dataset_training = False
# DBs required for training.
self.training_dbs: List[str] = []
self.training_dbs: list[str] = []
# DBs and DB support files required at runtime.
self.eval_dbs: Dict[str, Tuple[str, ...]] = {}
self.eval_dbs: dict[str, tuple[str, ...]] = {}
def download_eval_dbs(
self, extract: bool = True, ensure_exist: bool = True
@@ -334,7 +334,7 @@ class Model:
"""Subclasses can implement their own additional evaluation."""
pass
def get_labels(self) -> Tuple[Dict[Any, Any], List[Any]]:
def get_labels(self) -> tuple[dict[Any, Any], list[Any]]:
"""Subclasses implement their own function to gather labels."""
pass

View file

@@ -4,7 +4,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import logging
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, Optional
import numpy as np
import xgboost
@@ -56,8 +56,8 @@ TYPE_LIST = sorted(set(KEYWORD_DICT.values()))
def bug_to_types(
bug: bugzilla.BugDict, bug_map: Optional[Dict[int, bugzilla.BugDict]] = None
) -> List[str]:
bug: bugzilla.BugDict, bug_map: Optional[dict[int, bugzilla.BugDict]] = None
) -> list[str]:
types = set()
if "[overhead" in bug["whiteboard"].lower():
@@ -160,7 +160,7 @@ class BugTypeModel(BugModel):
xgboost.XGBClassifier(n_jobs=utils.get_physical_cpu_count())
)
def get_labels(self) -> Tuple[Dict[int, np.ndarray], List[str]]:
def get_labels(self) -> tuple[dict[int, np.ndarray], list[str]]:
classes = {}
bug_map = {bug["id"]: bug for bug in bugzilla.get_bugs()}
@@ -185,7 +185,7 @@ class BugTypeModel(BugModel):
def overwrite_classes(
self,
bugs: Iterable[bugzilla.BugDict],
classes: Dict[int, np.ndarray],
classes: dict[int, np.ndarray],
probabilities: bool,
):
for i, bug in enumerate(bugs):

View file

@@ -4,7 +4,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
import itertools
from typing import Any, Dict, List, Tuple
from typing import Any
import xgboost
from imblearn.over_sampling import BorderlineSMOTE
@@ -85,10 +85,10 @@ class DefectModel(BugModel):
self.clf = xgboost.XGBClassifier(n_jobs=utils.get_physical_cpu_count())
self.clf.set_params(predictor="cpu_predictor")
def get_bugbug_labels(self, kind="bug") -> Dict[int, Any]:
def get_bugbug_labels(self, kind="bug") -> dict[int, Any]:
assert kind in ["bug", "regression", "defect_enhancement_task"]
classes: Dict[int, Any] = {}
classes: dict[int, Any] = {}
for bug_id, category in labels.get_labels("bug_nobug"):
assert category in ["True", "False"], f"unexpected category {category}"
@@ -245,7 +245,7 @@ class DefectModel(BugModel):
# Remove labels which belong to bugs for which we have no data.
return {bug_id: label for bug_id, label in classes.items() if bug_id in bug_ids}
def get_labels(self) -> Tuple[Dict[int, Any], List[Any]]:
def get_labels(self) -> tuple[dict[int, Any], list[Any]]:
classes = self.get_bugbug_labels("bug")
print("{} bugs".format(sum(1 for label in classes.values() if label == 1)))

View file

@@ -3,7 +3,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, Dict, List, Tuple
from typing import Any
from bugbug.models.defect import DefectModel
@@ -14,7 +14,7 @@ class DefectEnhancementTaskModel(DefectModel):
self.calculate_importance = False
def get_labels(self) -> Tuple[Dict[int, Any], List[Any]]:
def get_labels(self) -> tuple[dict[int, Any], list[Any]]:
classes = self.get_bugbug_labels("defect_enhancement_task")
print(

View file

@@ -3,7 +3,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Any, Dict, List, Tuple
from typing import Any
from bugbug.models.defect import DefectModel
@@ -13,7 +13,7 @@ class RegressionModel(DefectModel):
DefectModel.__init__(self, lemmatization, historical)
self.calculate_importance = False
def get_labels(self) -> Tuple[Dict[int, Any], List[int]]:
def get_labels(self) -> tuple[dict[int, Any], list[int]]:
classes = self.get_bugbug_labels("regression")
print(

View file

@@ -10,18 +10,7 @@ import math
import pickle
import statistics
from functools import reduce
from typing import (
Any,
Callable,
Collection,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
)
from typing import Any, Callable, Collection, Iterable, Optional, Sequence, Set
import numpy as np
import xgboost
@@ -46,7 +35,7 @@ logger = logging.getLogger(__name__)
def get_commit_map(
revs: Optional[Set[test_scheduling.Revision]] = None,
) -> Dict[test_scheduling.Revision, repository.CommitDict]:
) -> dict[test_scheduling.Revision, repository.CommitDict]:
commit_map = {}
for commit in repository.get_commits():
@@ -131,7 +120,7 @@ class TestSelectModel(Model):
def get_pushes(
self, apply_filters: bool = False
) -> Tuple[List[Dict[str, Any]], int]:
) -> tuple[list[dict[str, Any]], int]:
pushes = []
for revs, test_datas in test_scheduling.get_test_scheduling_history(
self.granularity
@@ -233,7 +222,7 @@ class TestSelectModel(Model):
commits: Sequence[repository.CommitDict],
confidence: float = 0.5,
push_num: Optional[int] = None,
) -> Dict[str, float]:
) -> dict[str, float]:
commit_data = commit_features.merge_commits(commits)
past_failures_data = test_scheduling.get_past_failures(self.granularity, True)
@@ -299,14 +288,14 @@ class TestSelectModel(Model):
self,
tasks: Iterable[str],
min_redundancy_confidence: float,
load_failing_together: Callable[[str], Dict[str, Tuple[float, float]]],
load_failing_together: Callable[[str], dict[str, tuple[float, float]]],
assume_redundant: bool,
) -> List[Set[str]]:
) -> list[Set[str]]:
# Generate 'equivalence sets', containing all tasks that are redundant with
# each other.
groups: List[Set[str]] = []
task_to_groups: Dict[str, Set[int]] = collections.defaultdict(set)
incompatible_groups: Dict[str, Set[int]] = collections.defaultdict(set)
groups: list[Set[str]] = []
task_to_groups: dict[str, Set[int]] = collections.defaultdict(set)
incompatible_groups: dict[str, Set[int]] = collections.defaultdict(set)
def create_group(task: str) -> None:
if task in task_to_groups:
@@ -406,7 +395,7 @@ class TestSelectModel(Model):
def load_failing_together(
config: str,
) -> Dict[str, Tuple[float, float]]:
) -> dict[str, tuple[float, float]]:
return failing_together_stats[config]
configs = (
@@ -451,7 +440,7 @@ class TestSelectModel(Model):
self.granularity, True
)
def load_failing_together(task: str) -> Dict[str, Tuple[float, float]]:
def load_failing_together(task: str) -> dict[str, tuple[float, float]]:
key = test_scheduling.failing_together_key(task)
return pickle.loads(failing_together[key])
@@ -503,7 +492,7 @@ class TestSelectModel(Model):
groups: Collection[str],
min_redundancy_confidence: float,
max_configurations: int = 3,
) -> Dict[str, List[str]]:
) -> dict[str, list[str]]:
failing_together = test_scheduling.get_failing_together_db("config_group", True)
all_configs = pickle.loads(failing_together[b"$ALL_CONFIGS$"])
@@ -592,7 +581,7 @@ class TestSelectModel(Model):
)
)
configs_by_group: Dict[str, List[str]] = {}
configs_by_group: dict[str, list[str]] = {}
for group in groups:
configs_by_group[group] = []
@@ -707,7 +696,7 @@ class TestSelectModel(Model):
cap: Optional[int],
minimum: Optional[int],
) -> None:
futures: Dict[concurrent.futures.Future, Dict[str, Any]] = {}
futures: dict[concurrent.futures.Future, dict[str, Any]] = {}
for push in test_pushes.values():
futures[
executor.submit(

View file

@@ -19,18 +19,7 @@ import sys
import threading
from datetime import datetime
from functools import lru_cache
from typing import (
Collection,
Dict,
Iterable,
Iterator,
List,
NewType,
Optional,
Set,
Tuple,
Union,
)
from typing import Collection, Iterable, Iterator, NewType, Optional, Set, Union
import hglib
import lmdb
@@ -172,10 +161,10 @@ class Commit:
desc: str,
pushdate: datetime,
bug_id: Optional[int],
backsout: List[str],
backsout: list[str],
backedoutby: str,
author_email: str,
reviewers: List[str],
reviewers: list[str],
ignored: bool = False,
) -> None:
self.node = node
@@ -195,7 +184,7 @@ class Commit:
self.other_deleted = 0
self.test_deleted = 0
self.types: Set[str] = set()
self.functions: Dict[str, List[dict]] = {}
self.functions: dict[str, list[dict]] = {}
self.seniority_author = 0.0
self.total_source_code_file_size = 0
self.average_source_code_file_size = 0.0
@@ -395,7 +384,7 @@ def get_functions_from_metrics(metrics_space):
def get_touched_functions(
metrics_space: dict, deleted_lines: Iterable[int], added_lines: Iterable[int]
) -> List[dict]:
) -> list[dict]:
touched_functions_indexes = set()
functions = get_functions_from_metrics(metrics_space)
@@ -547,8 +536,8 @@ def get_space_metrics(
def set_commit_metrics(
commit: Commit,
path: str,
deleted_lines: List[int],
added_lines: List[int],
deleted_lines: list[int],
added_lines: list[int],
before_metrics: dict,
after_metrics: dict,
) -> None:
@@ -757,7 +746,7 @@ def _transform(commit):
return transform(HG, REPO_DIR, commit)
def hg_log(hg: hglib.client, revs: List[bytes]) -> Tuple[Commit, ...]:
def hg_log(hg: hglib.client, revs: list[bytes]) -> tuple[Commit, ...]:
if len(revs) == 0:
return tuple()
@@ -843,7 +832,7 @@ def hg_log(hg: hglib.client, revs: List[bytes]) -> Tuple[Commit, ...]:
return tuple(commits)
def _hg_log(revs: List[bytes]) -> Tuple[Commit, ...]:
def _hg_log(revs: list[bytes]) -> tuple[Commit, ...]:
return hg_log(thread_local.hg, revs)
@@ -930,7 +919,7 @@ def calculate_experiences(
return f"{exp_type}${commit_type}${item}"
def get_experience(
exp_type: str, commit_type: str, item: str, day: int, default: Union[int, Tuple]
exp_type: str, commit_type: str, item: str, day: int, default: Union[int, tuple]
) -> utils.ExpQueue:
key = get_key(exp_type, commit_type, item)
try:
@@ -1187,7 +1176,7 @@ def close_component_mapping():
path_to_component = None
def hg_log_multi(repo_dir: str, revs: List[bytes]) -> Tuple[Commit, ...]:
def hg_log_multi(repo_dir: str, revs: list[bytes]) -> tuple[Commit, ...]:
if len(revs) == 0:
return tuple()
@@ -1220,13 +1209,13 @@ def get_first_pushdate(repo_dir):
def download_commits(
repo_dir: str,
rev_start: str = None,
revs: List[bytes] = None,
revs: list[bytes] = None,
save: bool = True,
use_single_process: bool = False,
include_no_bug: bool = False,
include_backouts: bool = False,
include_ignored: bool = False,
) -> Tuple[CommitDict, ...]:
) -> tuple[CommitDict, ...]:
assert revs is not None or rev_start is not None
with hglib.open(repo_dir) as hg:

View file

@@ -17,11 +17,9 @@ from typing import (
Any,
Callable,
Deque,
Dict,
Generator,
Iterable,
Iterator,
List,
NewType,
Optional,
Set,
@@ -41,10 +39,10 @@ logger = logging.getLogger(__name__)
Revision = NewType("Revision", str)
Task = NewType("Task", str)
Group = NewType("Group", str)
ConfigGroup = NewType("ConfigGroup", Tuple[str, Group])
ConfigGroup = NewType("ConfigGroup", tuple[str, Group])
Runnable = Union[Task, Group, ConfigGroup]
PushResult = Tuple[
Tuple[Revision],
PushResult = tuple[
tuple[Revision],
Revision,
Tuple[Runnable, ...],
Tuple[Runnable, ...],
@@ -121,10 +119,10 @@ JOBS_TO_IGNORE = (
def filter_runnables(
runnables: Tuple[Runnable, ...], all_runnables: Set[Runnable], granularity: str
) -> Tuple[Any, ...]:
runnables: tuple[Runnable, ...], all_runnables: Set[Runnable], granularity: str
) -> tuple[Any, ...]:
if granularity == "label":
tasks = cast(List[Task], runnables)
tasks = cast(list[Task], runnables)
return tuple(
task
for task in tasks
@@ -167,16 +165,16 @@ def rename_task(task: str) -> str:
# Handle "meaningless" labeling changes ("meaningless" as they shouldn't really affect test scheduling).
def rename_runnables(
granularity: str, runnables: Tuple[Runnable, ...]
) -> Tuple[Runnable, ...]:
granularity: str, runnables: tuple[Runnable, ...]
) -> tuple[Runnable, ...]:
if granularity == "label":
tasks = cast(List[Task], runnables)
tasks = cast(list[Task], runnables)
return tuple(Task(rename_task(task)) for task in tasks)
elif granularity == "group":
groups = cast(List[Group], runnables)
groups = cast(list[Group], runnables)
return tuple(Group(group.split(":")[0]) for group in groups)
elif granularity == "config_group":
config_groups = cast(List[ConfigGroup], runnables)
config_groups = cast(list[ConfigGroup], runnables)
return tuple(
ConfigGroup(
(
@@ -192,7 +190,7 @@ def rename_runnables(
def get_push_data(
granularity: str,
) -> Tuple[Callable[[], Iterator[PushResult]], int, Tuple[Runnable, ...]]:
) -> tuple[Callable[[], Iterator[PushResult]], int, tuple[Runnable, ...]]:
if granularity == "label":
push_data_db = PUSH_DATA_LABEL_DB
elif granularity == "group":
@@ -393,7 +391,7 @@ def generate_failing_together_probabilities(
count_single_failures[(task1, task2)] += 1
all_available_configs: Set[str] = set()
available_configs_by_group: Dict[Group, Set[str]] = collections.defaultdict(set)
available_configs_by_group: dict[Group, Set[str]] = collections.defaultdict(set)
for (
revisions,
@@ -816,7 +814,7 @@ def generate_data(
yield obj
def get_failure_bugs(since: datetime, until: datetime) -> List[Dict[str, int]]:
def get_failure_bugs(since: datetime, until: datetime) -> list[dict[str, int]]:
r = requests.get(
"https://treeherder.mozilla.org/api/failures/?startday={}&endday={}&tree=trunk".format(
since.strftime("%Y-%m-%d"), until.strftime("%Y-%m-%d")
@@ -827,7 +825,7 @@ def get_failure_bugs(since: datetime, until: datetime) -> List[Dict[str, int]]:
return r.json()
def get_test_info(date: datetime) -> Dict[str, Any]:
def get_test_info(date: datetime) -> dict[str, Any]:
r = requests.get(
"https://firefox-ci-tc.services.mozilla.com/api/index/v1/task/gecko.v2.mozilla-central.pushdate.{}.latest.source.test-info-all/artifacts/public/test-info-all-tests.json".format(
date.strftime("%Y.%m.%d")

View file

@@ -17,7 +17,7 @@ from collections import deque
from contextlib import contextmanager
from datetime import datetime
from functools import lru_cache
from typing import Any, Iterator, List, Optional
from typing import Any, Iterator, Optional
import boto3
import dateutil.parser
@@ -437,7 +437,7 @@ def get_session(name: str) -> requests.Session:
return session
def get_hgmo_stack(branch: str, revision: str) -> List[bytes]:
def get_hgmo_stack(branch: str, revision: str) -> list[bytes]:
"""Load descriptions of patches in the stack for a given revision"""
url = f"https://hg.mozilla.org/{branch}/json-automationrelevance/{revision}"
r = get_session("hgmo").get(url)

View file

@@ -9,7 +9,7 @@ import os
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Optional, Sequence, Tuple
from typing import Any, Callable, Optional, Sequence
import orjson
import zstandard
@@ -751,7 +751,7 @@ def push_schedules(branch, rev):
@application.route("/config_specific_groups/<path:config>")
@cross_origin()
def config_specific_groups(config: str) -> Tuple[Response, int]:
def config_specific_groups(config: str) -> tuple[Response, int]:
"""
---
get:

View file

@@ -7,7 +7,7 @@ import logging
import os
from datetime import timedelta
from functools import lru_cache
from typing import Sequence, Tuple
from typing import Sequence
import orjson
import requests
@@ -170,7 +170,7 @@ def classify_issue(
@lru_cache(maxsize=None)
def get_known_tasks() -> Tuple[str, ...]:
def get_known_tasks() -> tuple[str, ...]:
with open("known_tasks", "r") as f:
return tuple(line.strip() for line in f)

View file

@@ -9,7 +9,7 @@ import logging
import threading
import time
from datetime import timedelta
from typing import Callable, Dict, Generic, TypeVar
from typing import Callable, Generic, TypeVar
LOGGER = logging.getLogger()
@@ -26,8 +26,8 @@ class ReadthroughTTLCache(Generic[Key, Value]):
def __init__(self, ttl: timedelta, load_item_function: Callable[[Key], Value]):
self.ttl = ttl
self.load_item_function = load_item_function
self.items_last_accessed: Dict[Key, datetime.datetime] = {}
self.items_storage: Dict[Key, Value] = {}
self.items_last_accessed: dict[Key, datetime.datetime] = {}
self.items_storage: dict[Key, Value] = {}
def __contains__(self, key):
return key in self.items_storage

View file

@@ -10,7 +10,7 @@ import pickle
import re
from collections import defaultdict
from datetime import datetime
from typing import Callable, Dict, Tuple
from typing import Callable
import hglib
import numpy as np
@@ -159,7 +159,7 @@ def add_change_time():
@pytest.fixture
def mock_hgmo(mock_repo: Tuple[str, str]) -> None:
def mock_hgmo(mock_repo: tuple[str, str]) -> None:
"""Mock HGMO API to get patches to apply"""
def fake_json_relevance(request):
@@ -202,7 +202,7 @@ def mock_hgmo(mock_repo: Tuple[str, str]) -> None:
@pytest.fixture
def mock_repo(
tmpdir: py.path.local, monkeypatch: MonkeyPatch
) -> Tuple[py.path.local, py.path.local]:
) -> tuple[py.path.local, py.path.local]:
"""Create an empty mercurial repo"""
local_dir = tmpdir / "local"
remote_dir = tmpdir / "remote"
@@ -366,7 +366,7 @@ def mock_get_config_specific_groups(
@pytest.fixture
def mock_schedule_tests_classify(
monkeypatch: MonkeyPatch,
) -> Callable[[Dict[str, float], Dict[str, float]], None]:
) -> Callable[[dict[str, float], dict[str, float]], None]:
with open("known_tasks", "w") as f:
f.write("prova")

View file

@@ -3,7 +3,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Callable, Dict
from typing import Callable
import orjson
import zstandard
@@ -13,7 +13,7 @@ from bugbug_http import models
def test_get_config_specific_groups(
mock_get_config_specific_groups: Callable[
[Dict[str, float], Dict[str, float]], None
[dict[str, float], dict[str, float]], None
],
) -> None:
assert models.get_config_specific_groups("test-linux1804-64/opt-*") == "OK"

View file

@@ -3,7 +3,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Callable, Dict, List, Tuple
from typing import Callable
import hglib
import orjson
@@ -54,15 +54,15 @@ from bugbug_http import models
],
)
def test_simple_schedule(
labels_to_choose: Dict[str, float],
groups_to_choose: Dict[str, float],
reduced_labels: Dict[str, float],
config_groups: Dict[str, List[str]],
labels_to_choose: dict[str, float],
groups_to_choose: dict[str, float],
reduced_labels: dict[str, float],
config_groups: dict[str, list[str]],
mock_hgmo: None,
mock_repo: Tuple[str, str],
mock_repo: tuple[str, str],
mock_component_taskcluster_artifact: None,
mock_coverage_mapping_artifact: None,
mock_schedule_tests_classify: Callable[[Dict[str, float], Dict[str, float]], None],
mock_schedule_tests_classify: Callable[[dict[str, float], dict[str, float]], None],
) -> None:
# The repo should be almost empty at first
repo_dir, remote_repo_dir = mock_repo

View file

@@ -12,7 +12,7 @@ import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Tuple
from typing import Any
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
@@ -78,7 +78,7 @@ def plot_graph(
plt.close(figure)
def parse_metric_file(metric_file_path: Path) -> Tuple[datetime, str, Dict[str, Any]]:
def parse_metric_file(metric_file_path: Path) -> tuple[datetime, str, dict[str, Any]]:
# Load the metric
with open(metric_file_path, "r") as metric_file:
metric = json.load(metric_file)
@@ -113,7 +113,7 @@ def analyze_metrics(
):
root = Path(metrics_directory)
metrics: Dict[str, Dict[str, Dict[datetime, float]]] = defaultdict(
metrics: dict[str, dict[str, dict[datetime, float]]] = defaultdict(
lambda: defaultdict(dict)
)

View file

@@ -3,7 +3,6 @@
import argparse
from datetime import datetime
from logging import getLogger
from typing import List
import dateutil.parser
from dateutil.relativedelta import relativedelta
@@ -81,7 +80,7 @@ class Retriever(object):
# Get IDs of bugs which are regressions, bugs which caused regressions (useful for the regressor model),
# and blocked bugs.
regression_related_ids: List[int] = list(
regression_related_ids: list[int] = list(
set(
sum(
(

View file

@@ -9,7 +9,6 @@ import os
import subprocess
from fnmatch import fnmatch
from pathlib import Path
from typing import List
import taskcluster
@@ -27,7 +26,7 @@ CURRENT_DIR = Path(__file__).resolve().parent
def download_metric(model_name: str, metric_directory: str):
download_script_path = "bugbug-retrieve-training-metrics"
cli_args: List[str] = [
cli_args: list[str] = [
download_script_path,
model_name,
"2019",
@@ -43,7 +42,7 @@ def download_metric(model_name: str, metric_directory: str):
def check_metrics(metric_directory: str, output_directory: str):
analyze_script_path = "bugbug-analyze-training-metrics"
cli_args: List[str] = [analyze_script_path, metric_directory, output_directory]
cli_args: list[str] = [analyze_script_path, metric_directory, output_directory]
LOGGER.info("Checking metrics")
@@ -65,7 +64,7 @@ def get_model_name(queue, task_id: str):
LOGGER.warning(f"No matching route found for task id {task_id}")
def get_model_names(task_id: str) -> List[str]:
def get_model_names(task_id: str) -> list[str]:
options = get_taskcluster_options()
queue = taskcluster.Queue(options)
task = queue.task(task_id)

View file

@@ -11,7 +11,7 @@ import re
import subprocess
from datetime import datetime
from logging import INFO, basicConfig, getLogger
from typing import Optional, Tuple, cast
from typing import Optional, cast
import dateutil.parser
import hglib
@@ -625,7 +625,7 @@ class CommitClassifier(object):
else:
self.classify_test_select(commits, runnable_jobs_path)
def classify_regressor(self, commits: Tuple[repository.CommitDict, ...]) -> None:
def classify_regressor(self, commits: tuple[repository.CommitDict, ...]) -> None:
# We use "clean" (or "dirty") commits as the background dataset for feature importance.
# This way, we can see the features which are most important in differentiating
# the current commit from the "clean" (or "dirty") commits.

View file

@@ -15,7 +15,7 @@ import statistics
import traceback
import urllib.parse
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Optional, Set, cast
import bs4
import dateutil.parser
@@ -57,7 +57,7 @@ PAST_FIXED_BUG_BLOCKED_BUGS_BY_URL = "https://community-tc.services.mozilla.com/
FUZZING_METABUG_ID = 316898
def _deduplicate(bug_summaries: List[dict]) -> List[dict]:
def _deduplicate(bug_summaries: list[dict]) -> list[dict]:
seen = set()
results = []
for bug_summary in bug_summaries[::-1]:
@@ -79,7 +79,7 @@ def _download_past_bugs(url: str) -> dict:
return json.load(f)
def parse_risk_band(risk_band: str) -> Tuple[str, float, float]:
def parse_risk_band(risk_band: str) -> tuple[str, float, float]:
name, start, end = risk_band.split("-")
return (name, float(start), float(end))
@@ -92,19 +92,19 @@ def get_full_component(bug):
return "{}::{}".format(bug["product"], bug["component"])
def histogram(components: List[str]) -> Dict[str, float]:
def histogram(components: list[str]) -> dict[str, float]:
counter = collections.Counter(components)
return {
component: count / len(components) for component, count in counter.most_common()
}
def component_histogram(bugs: List[dict]) -> Dict[str, float]:
def component_histogram(bugs: list[dict]) -> dict[str, float]:
return histogram([bug["component"] for bug in bugs])
# TODO: Remove once the mapping is updated in Bugzilla.
def get_component_team_mapping() -> Dict[str, Dict[str, str]]:
def get_component_team_mapping() -> dict[str, dict[str, str]]:
component_team_mapping = bugzilla.get_component_team_mapping()
component_team_mapping_override = json.loads(
@@ -123,7 +123,7 @@ def get_crash_signatures(channel: str) -> dict:
return response["signatures"]
def get_crash_bugs(signatures: dict) -> List[int]:
def get_crash_bugs(signatures: dict) -> list[int]:
return [
bug["id"]
for data in signatures.values()
@@ -204,7 +204,7 @@ class LandingsRiskReportGenerator(object):
past_bugs_by: dict,
commit: repository.CommitDict,
component: str = None,
) -> List[dict]:
) -> list[dict]:
paths = [
path
for path in commit["files"]
@@ -266,28 +266,28 @@ class LandingsRiskReportGenerator(object):
def get_prev_bugs_stats(
self,
commit_group: dict,
commit_list: List[repository.CommitDict],
commit_list: list[repository.CommitDict],
component: str = None,
) -> None:
# Find previous regressions that occurred in the same files as those touched by these commits.
# And find previous bugs that were fixed by touching the same files as these commits.
# And find previous bugs that were blocked by regressions that occurred in the same files as those touched by these commits.
# And find previous bugs that were blocked by bugs that were fixed by touching the same files as those touched by these commits.
prev_regressions: List[Dict[str, Any]] = sum(
prev_regressions: list[dict[str, Any]] = sum(
(
self.get_prev_bugs(self.past_regressions_by, commit, component)
for commit in commit_list
),
[],
)
prev_fixed_bugs: List[Dict[str, Any]] = sum(
prev_fixed_bugs: list[dict[str, Any]] = sum(
(
self.get_prev_bugs(self.past_fixed_bugs_by, commit, component)
for commit in commit_list
),
[],
)
prev_regression_blocked_bugs: List[Dict[str, Any]] = sum(
prev_regression_blocked_bugs: list[dict[str, Any]] = sum(
(
self.get_prev_bugs(
self.past_regression_blocked_bugs_by, commit, component
@@ -296,7 +296,7 @@ class LandingsRiskReportGenerator(object):
),
[],
)
prev_fixed_bug_blocked_bugs: List[Dict[str, Any]] = sum(
prev_fixed_bug_blocked_bugs: list[dict[str, Any]] = sum(
(
self.get_prev_bugs(
self.past_fixed_bug_blocked_bugs_by, commit, component
@@ -339,11 +339,11 @@ class LandingsRiskReportGenerator(object):
"most_common_fixed_bug_blocked_bug_components"
] = fixed_bug_blocked_bug_components
def get_landed_and_filed_since(self, days: int) -> List[int]:
def get_landed_and_filed_since(self, days: int) -> list[int]:
since = datetime.utcnow() - timedelta(days=days)
commits = []
last_commit_by_bug: Dict[int, datetime] = {}
last_commit_by_bug: dict[int, datetime] = {}
for commit in repository.get_commits():
if not commit["bug_id"]:
continue
@@ -364,7 +364,7 @@ class LandingsRiskReportGenerator(object):
return list(set(commit["bug_id"] for commit in commits) | set(timespan_ids))
def get_regressors_of(self, bug_ids: List[int]) -> List[int]:
def get_regressors_of(self, bug_ids: list[int]) -> list[int]:
bugzilla.download_bugs(bug_ids)
return sum(
(
@@ -376,8 +376,8 @@ class LandingsRiskReportGenerator(object):
)
def get_blocking_of(
self, bug_ids: List[int], meta_only: bool = False
) -> Dict[int, List[int]]:
self, bug_ids: list[int], meta_only: bool = False
) -> dict[int, list[int]]:
bug_map = {bug["id"]: bug for bug in bugzilla.get_bugs()}
return {
bug_id: bugzilla.find_blocking(bug_map, bug_map[bug_id])
@@ -385,7 +385,7 @@ class LandingsRiskReportGenerator(object):
if not meta_only or "meta" in bug_map[bug_id]["keywords"]
}
def get_meta_bugs(self, days: int) -> List[int]:
def get_meta_bugs(self, days: int) -> list[int]:
return bugzilla.get_ids(
{
"keywords": "feature-testing-meta",
@@ -397,7 +397,7 @@ class LandingsRiskReportGenerator(object):
}
)
def retrieve_test_info(self, days: int) -> Dict[str, Any]:
def retrieve_test_info(self, days: int) -> dict[str, Any]:
logger.info("Download previous test info...")
db.download(TEST_INFOS_DB)
@@ -456,10 +456,10 @@ class LandingsRiskReportGenerator(object):
def generate_landings_by_date(
self,
bug_map: Dict[int, bugzilla.BugDict],
bug_map: dict[int, bugzilla.BugDict],
regressor_bug_ids: Set[int],
bugs: List[int],
meta_bugs: Dict[int, List[int]],
bugs: list[int],
meta_bugs: dict[int, list[int]],
) -> None:
# A map from bug ID to the list of commits associated with the bug (in order of landing).
bug_to_commits = collections.defaultdict(list)
@@ -542,7 +542,7 @@ class LandingsRiskReportGenerator(object):
assert False
def get_commit_data(commit_list: List[repository.CommitDict]) -> List[dict]:
def get_commit_data(commit_list: list[repository.CommitDict]) -> list[dict]:
if len(commit_list) == 0:
return []
@@ -736,7 +736,7 @@ class LandingsRiskReportGenerator(object):
json.dump(output, f)
def generate_component_connections(
self, bug_map: Dict[int, bugzilla.BugDict], bugs: List[int]
self, bug_map: dict[int, bugzilla.BugDict], bugs: list[int]
) -> None:
bugs_set = set(bugs)
commits = [
@@ -818,10 +818,10 @@ class LandingsRiskReportGenerator(object):
repository.close_component_mapping()
def generate_component_test_stats(
self, bug_map: Dict[int, bugzilla.BugDict], test_infos: Dict[str, Any]
self, bug_map: dict[int, bugzilla.BugDict], test_infos: dict[str, Any]
) -> None:
component_test_stats: Dict[
str, Dict[str, Dict[str, List[Dict[str, int]]]]
component_test_stats: dict[
str, dict[str, dict[str, list[dict[str, int]]]]
] = collections.defaultdict(
lambda: collections.defaultdict(lambda: collections.defaultdict(list))
)
@@ -856,7 +856,7 @@ class LandingsRiskReportGenerator(object):
bugs = list(set(bugs))
test_infos = self.retrieve_test_info(days)
test_info_bugs: List[int] = [
test_info_bugs: list[int] = [
bug["id"] for test_info in test_infos.values() for bug in test_info["bugs"]
]

View file

@@ -5,7 +5,6 @@
import argparse
from logging import getLogger
from typing import List, Tuple
from bugbug import db
from bugbug.github import Github, IssueDict
@@ -33,8 +32,8 @@ class Retriever(object):
)
def replace_with_private(
self, original_data: List[IssueDict]
) -> Tuple[List[IssueDict], set]:
self, original_data: list[IssueDict]
) -> tuple[list[IssueDict], set]:
"""Replace title and body of automatically closed public issues.
Replace them with title and body of a corresponding private issue

View file

@@ -7,7 +7,6 @@ import argparse
import json
import logging
from collections import defaultdict
from typing import Dict, List
from tqdm import tqdm
@@ -62,29 +61,29 @@ class PastBugsCollector(object):
def dimension_to_field(dimension: str) -> str:
return f"{dimension}s" if dimension != "directory" else "directories"
past_regressions_by: Dict[str, Dict[str, List[int]]] = defaultdict(
past_regressions_by: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_fixed_bugs_by: Dict[str, Dict[str, List[int]]] = defaultdict(
past_fixed_bugs_by: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_regression_blocked_bugs_by: Dict[str, Dict[str, List[int]]] = defaultdict(
past_regression_blocked_bugs_by: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_fixed_bug_blocked_bugs_by: Dict[str, Dict[str, List[int]]] = defaultdict(
past_fixed_bug_blocked_bugs_by: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_regressions_by_function: Dict[str, Dict[str, List[int]]] = defaultdict(
past_regressions_by_function: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_fixed_bugs_by_function: Dict[str, Dict[str, List[int]]] = defaultdict(
past_fixed_bugs_by_function: dict[str, dict[str, list[int]]] = defaultdict(
lambda: defaultdict(list)
)
past_regression_blocked_bugs_by_function: Dict[
str, Dict[str, List[int]]
past_regression_blocked_bugs_by_function: dict[
str, dict[str, list[int]]
] = defaultdict(lambda: defaultdict(list))
past_fixed_bug_blocked_bugs_by_function: Dict[
str, Dict[str, List[int]]
past_fixed_bug_blocked_bugs_by_function: dict[
str, dict[str, list[int]]
] = defaultdict(lambda: defaultdict(list))
for commit in tqdm(repository.get_commits()):
@@ -134,7 +133,7 @@ class PastBugsCollector(object):
bugzilla.find_blocked_by(bug_map, bug)
)
def _transform(bug_ids: List[int]) -> List[dict]:
def _transform(bug_ids: list[int]) -> list[dict]:
seen = set()
results = []
for bug_id in bug_ids:
@@ -154,8 +153,8 @@ class PastBugsCollector(object):
return results
def past_bug_ids_to_summaries(
past_bugs_by: Dict[str, List[int]]
) -> Dict[str, List[dict]]:
past_bugs_by: dict[str, list[int]]
) -> dict[str, list[dict]]:
return {path: _transform(bug_ids) for path, bug_ids in past_bugs_by.items()}
for dimension in by_dimensions:
@@ -190,8 +189,8 @@ class PastBugsCollector(object):
zstd_compress(f"data/past_fixed_bug_blocked_bugs_by_{dimension}.json")
def past_function_bug_ids_to_summaries(
past_bugs: Dict[str, Dict[str, List[int]]]
) -> Dict[str, Dict[str, List[dict]]]:
past_bugs: dict[str, dict[str, list[int]]]
) -> dict[str, dict[str, list[dict]]]:
return {
path: {
func: _transform(bug_ids) for func, bug_ids in funcs_bugs.items()

View file

@@ -9,7 +9,7 @@ import logging
import time
import traceback
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Any, Optional
import matplotlib.pyplot as plt
import mozci.push
@@ -32,7 +32,7 @@ db.register(
def analyze_shadow_schedulers(
push: mozci.push.Push,
) -> Dict[str, Any]:
) -> dict[str, Any]:
schedulers = []
for name, config_groups in push.generate_all_shadow_scheduler_config_groups():
@@ -189,7 +189,7 @@ def plot_graphs(granularity: str) -> None:
if scheduler_stat["id"] not in regressions_by_rev:
continue
obj: Dict[str, Any] = {
obj: dict[str, Any] = {
"date": datetime.utcfromtimestamp(scheduler_stat["date"]),
}

View file

@@ -10,7 +10,7 @@ import os
import traceback
from datetime import datetime
from logging import INFO, basicConfig, getLogger
from typing import Any, Dict, Generator, List
from typing import Any, Generator
import dateutil.parser
import mozci.errors
@@ -57,8 +57,8 @@ class Retriever(object):
def generate(
progress_bar: tqdm,
pushes: List[mozci.push.Push],
futures: List[concurrent.futures.Future],
pushes: list[mozci.push.Push],
futures: list[concurrent.futures.Future],
) -> Generator[PushResult, None, None]:
nonlocal reretrieve
num_cached = 0
@@ -207,7 +207,7 @@ class Retriever(object):
granularity, push_data_iter(), push_data_count
)
def generate_all_data() -> Generator[Dict[str, Any], None, None]:
def generate_all_data() -> Generator[dict[str, Any], None, None]:
past_failures = test_scheduling.get_past_failures(granularity, False)
push_num = past_failures["push_num"] if "push_num" in past_failures else 0

View file

@@ -4,14 +4,13 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
from datetime import timedelta
from typing import List
from bugbug import phabricator
def test_get_first_review_time() -> None:
# No transactions.
transactions: List[phabricator.TransactionDict] = []
transactions: list[phabricator.TransactionDict] = []
assert (
phabricator.get_first_review_time(
phabricator.RevisionDict({"id": 1, "transactions": transactions})

View file

@@ -4,7 +4,6 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
from datetime import datetime
from typing import List
import pytest
from _pytest.monkeypatch import MonkeyPatch
@@ -168,7 +167,7 @@ def test_touched_together(monkeypatch: MonkeyPatch) -> None:
]
commits = [c.to_dict() for c in commits]
def mock_get_commits() -> List[CommitDict]:
def mock_get_commits() -> list[CommitDict]:
return commits
monkeypatch.setattr(repository, "get_commits", mock_get_commits)
@@ -286,7 +285,7 @@ def test_touched_together_restart(monkeypatch: MonkeyPatch) -> None:
]
commits = [c.to_dict() for c in commits]
def mock_get_commits() -> List[CommitDict]:
def mock_get_commits() -> list[CommitDict]:
return commits
monkeypatch.setattr(repository, "get_commits", mock_get_commits)
@@ -416,7 +415,7 @@ def test_touched_together_not_in_order(monkeypatch: MonkeyPatch) -> None:
]
commits = [c.to_dict() for c in commits]
def mock_get_commits() -> List[CommitDict]:
def mock_get_commits() -> list[CommitDict]:
return commits
monkeypatch.setattr(repository, "get_commits", mock_get_commits)
@@ -530,7 +529,7 @@ def test_touched_together_with_backout(monkeypatch: MonkeyPatch) -> None:
]
commits = [c.to_dict() for c in commits]
def mock_get_commits() -> List[CommitDict]:
def mock_get_commits() -> list[CommitDict]:
return commits
monkeypatch.setattr(repository, "get_commits", mock_get_commits)

View file

@@ -6,7 +6,7 @@
import itertools
import math
import pickle
from typing import Dict, Iterator, Tuple
from typing import Iterator
import hypothesis
import hypothesis.strategies as st
@@ -601,7 +601,7 @@ def test_all(g: Graph) -> None:
test_scheduling.remove_failing_together_db("label")
# TODO: Also add some pairs that are *not* failing together.
ft: Dict[str, Dict[str, Tuple[float, float]]] = {}
ft: dict[str, dict[str, tuple[float, float]]] = {}
for edge in g.es:
task1 = tasks[edge.tuple[0]]
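A closing note on the "mypy complains" caveat in the commit message: built-in generics in annotations can be deferred with from __future__ import annotations, but positions such as the NewType and cast calls in this diff are ordinary runtime expressions, so their type argument is evaluated immediately and requires Python 3.9+. A hypothetical illustration (the names below are made up, not from the diff):

    from typing import NewType, cast

    # tuple[str, str] is evaluated as soon as NewType runs, so this
    # line raises TypeError on Python 3.8 and earlier.
    ConfigPair = NewType("ConfigPair", tuple[str, str])

    def as_list(values: tuple[str, ...]) -> list[str]:
        # cast() returns its second argument unchanged at runtime, but
        # the expression list[str] must still be constructible (3.9+).
        return list(cast(list[str], values))

Such runtime uses are one reason, separate from type-checker support, why a file may need to keep the typing aliases on older interpreters.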