Use unique run names in Weights & Biases (#727)

* Use unique names for W&B runs

* Update tests

* Fix online group_logs publication

* Use task group ID suffix in offline publication from Taskcluster

* Use task group ID suffix in offline publication from GCP old Snakemake experiments

* Fix

* TRASHME Test publication from CI

* Revert "TRASHME Test publication from CI"

This reverts commit 4e15ed4eb4.
This commit is contained in:
Valentin Rigal 2024-07-12 20:46:26 +02:00 коммит произвёл GitHub
Родитель 6b6b64999e
Коммит 2027f4e99b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
7 изменённых файлов: 164 добавлений и 65 удалений

Просмотреть файл

@ -44,7 +44,7 @@ def samples_dir():
wandb_artifacts=None,
wandb_group="group",
wandb_publication=True,
wandb_run_name="run",
wandb_run_name="run_id",
tags=[
"unittest",
],
@ -58,6 +58,7 @@ def test_taskcluster(wandb_mock, getargs_mock, caplog, samples_dir, tmp_dir):
wandb_dir = tmp_dir / "wandb"
wandb_dir.mkdir(parents=True)
wandb_mock.init.return_value.dir = wandb_dir
wandb_mock.init.return_value.resumed = False
tc_publish.main()
assert [(level, message) for _module, level, message in caplog.record_tuples] == [
(logging.INFO, "Reading logs stream."),
@ -66,6 +67,20 @@ def test_taskcluster(wandb_mock, getargs_mock, caplog, samples_dir, tmp_dir):
(logging.INFO, "Found 102 training entries"),
(logging.INFO, "Found 34 validation entries"),
]
assert [
(
c.kwargs["project"],
c.kwargs["group"],
c.kwargs["name"],
c.kwargs["id"],
c.kwargs["config"].get("after"),
)
for c in wandb_mock.init.call_args_list
] == [
("test", "group", "run_id", "run_id", "2e"),
]
with (samples_dir / "taskcluster_wandb_calls.json").open("r") as f:
assert list(wandb_mock.init.return_value.log.call_args_list) == [
call(**entry) for entry in json.load(f)
@ -82,6 +97,7 @@ def test_experiments_marian_1_10(wandb_mock, getargs_mock, caplog, samples_dir,
wandb_dir = tmp_dir / "wandb"
wandb_dir.mkdir(parents=True)
wandb_mock.init.return_value.dir = wandb_dir
wandb_mock.init.return_value.resumed = False
wandb_mock.plot.bar = lambda *args, **kwargs: (args, kwargs)
wandb_mock.Table = lambda *args, **kwargs: (args, kwargs)
experiments_publish.main()
@ -132,6 +148,29 @@ def test_experiments_marian_1_10(wandb_mock, getargs_mock, caplog, samples_dir,
(logging.INFO, "Creating missing run teacher-ensemble with associated metrics"),
]
)
assert [
(
c.kwargs["project"],
c.kwargs["group"],
c.kwargs["name"],
c.kwargs["id"],
c.kwargs["config"].get("after"),
)
for c in wandb_mock.init.call_args_list
] == [
("en-nl", "prod", "student_prod", "student_prod", "0e"),
("en-nl", "prod", "teacher-finetune-0_prod", "teacher-finetune-0_prod", "0e"),
("en-nl", "prod", "teacher-finetune-1_prod", "teacher-finetune-1_prod", "0e"),
("en-nl", "prod", "quantized_prod", "quantized_prod", None),
("en-nl", "prod", "backwards_prod", "backwards_prod", None),
("en-nl", "prod", "student-finetune_prod", "student-finetune_prod", None),
("en-nl", "prod", "teacher-base-0_prod", "teacher-base-0_prod", None),
("en-nl", "prod", "teacher-base-1_prod", "teacher-base-1_prod", None),
("en-nl", "prod", "teacher-ensemble_prod", "teacher-ensemble_prod", None),
("en-nl", "prod", "group_logs_prod", "group_logs_prod", None),
]
log_calls, metrics_calls = [], []
for log in wandb_mock.init.return_value.log.call_args_list:
if log.args:
@ -177,6 +216,7 @@ def test_experiments_marian_1_12(wandb_mock, getargs_mock, caplog, samples_dir,
wandb_dir = tmp_dir / "wandb"
wandb_dir.mkdir(parents=True)
wandb_mock.init.return_value.dir = wandb_dir
wandb_mock.init.return_value.resumed = False
wandb_mock.plot.bar = lambda *args, **kwargs: (args, kwargs)
wandb_mock.Table = lambda *args, **kwargs: (args, kwargs)
experiments_publish.main()
@ -210,6 +250,23 @@ def test_experiments_marian_1_12(wandb_mock, getargs_mock, caplog, samples_dir,
(logging.INFO, "Detected Marian version 1.12"),
]
)
assert [
(
c.kwargs["project"],
c.kwargs["group"],
c.kwargs["name"],
c.kwargs["id"],
c.kwargs["config"].get("after"),
)
for c in wandb_mock.init.call_args_list
] == [
("fi-en", "opusprod", "student_opusprod", "student_opusprod", "0e"),
("fi-en", "opusprod", "student-finetune_opusprod", "student-finetune_opusprod", "0e"),
("fi-en", "opusprod", "quantized_opusprod", "quantized_opusprod", None),
("fi-en", "opusprod", "group_logs_opusprod", "group_logs_opusprod", None),
]
log_calls, metrics_calls = [], []
for log in wandb_mock.init.return_value.log.call_args_list:
if log.args:
@ -309,6 +366,7 @@ def test_taskcluster_wandb_log_failures(wandb_mock, getargs_mock, caplog, sample
wandb_dir = tmp_dir / "wandb"
wandb_dir.mkdir(parents=True)
wandb_mock.init.return_value.dir = wandb_dir
wandb_mock.init.return_value.resumed = False
wandb_mock.init.return_value.log.side_effect = Exception("Unexpected failure")
tc_publish.main()
assert [(level, message) for _module, level, message in caplog.record_tuples] == [

Просмотреть файл

@ -33,9 +33,11 @@ def get_args() -> argparse.Namespace:
def parse_experiment(
*,
project: str,
group: str,
name: str,
suffix: str,
logs_file: Path,
metrics_dir: Path | None = None,
) -> None:
@ -57,8 +59,9 @@ def parse_experiment(
publishers=[
WandB(
project=project,
name=name,
group=group,
name=name,
suffix=suffix,
)
],
)
@ -91,6 +94,8 @@ def main() -> None:
project, group, *name = parents
base_name = name[0]
name = "_".join(name)
# Directly use group name as a suffix from GCP experiments, since we don't have access to the task group ID
suffix = f"_{group}"
try:
name = parse_task_label(f"train-{name}").model
except ValueError:
@ -105,7 +110,14 @@ def main() -> None:
if metrics_dir is None:
logger.warning("Evaluation metrics files not found, skipping.")
try:
parse_experiment(project, group, name, file, metrics_dir=metrics_dir)
parse_experiment(
project=project,
group=group,
name=name,
suffix=suffix,
logs_file=file,
metrics_dir=metrics_dir,
)
existing_runs.append(name)
except Exception as e:
logger.error(f"An exception occured parsing {file}: {e}")
@ -121,6 +133,12 @@ def main() -> None:
logger.info(
f"Publishing '{last_project}/{last_group}' evaluation metrics and files (fake run 'group_logs')"
)
WandB.publish_group_logs(prefix, last_project, last_group, existing_runs=existing_runs)
WandB.publish_group_logs(
logs_parent_folder=prefix,
project=last_project,
group=last_group,
suffix=suffix,
existing_runs=existing_runs,
)
existing_runs = []
last_index = (project, group)

Просмотреть файл

@ -23,7 +23,11 @@ from pathlib import Path
import taskcluster
from translations_parser.parser import TrainingParser, logger
from translations_parser.publishers import CSVExport, Publisher
from translations_parser.utils import publish_group_logs_from_tasks, taskcluster_log_filter
from translations_parser.utils import (
publish_group_logs_from_tasks,
suffix_from_group,
taskcluster_log_filter,
)
from translations_parser.wandb import add_wandb_arguments, get_wandb_publisher
queue = taskcluster.Queue({"rootUrl": "https://firefox-ci-tc.services.mozilla.com"})
@ -126,7 +130,12 @@ def boot() -> None:
queue.getTaskGroup(group_id)
task_group = queue.task(group_id)
config = task_group.get("extra", {}).get("action", {}).get("context", {}).get("input")
publish_group_logs_from_tasks(config=config)
publish_group_logs_from_tasks(
project=wandb_publisher.project,
group=wandb_publisher.group,
config=config,
suffix=suffix_from_group(group_id),
)
# Use log filtering when using non-stream (for uploading past experiments)
log_filter = taskcluster_log_filter if not args.from_stream else None

Просмотреть файл

@ -24,6 +24,7 @@ from translations_parser.utils import (
build_task_name,
parse_task_label,
publish_group_logs_from_tasks,
suffix_from_group,
)
KIND_TAG_TARGET = ("train", "finetune")
@ -76,7 +77,9 @@ def get_logs(task: dict) -> list[str]:
return log.tobytes().decode().split("\n")
def publish_task(project: str, group: str, name: str, task: dict, metrics: list[Metric]) -> None:
def publish_task(
*, project: str, group: str, name: str, suffix: str, task: dict, metrics: list[Metric]
) -> None:
logs = get_logs(task)
if not logs:
logger.warning(f"Skipping publication of training task {name}")
@ -88,6 +91,7 @@ def publish_task(project: str, group: str, name: str, task: dict, metrics: list[
project=project,
group=group,
name=name,
suffix=suffix,
tags=["taskcluster-offline"],
)
],
@ -256,6 +260,7 @@ def publish_task_group(group_id: str, override: bool = False) -> None:
publish_task(
project=project_name,
group=group_name,
suffix=suffix_from_group(group_id),
name=training_task["name"],
task=training_task,
metrics=metrics,

Просмотреть файл

@ -80,7 +80,11 @@ class CSVExport(Publisher):
class WandB(Publisher):
def __init__(
self,
*,
project: str,
group: str,
name: str,
suffix: str = "",
# Optional path to a directory containing training artifacts
artifacts: Path | None = None,
artifacts_name: str = "logs",
@ -93,6 +97,11 @@ class WandB(Publisher):
self.wandb_logger.setLevel(logging.ERROR)
self.project = project
self.group = group
# Build a unique run identifier based on the passed suffix
# This ID is also used as display name on W&B, as the interface expects unique display names among runs
self.run = f"{name}{suffix}"
self.artifacts = artifacts
self.artifacts_name = artifacts_name
self.extra_kwargs = extra_kwargs
@ -104,50 +113,25 @@ class WandB(Publisher):
config = getattr(parser, "config", {})
config.update(self.extra_kwargs.pop("config", {}))
# Avoid overriding an existing run on a first training, this should not happen
if resume is False and int(os.environ.get("RUN_ID", 0)) > 0:
logger.warning(
"Training has been resumed but resume option has been set to False, skipping publication."
)
return
try:
project = next(filter(lambda p: p.name == self.project, wandb.Api().projects()), None)
# Check if a W&B run already exists with this name
existing_runs = []
if project and (name := self.extra_kwargs.get("name")):
existing_runs = list(
wandb.Api().runs(
self.project,
filters={"display_name": name, "group": self.extra_kwargs.get("group")},
)
)
if len(existing_runs) == 0:
# Start a new W&B run
self.wandb = wandb.init(
project=self.project,
config=config,
**self.extra_kwargs,
)
return
elif len(existing_runs) == 1:
run = existing_runs[0]
# Avoid overriding an existing run on a first training, this should not happen
if resume is False and int(os.environ.get("RUN_ID", 0)) < 1:
logger.warning(
f"A W&B run already exists with name '{name}': {run}. No data will be published."
)
return
# Resume an existing run
logger.info(
f"Training has been resumed from an earlier run wit name '{name}', "
f"continue W&B publication with run {run}."
)
self.wandb = wandb.init(
project=self.project,
config=config,
id=run.id,
resume="must",
**self.extra_kwargs,
)
else:
logger.warning(
f"Multiple W&B runs already exist with name '{name}': {existing_runs}. No data will be published."
)
return
self.wandb = wandb.init(
project=self.project,
group=self.group,
name=self.run,
id=self.run,
config=config,
resume=resume,
**self.extra_kwargs,
)
if self.wandb.resumed:
logger.info(f"W&B run is being resumed from existing run '{self.run}'.")
except Exception as e:
logger.error(f"WandB client could not be initialized: {e}. No data will be published.")
@ -217,9 +201,11 @@ class WandB(Publisher):
@classmethod
def publish_group_logs(
cls,
*,
logs_parent_folder: list[str],
project: str,
group: str,
suffix: str,
existing_runs: list[str] | None = None,
) -> None:
"""
@ -301,7 +287,12 @@ class WandB(Publisher):
for model_name, model_metrics in missing_run_metrics.items():
logger.info(f"Creating missing run {model_name} with associated metrics")
publisher = cls(project=project, name=model_name, group=group)
publisher = cls(
project=project,
group=group,
name=model_name,
suffix=suffix,
)
publisher.open(TrainingParser(logs_iter=iter([]), publishers=[]))
publisher.handle_metrics(model_metrics)
publisher.close()
@ -326,11 +317,13 @@ class WandB(Publisher):
project=project,
group=group,
name="group_logs",
suffix=suffix,
)
publisher.wandb = wandb.init(
project=project,
group=group,
name="group_logs",
name=publisher.run,
id=publisher.run,
config=config,
)

Просмотреть файл

@ -201,8 +201,10 @@ def metric_from_tc_context(chrf: float, bleu: float, comet: float):
def publish_group_logs_from_tasks(
project: str | None = None,
group: str | None = None,
*,
project: str,
group: str,
suffix: str | None = None,
metrics_tasks: dict[str, dict] = {},
config: dict = {},
):
@ -212,17 +214,12 @@ def publish_group_logs_from_tasks(
`metrics_tasks` optionally contains finished evaluation tasks that will be published as new runs.
"""
from translations_parser.publishers import WandB
from translations_parser.wandb import get_wandb_names
message = "Handling group_logs publication"
if metrics_tasks:
message += f" with {len(metrics_tasks)} extra evaluation tasks"
logger.info(message)
if project is None or group is None:
logger.info("Retrieving W&B names from taskcluster attributes")
project, group, _ = get_wandb_names()
with tempfile.TemporaryDirectory() as temp_dir:
logs_folder = Path(temp_dir) / "logs"
metrics_folder = logs_folder / project / group / "metrics"
@ -263,4 +260,18 @@ def publish_group_logs_from_tasks(
yaml.dump(config, config_file)
parents = str(logs_folder.resolve()).strip().split("/")
WandB.publish_group_logs(parents, project, group, existing_runs=[])
WandB.publish_group_logs(
logs_parent_folder=parents,
project=project,
group=group,
suffix=suffix,
existing_runs=[],
)
def suffix_from_group(task_group_id: str) -> str:
    """Build a unique W&B run-name suffix from a Taskcluster task group ID.

    Returns an underscore followed by the first 5 characters of the group ID
    (e.g. "ABCDE12345" -> "_ABCDE"), used to keep run names unique across
    task groups.

    Raises:
        AssertionError: if the group ID is shorter than 5 characters.
    """
    # Fixed message: the check accepts exactly 5 characters (>= 5), so the
    # requirement is "at least 5", not "more than 5" as previously stated.
    # NOTE: `assert` is stripped under `python -O`; kept here to match the
    # validation style used elsewhere in this codebase.
    assert (
        len(task_group_id) >= 5
    ), f"Taskcluster group ID should contain at least 5 characters: {task_group_id}"
    return f"_{task_group_id[:5]}"

Просмотреть файл

@ -4,7 +4,7 @@ from pathlib import Path
import taskcluster
from translations_parser.parser import logger
from translations_parser.publishers import WandB
from translations_parser.utils import build_task_name
from translations_parser.utils import build_task_name, suffix_from_group
def add_wandb_arguments(parser):
@ -65,10 +65,12 @@ def get_wandb_token(secret_name):
)
def get_wandb_names():
def get_wandb_names() -> tuple[str, str, str, str]:
"""
Find the various names needed to publish on Weight & Biases using
the taskcluster task & group payloads
the taskcluster task & group payloads.
Returns project, group, run names and the task group ID.
"""
task_id = os.environ.get("TASK_ID")
if not task_id:
@ -94,11 +96,11 @@ def get_wandb_names():
else:
experiment = config["experiment"]
# Build project, group and run names
return (
f'{experiment["src"]}-{experiment["trg"]}',
f'{experiment["name"]}_{group_id}',
task_name,
group_id,
)
@ -119,6 +121,7 @@ def get_wandb_publisher(
return
# Load secret from Taskcluster and auto-configure naming
suffix = ""
if taskcluster_secret:
assert os.environ.get(
"TASKCLUSTER_PROXY_URL"
@ -127,7 +130,8 @@ def get_wandb_publisher(
# Weight and Biases client use environment variable to read the token
os.environ.setdefault("WANDB_API_KEY", get_wandb_token(taskcluster_secret))
project_name, group_name, run_name = get_wandb_names()
project_name, group_name, run_name, task_group_id = get_wandb_names()
suffix = suffix_from_group(task_group_id)
# Enable publication on weight and biases when project is set
# But prevent running when explicitly disabled by operator
@ -148,6 +152,7 @@ def get_wandb_publisher(
project=project_name,
group=group_name,
name=run_name,
suffix=suffix,
artifacts=artifacts,
tags=tags,
config=config,