From f1668c1a1ca61c6ddac563c594b25475ad07a440 Mon Sep 17 00:00:00 2001 From: Greg Tatum Date: Thu, 17 Oct 2024 13:54:31 -0500 Subject: [PATCH] Merge corpus rewrite to python (#851) --- pipeline/clean/merge-corpus.py | 318 ++++++++++++++++++ pipeline/clean/merge-corpus.sh | 42 --- pipeline/clean/merge-mono.py | 7 +- .../requirements/{merge-mono.in => merge.in} | 0 .../{merge-mono.txt => merge.txt} | 0 pipeline/common/downloads.py | 43 ++- pipeline/data/importers/mono/hplt.py | 2 +- taskcluster/configs/config.ci.yml | 9 +- taskcluster/kinds/merge-corpus/kind.yml | 123 +++---- taskcluster/kinds/merge-devset/kind.yml | 123 +++---- taskcluster/kinds/merge-mono/kind.yml | 4 +- .../translations_taskgraph/parameters.py | 1 + tests/fixtures/__init__.py | 53 +-- tests/test_merge_corpus.py | 219 ++++++++++++ tests/test_merge_mono.py | 4 +- 15 files changed, 736 insertions(+), 212 deletions(-) create mode 100644 pipeline/clean/merge-corpus.py delete mode 100755 pipeline/clean/merge-corpus.sh rename pipeline/clean/requirements/{merge-mono.in => merge.in} (100%) rename pipeline/clean/requirements/{merge-mono.txt => merge.txt} (100%) create mode 100644 tests/test_merge_corpus.py diff --git a/pipeline/clean/merge-corpus.py b/pipeline/clean/merge-corpus.py new file mode 100644 index 00000000..1f5c282a --- /dev/null +++ b/pipeline/clean/merge-corpus.py @@ -0,0 +1,318 @@ +""" +Merges multiple corpora into a single "source" language file, and a single "target" +language file, each. + +For instance: + + dataset1.en.zst dataset1.ru.zst + dataset2.en.zst dataset2.ru.zst + dataset3.en.zst dataset3.ru.zst + + Gets merged into: + + corpus.en.zst + corpus.ru.zst +""" + +import argparse +from contextlib import ExitStack +from glob import glob +from pathlib import Path +from typing import Generator, Optional +from pipeline.common.datasets import ( + FilteringStep, + Statistics, + WeakStringSet, + shuffle_with_max_lines, +) +from pipeline.common.downloads import get_human_readable_file_size, read_lines, write_lines +from pipeline.common.logging import get_logger + +logger = get_logger(__file__) + +# TODO(CJK) - Issue #424 +MAX_WORDS_IN_SENTENCE = 100 + + +class FilteringStatistics(Statistics): + """ + Gather statistics about the filtering process. + """ + + def __init__(self, dataset_path: Path) -> None: + super().__init__(dataset_path) + self.parallel_corpus = FilteringStep( + "The parallel corpora are merged and deduplicated", + ) + self.final_truncated = FilteringStep("The final result can be truncated by max_lines") + self.datasets = [] + + def add_parallel_dataset(self, location: str): + # e.g. /path/to/ada83_v1.en.zst + path = Path(location) + # e.g. 
ada83_v1 + dataset_stem = Path(path.stem).stem + step = FilteringStep(dataset_stem) + self.datasets.append(step) + return step + + +def log_dataset(location: str): + logger.info(f"Reading dataset {location}") + + +class DeduplicateCorpus: + def __init__( + self, + datasets_src: list[Path], + datasets_trg: list[Path], + src_outpath: Path, + trg_outpath: Path, + stats: FilteringStatistics, + ) -> None: + self.datasets_src: list[Path] = datasets_src + self.datasets_trg: list[Path] = datasets_trg + self.src_outpath: Path = src_outpath + self.trg_outpath: Path = trg_outpath + self.stats: FilteringStatistics = stats + self.dataset_stats: FilteringStep = None + + def run( + self, + total_corpus_bytes: int, + max_lines: Optional[int], + ): + stats = self.stats + with ExitStack() as stack: + src_outfile = stack.enter_context(write_lines(self.src_outpath)) + trg_outfile = stack.enter_context(write_lines(self.trg_outpath)) + + if max_lines: + for line in shuffle_with_max_lines( + line_stream=self.yield_lines_string(stack), + seed=38540735095, + max_lines=max_lines, + max_words_in_sentence=MAX_WORDS_IN_SENTENCE, + total_byte_size=total_corpus_bytes, + ): + src_line, trg_line = line.split("\t") + src_outfile.write(src_line) + trg_outfile.write(trg_line) + + stats.final_truncated.visited = stats.parallel_corpus.kept + stats.final_truncated.kept = min(max_lines, stats.parallel_corpus.kept) + else: + for src_line, trg_line in self.yield_lines_tuple(stack): + src_outfile.write(src_line) + trg_outfile.write(trg_line) + + stats.final_truncated.kept = stats.parallel_corpus.kept + stats.final_truncated.visited = stats.parallel_corpus.kept + + def yield_lines_tuple(self, stack: ExitStack) -> Generator[tuple[str, str], None, None]: + strings_seen = WeakStringSet() + stats = self.stats + src_lines: Generator[str, None, None] = stack.enter_context( + read_lines(self.datasets_src, on_enter_location=self.on_enter_location) + ) + trg_lines: Generator[str, None, None] = stack.enter_context( + read_lines(self.datasets_trg, on_enter_location=log_dataset) + ) + + for src_line, trg_line in zip(src_lines, trg_lines): + # No separator is needed as the newline is included. + line = src_line + trg_line + + if line in strings_seen: + stats.parallel_corpus.filtered += 1 + self.dataset_stats.filtered += 1 + else: + stats.parallel_corpus.kept += 1 + self.dataset_stats.kept += 1 + + strings_seen.add(line) + + yield src_line, trg_line + + def yield_lines_string(self, stack: ExitStack) -> Generator[str, None, None]: + for src_line, trg_line in self.yield_lines_tuple(stack): + if "\t" in src_line or "\t" in trg_line: + logger.error("A line contained a tab character, skipping:") + logger.error(f" src: {src_line}") + logger.error(f" trg: {src_line}") + else: + yield f"{src_line}\t{trg_line}" + + def on_enter_location(self, location): + log_dataset(location) + self.dataset_stats = self.stats.add_parallel_dataset(location) + + +def sample_corpus( + artifacts: Path, name: str, sample_size: int, src_outpath: Path, trg_outpath: Path +): + """ + Generate a sample of the corpus data with the following format: + + e.g. + > cat artifacts/corpus.sample.txt + Sentence 1 in source language + Sentence 1 in target language + + Sentence 2 in source language + Sentence 2 in target language + + Sentence 3 in source language + Sentence 3 in target language + ... 
+ """ + total_byte_size = src_outpath.stat().st_size + trg_outpath.stat().st_size + + with ExitStack() as stack: + sample_path = artifacts / f"{name}.sample.txt" + + src_lines = stack.enter_context(read_lines(src_outpath)) + trg_lines = stack.enter_context(read_lines(trg_outpath)) + sample_outfile = stack.enter_context( + write_lines( + sample_path, + # The browser won't know the encoding when viewing this sample without including + # a "byte order mark", which python can do via this encoding. + encoding="utf-8-sig", + ) + ) + + def join_src_trg(): + for src_line, trg_line in zip(src_lines, trg_lines): + # The src and trg line each have a newline at the end. This means that + # each sentence pair will be separate by a blank line to make for easy + # scanning of datasets. + yield f"{src_line}{trg_line}\n" + + logger.info("Stream in:") + logger.info(f" - {src_outpath}") + logger.info(f" - {trg_outpath}") + logger.info(f"Write a {sample_size:,} line sample of the merged corpus:") + logger.info(f" - {sample_path}") + + for line in shuffle_with_max_lines( + line_stream=join_src_trg(), + seed=9834523434, + max_lines=sample_size, + max_words_in_sentence=MAX_WORDS_IN_SENTENCE, + total_byte_size=total_byte_size, + ): + sample_outfile.write(line) + + +def get_datasets(src: str, trg: str, datasets_glob: str): + dataset_paths: list[str] = glob(datasets_glob) + datasets_src: list[Path] = [] + datasets_trg: list[Path] = [] + dataset_paths.sort() + + total_corpus_bytes = 0 + + for dataset in dataset_paths: + path = Path(dataset) + if dataset.endswith(f"{src}.zst"): + datasets_src.append(path) + elif dataset.endswith(f"{trg}.zst"): + datasets_trg.append(path) + else: + raise Exception(f"Dataset does not match naming scheme: {dataset}") + + formatted_size, bytes = get_human_readable_file_size(path) + logger.info(f" - {path} ({formatted_size})") + total_corpus_bytes += bytes + + return datasets_src, datasets_trg, total_corpus_bytes + + +def main() -> None: + parser = argparse.ArgumentParser( + description=__doc__, + # Preserves whitespace in the help text. + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument( + "--src", + type=str, + help="The source locale", + ) + + parser.add_argument( + "--trg", + type=str, + help="The target locale", + ) + + parser.add_argument( + "--datasets_glob", + type=str, + help="A glob-style path to the mono datasets, e.g. /path/to/*.zst", + ) + + parser.add_argument( + "--max_lines", + type=str, + default="None", + help="The (optionally) maximum number of sentences that will be merged.", + ) + + parser.add_argument( + "--sample_size", type=int, default=10_000, help="Generate a random sample of sentences." + ) + + parser.add_argument( + "--artifacts", + type=Path, + help="The path to the artifacts directory.", + ) + + parser.add_argument( + "--name", + type=str, + help='The final corpus name, e.g. 
"corpus" will output a "corpus.en.zst" file.', + ) + + args = parser.parse_args() + + datasets_src, datasets_trg, total_corpus_bytes = get_datasets( + args.src, args.trg, args.datasets_glob + ) + + logger.info("Parallel datasets:") + + src_outpath = args.artifacts / f"{args.name}.{args.src}.zst" + trg_outpath = args.artifacts / f"{args.name}.{args.trg}.zst" + + stats = FilteringStatistics(args.artifacts / args.name) + + max_lines: Optional[int] = None + if args.max_lines != "None": + max_lines = int(args.max_lines) + + deduplicate_corpus = DeduplicateCorpus( + datasets_src, + datasets_trg, + src_outpath, + trg_outpath, + stats, + ) + + deduplicate_corpus.run(total_corpus_bytes, max_lines) + + sample_corpus( + artifacts=args.artifacts, + name=args.name, + sample_size=args.sample_size, + src_outpath=src_outpath, + trg_outpath=trg_outpath, + ) + + stats.save_json() + + +if __name__ == "__main__": + main() diff --git a/pipeline/clean/merge-corpus.sh b/pipeline/clean/merge-corpus.sh deleted file mode 100755 index c6985b99..00000000 --- a/pipeline/clean/merge-corpus.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash -## -# Merges and deduplicates parallel datasets -# - -set -x -set -euo pipefail - -echo "###### Merging parallel datasets" - -test -v SRC -test -v TRG -test -v BIN - -output_prefix=$1 -inputs=( "${@:2}" ) - -tmp="${output_prefix}/merge" -mkdir -p "${tmp}" - -echo "### Merging" -if [[ "${inputs[0]}" == *.zst ]]; then - cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${SRC}.zst" | tr '\n' ' '` >"${tmp}/corpus.${SRC}.dup.zst" - cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${TRG}.zst" | tr '\n' ' '` >"${tmp}/corpus.${TRG}.dup.zst" -else - cat "${inputs[@]/%/.${SRC}.zst}" >"${tmp}/corpus.${SRC}.dup.zst" - cat "${inputs[@]/%/.${TRG}.zst}" >"${tmp}/corpus.${TRG}.dup.zst" -fi - -# See pipeline/translate/merge-corpus.sh for more information on the deduplication step. - -echo "### Deduplication" -paste <(zstdmt -dc "${tmp}/corpus.${SRC}.dup.zst") <(zstdmt -dc "${tmp}/corpus.${TRG}.dup.zst") | -${BIN}/dedupe | -zstdmt >"${tmp}.${SRC}${TRG}.zst" - -zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f1 | zstdmt > "${output_prefix}.${SRC}.zst" -zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f2 | zstdmt > "${output_prefix}.${TRG}.zst" - -rm -rf "${tmp}" - -echo "###### Done: Merging parallel datasets" diff --git a/pipeline/clean/merge-mono.py b/pipeline/clean/merge-mono.py index c41b2143..6dcfd3f0 100644 --- a/pipeline/clean/merge-mono.py +++ b/pipeline/clean/merge-mono.py @@ -150,7 +150,12 @@ def filter_and_write_monolingual_data( log_memory(gc_collect=True) sample_path = output_path.parent / f"{output_path.stem}.sample.txt" logger.info(f"Write a 10,000 line sample of the final: {sample_path}") - with write_lines(sample_path) as outfile: + with write_lines( + sample_path, + # The browser won't know the encoding when viewing this sample without including + # a "byte order mark", which python can do via this encoding. 
+ encoding="utf-8-sig", + ) as outfile: for line in shuffle_with_max_lines( line_stream=final_lines, seed=9834523434, diff --git a/pipeline/clean/requirements/merge-mono.in b/pipeline/clean/requirements/merge.in similarity index 100% rename from pipeline/clean/requirements/merge-mono.in rename to pipeline/clean/requirements/merge.in diff --git a/pipeline/clean/requirements/merge-mono.txt b/pipeline/clean/requirements/merge.txt similarity index 100% rename from pipeline/clean/requirements/merge-mono.txt rename to pipeline/clean/requirements/merge.txt diff --git a/pipeline/common/downloads.py b/pipeline/common/downloads.py index 0ce5ff58..adfb1f67 100644 --- a/pipeline/common/downloads.py +++ b/pipeline/common/downloads.py @@ -336,8 +336,9 @@ class DownloadChunkStreamer(io.IOBase): @contextmanager def _read_lines_multiple_files( files: list[Union[str, Path]], + encoding: str, path_in_archive: Optional[str], - on_enter_location: Optional[Callable[[], None]] = None, + on_enter_location: Optional[Callable[[str], None]] = None, ) -> Generator[str, None, None]: """ Iterates through each line in multiple files, combining it into a single stream. @@ -346,7 +347,9 @@ def _read_lines_multiple_files( def iter(stack: ExitStack): for file_path in files: logger.info(f"Reading lines from: {file_path}") - lines = stack.enter_context(read_lines(file_path, path_in_archive, on_enter_location)) + lines = stack.enter_context( + read_lines(file_path, path_in_archive, on_enter_location, encoding=encoding) + ) yield from lines stack.close() @@ -360,8 +363,9 @@ def _read_lines_multiple_files( @contextmanager def _read_lines_single_file( location: Union[Path, str], + encoding: str, path_in_archive: Optional[str] = None, - on_enter_location: Optional[Callable[[], None]] = None, + on_enter_location: Optional[Callable[[str], None]] = None, ): """ A smart function to efficiently stream lines from a local or remote file. @@ -375,7 +379,7 @@ def _read_lines_single_file( """ location = str(location) if on_enter_location: - on_enter_location() + on_enter_location(location) if location.startswith("http://") or location.startswith("https://"): # If this is mocked for a test, use the locally mocked path. @@ -415,12 +419,12 @@ def _read_lines_single_file( else: # noqa: PLR5501 # This is a local file. if location.endswith(".gz") or location.endswith(".gzip"): - yield stack.enter_context(gzip.open(location, "rt", encoding="utf-8")) + yield stack.enter_context(gzip.open(location, "rt", encoding=encoding)) elif location.endswith(".zst"): input_file = stack.enter_context(open(location, "rb")) zst_reader = stack.enter_context(ZstdDecompressor().stream_reader(input_file)) - yield stack.enter_context(io.TextIOWrapper(zst_reader, encoding="utf-8")) + yield stack.enter_context(io.TextIOWrapper(zst_reader, encoding=encoding)) elif location.endswith(".zip"): if not path_in_archive: @@ -428,11 +432,11 @@ def _read_lines_single_file( zip = stack.enter_context(ZipFile(location, "r")) if path_in_archive not in zip.namelist(): raise Exception(f"Path did not exist in the zip file: {path_in_archive}") - file = stack.enter_context(zip.open(path_in_archive, "r")) - yield stack.enter_context(io.TextIOWrapper(file, encoding="utf-8")) + file = stack.enter_context(zip.open(path_in_archive, "r", encoding=encoding)) + yield stack.enter_context(io.TextIOWrapper(file, encoding=encoding)) else: # Treat as plain text. 
- yield stack.enter_context(open(location, "rt", encoding="utf-8")) + yield stack.enter_context(open(location, "rt", encoding=encoding)) finally: stack.close() @@ -440,7 +444,8 @@ def _read_lines_single_file( def read_lines( location_or_locations: Union[Path, str, list[Union[str, Path]]], path_in_archive: Optional[str] = None, - on_enter_location: Optional[Callable[[], None]] = None, + on_enter_location: Optional[Callable[[str], None]] = None, + encoding="utf-8", ) -> Generator[str, None, None]: """ A smart function to efficiently stream lines from a local or remote file. @@ -468,17 +473,21 @@ def read_lines( if isinstance(location_or_locations, list): return _read_lines_multiple_files( - location_or_locations, path_in_archive, on_enter_location + location_or_locations, encoding, path_in_archive, on_enter_location ) - return _read_lines_single_file(location_or_locations, path_in_archive, on_enter_location) + return _read_lines_single_file( + location_or_locations, encoding, path_in_archive, on_enter_location + ) @contextmanager -def write_lines(path: Path | str): +def write_lines(path: Path | str, encoding="utf-8"): """ A smart function to create a context to write lines to a file. It works on .zst, .gz, and - raw text files. It reads the extension to determine the file type. + raw text files. It reads the extension to determine the file type. If writing out a raw + text file, for instance a sample of a dataset that is just used for viewing, include a + "byte order mark" so that the browser can properly detect the encoding. with write_lines("output.txt.gz") as output: output.write("writing a line\n") @@ -492,11 +501,11 @@ def write_lines(path: Path | str): if path.endswith(".zst"): file = stack.enter_context(open(path, "wb")) compressor = stack.enter_context(ZstdCompressor().stream_writer(file)) - yield stack.enter_context(io.TextIOWrapper(compressor, encoding="utf-8")) + yield stack.enter_context(io.TextIOWrapper(compressor, encoding=encoding)) elif path.endswith(".gz"): - yield stack.enter_context(gzip.open(path, "wt", encoding="utf-8")) + yield stack.enter_context(gzip.open(path, "wt", encoding=encoding)) else: - yield stack.enter_context(open(path, "wt", encoding="utf-8")) + yield stack.enter_context(open(path, "wt", encoding=encoding)) finally: stack.close() diff --git a/pipeline/data/importers/mono/hplt.py b/pipeline/data/importers/mono/hplt.py index c6f7d185..d60d4a11 100644 --- a/pipeline/data/importers/mono/hplt.py +++ b/pipeline/data/importers/mono/hplt.py @@ -71,7 +71,7 @@ class FilteringStatistics(Statistics): "How many lines were actually written. 
Smaller lines will be combined together.", ) - def count_shards_visited(self): + def count_shards_visited(self, *_args): self.shards.filtered -= 1 self.shards.kept += 1 diff --git a/taskcluster/configs/config.ci.yml b/taskcluster/configs/config.ci.yml index 74ae879d..7fb3618f 100644 --- a/taskcluster/configs/config.ci.yml +++ b/taskcluster/configs/config.ci.yml @@ -10,11 +10,11 @@ experiment: teacher-ensemble: 1 mono-max-sentences-src: - total: 10000 - per-dataset: 10000 + total: 1000 + per-dataset: 500 mono-max-sentences-trg: - total: 10000 - per-dataset: 10000 + total: 1000 + per-dataset: 500 spm-sample-size: 1000 spm-vocab-size: 1000 @@ -23,6 +23,7 @@ experiment: use-opuscleaner: "true" opuscleaner-mode: "custom" teacher-mode: "two-stage" + corpus-max-sentences: 1000 bicleaner: default-threshold: 0.5 diff --git a/taskcluster/kinds/merge-corpus/kind.yml b/taskcluster/kinds/merge-corpus/kind.yml index 6e0696d5..f4e9a4eb 100644 --- a/taskcluster/kinds/merge-corpus/kind.yml +++ b/taskcluster/kinds/merge-corpus/kind.yml @@ -17,80 +17,81 @@ kind-dependencies: - bicleaner - toolchain -task-defaults: - attributes: - src_locale: "{src_locale}" - trg_locale: "{trg_locale}" - cache: - resources: - - pipeline/clean/merge-corpus.sh - task-context: - from-parameters: - src_locale: training_config.experiment.src - trg_locale: training_config.experiment.trg - substitution-fields: - - name - - label - - description - - worker.env - - dependencies - - fetches - - attributes - - run.command - upstreams-config: - upstream-artifacts: - - "{dataset_sanitized}.{src_locale}.zst" - - "{dataset_sanitized}.{trg_locale}.zst" - worker-type: b-cpu-largedisk - worker: - docker-image: {"in-tree": "train"} - max-run-time: 86400 - artifacts: - - name: public/build - path: /builds/worker/artifacts - type: directory - env: - SRC: "{src_locale}" - TRG: "{trg_locale}" - # 128 happens when cloning this repository fails - retry-exit-status: [128] - - # Don't run unless explicitly scheduled - run-on-tasks-for: [] - - run: - using: run-task - command: - - bash - - -c - # Arguments are: - # 1) output prefix - # 2) input files - - >- - export BIN=$MOZ_FETCHES_DIR && - $VCS_PATH/pipeline/clean/merge-corpus.sh - $TASK_WORKDIR/artifacts/{artifact_prefix} - $MOZ_FETCHES_DIR/*.zst - fetches: - toolchain: - - preprocess - tasks: merge-corpus: label: merge-corpus-{src_locale}-{trg_locale} description: merge corpus for {src_locale}-{trg_locale} + + worker-type: b-cpu-largedisk + worker: + docker-image: {"in-tree": "train"} + max-run-time: 86400 + artifacts: + - name: public/build + path: /builds/worker/artifacts + type: directory + env: + SRC: "{src_locale}" + TRG: "{trg_locale}" + # 128 happens when cloning this repository fails + retry-exit-status: [128] + + # Don't run unless explicitly scheduled + run-on-tasks-for: [] + attributes: + src_locale: "{src_locale}" + trg_locale: "{trg_locale}" dataset-category: train stage: merge-corpus cache: type: merge-corpus + from-parameters: + max_sentences: + - training_config.experiment.corpus-max-sentences + resources: + - pipeline/clean/merge-corpus.py + - pipeline/clean/requirements/merge.txt + + task-context: + from-parameters: + src_locale: training_config.experiment.src + trg_locale: training_config.experiment.trg + max_sentences: training_config.experiment.corpus-max-sentences + substitution-fields: + - name + - label + - description + - worker.env + - dependencies + - fetches + - attributes + - run.command upstreams-config: + upstream-artifacts: + - "{dataset_sanitized}.{src_locale}.zst" + 
- "{dataset_sanitized}.{trg_locale}.zst" upstream-task-attributes: cleaning-type: by-cleaning-type: bicleaner-ai: bicleaner-ai - task-context: - from-object: - artifact_prefix: corpus + run: + using: run-task + command: + - bash + - -c + - >- + pip install -r $VCS_PATH/pipeline/clean/requirements/merge.txt && + export PYTHONPATH=$PYTHONPATH:$VCS_PATH && + python3 $VCS_PATH/pipeline/clean/merge-corpus.py + --src {src_locale} + --trg {trg_locale} + --artifacts $TASK_WORKDIR/artifacts + --name corpus + --max_lines {max_sentences} + --datasets_glob "$MOZ_FETCHES_DIR/*.zst" + fetches: + toolchain: + - preprocess diff --git a/taskcluster/kinds/merge-devset/kind.yml b/taskcluster/kinds/merge-devset/kind.yml index ba64cc8a..61cfd000 100644 --- a/taskcluster/kinds/merge-devset/kind.yml +++ b/taskcluster/kinds/merge-devset/kind.yml @@ -17,78 +17,79 @@ kind-dependencies: - dataset - toolchain -task-defaults: - attributes: - src_locale: "{src_locale}" - trg_locale: "{trg_locale}" - cache: - resources: - - pipeline/clean/merge-corpus.sh - task-context: - from-parameters: - src_locale: training_config.experiment.src - trg_locale: training_config.experiment.trg - substitution-fields: - - name - - label - - description - - worker.env - - dependencies - - fetches - - attributes - - run.command - upstreams-config: - upstream-artifacts: - - "{dataset_sanitized}.{src_locale}.zst" - - "{dataset_sanitized}.{trg_locale}.zst" - worker-type: b-cpu - worker: - docker-image: {"in-tree": "train"} - max-run-time: 86400 - artifacts: - - name: public/build - path: /builds/worker/artifacts - type: directory - env: - SRC: "{src_locale}" - TRG: "{trg_locale}" - # 128 happens when cloning this repository fails - retry-exit-status: [128] - - # Don't run unless explicitly scheduled - run-on-tasks-for: [] - - run: - using: run-task - command: - - bash - - -c - # Arguments are: - # 1) output prefix - # 2) input files - - >- - export BIN=$MOZ_FETCHES_DIR && - $VCS_PATH/pipeline/clean/merge-corpus.sh - $TASK_WORKDIR/artifacts/{artifact_prefix} - $MOZ_FETCHES_DIR/*.zst - fetches: - toolchain: - - preprocess - tasks: merge-devset: label: merge-devset-{src_locale}-{trg_locale} description: merge devset for {src_locale}-{trg_locale} + + worker-type: b-cpu + worker: + docker-image: {"in-tree": "train"} + max-run-time: 86400 + artifacts: + - name: public/build + path: /builds/worker/artifacts + type: directory + env: + SRC: "{src_locale}" + TRG: "{trg_locale}" + # 128 happens when cloning this repository fails + retry-exit-status: [128] + + # Don't run unless explicitly scheduled + run-on-tasks-for: [] + attributes: + src_locale: "{src_locale}" + trg_locale: "{trg_locale}" dataset-category: devtest stage: merge-devset cache: type: merge-devset + from-parameters: + max_sentences: + - training_config.experiment.corpus-max-sentences + resources: + - pipeline/clean/merge-corpus.py + - pipeline/clean/requirements/merge.txt + + task-context: + from-parameters: + src_locale: training_config.experiment.src + trg_locale: training_config.experiment.trg + max_sentences: training_config.experiment.corpus-max-sentences + substitution-fields: + - name + - label + - description + - worker.env + - dependencies + - fetches + - attributes + - run.command upstreams-config: + upstream-artifacts: + - "{dataset_sanitized}.{src_locale}.zst" + - "{dataset_sanitized}.{trg_locale}.zst" upstream-task-attributes: kind: dataset - task-context: - from-object: - artifact_prefix: devset + run: + using: run-task + command: + - bash + - -c + - >- + pip install -r 
$VCS_PATH/pipeline/clean/requirements/merge.txt && + export PYTHONPATH=$PYTHONPATH:$VCS_PATH && + python3 $VCS_PATH/pipeline/clean/merge-corpus.py + --src {src_locale} + --trg {trg_locale} + --artifacts $TASK_WORKDIR/artifacts + --name devset + --max_lines {max_sentences} + --datasets_glob "$MOZ_FETCHES_DIR/*.zst" + fetches: + toolchain: + - preprocess diff --git a/taskcluster/kinds/merge-mono/kind.yml b/taskcluster/kinds/merge-mono/kind.yml index 9b04a7d1..f6aa50ad 100644 --- a/taskcluster/kinds/merge-mono/kind.yml +++ b/taskcluster/kinds/merge-mono/kind.yml @@ -26,7 +26,7 @@ task-defaults: type: merge-mono resources: - pipeline/clean/merge-mono.py - - pipeline/clean/requirements/merge-mono.txt + - pipeline/clean/requirements/merge.txt task-context: from-parameters: @@ -77,7 +77,7 @@ task-defaults: # 2) max_sentences # 3) datasets - >- - pip install -r $VCS_PATH/pipeline/clean/requirements/merge-mono.txt && + pip install -r $VCS_PATH/pipeline/clean/requirements/merge.txt && export PYTHONPATH=$PYTHONPATH:$VCS_PATH && python3 $VCS_PATH/pipeline/clean/merge-mono.py --parallel_corpus $MOZ_FETCHES_DIR/corpus/corpus.{locale}.zst diff --git a/taskcluster/translations_taskgraph/parameters.py b/taskcluster/translations_taskgraph/parameters.py index e44749a5..92848391 100644 --- a/taskcluster/translations_taskgraph/parameters.py +++ b/taskcluster/translations_taskgraph/parameters.py @@ -37,6 +37,7 @@ extend_parameters_schema( Required("trg"): str, Required("teacher-ensemble"): int, Required("teacher-mode"): str, + Optional("corpus-max-sentences"): int, Required("mono-max-sentences-trg"): { Required("total"): int, Required("per-dataset"): int, diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py index 05286c83..809c7c88 100644 --- a/tests/fixtures/__init__.py +++ b/tests/fixtures/__init__.py @@ -124,6 +124,7 @@ class DataDir: fetches_dir: Optional[str] = None, env: dict[str, str] = {}, extra_args: List[str] = None, + replace_args: List[str] = None, ): """ Runs a task from the taskgraph. See artifacts/full-task-graph.json after running a @@ -163,6 +164,12 @@ class DataDir: if extra_args: command_parts_split.extend(extra_args) + if replace_args: + for arg_from, arg_to in replace_args: + for index, arg in enumerate(command_parts_split): + if arg == arg_from: + command_parts_split[index] = arg_to + final_env = { # The following are set by the Taskcluster server. "TASK_ID": "fake_id", @@ -206,26 +213,30 @@ class DataDir: command_parts_split[index] = part # If using a venv, prepend the binary directory to the path so it is used. - python_bin_dir, venv_dir = get_python_dirs(requirements) - if python_bin_dir: - final_env = {**final_env, "PATH": f'{python_bin_dir}:{os.environ.get("PATH", "")}'} - if command_parts_split[0].endswith(".py"): - # This script is relying on a shebang, add the python3 from the executable instead. - command_parts_split.insert(0, os.path.join(python_bin_dir, "python3")) - elif command_parts_split[0].endswith(".py"): - # This script does not require a virtual environment. - command_parts_split.insert(0, "python3") + if requirements: + python_bin_dir, venv_dir = get_python_dirs(requirements) + if python_bin_dir: + final_env = { + **final_env, + "PATH": f'{python_bin_dir}:{os.environ.get("PATH", "")}', + } + if command_parts_split[0].endswith(".py"): + # This script is relying on a shebang, add the python3 from the executable instead. 
+ command_parts_split.insert(0, os.path.join(python_bin_dir, "python3")) + elif command_parts_split[0].endswith(".py"): + # This script does not require a virtual environment. + command_parts_split.insert(0, "python3") - # We have to set the path to the C++ lib before the process is started - # https://github.com/Helsinki-NLP/opus-fast-mosestokenizer/issues/6 - with open(requirements) as f: - reqs_txt = f.read() - if "opus-fast-mosestokenizer" in reqs_txt: - lib_path = os.path.join( - venv_dir, "lib/python3.10/site-packages/mosestokenizer/lib" - ) - print(f"Setting LD_LIBRARY_PATH to {lib_path}") - final_env["LD_LIBRARY_PATH"] = lib_path + # We have to set the path to the C++ lib before the process is started + # https://github.com/Helsinki-NLP/opus-fast-mosestokenizer/issues/6 + with open(requirements) as f: + reqs_txt = f.read() + if venv_dir and "opus-fast-mosestokenizer" in reqs_txt: + lib_path = os.path.join( + venv_dir, "lib/python3.10/site-packages/mosestokenizer/lib" + ) + print(f"Setting LD_LIBRARY_PATH to {lib_path}") + final_env["LD_LIBRARY_PATH"] = lib_path print("┌──────────────────────────────────────────────────────────") print("│ run_task:", " ".join(command_parts_split)) @@ -512,14 +523,12 @@ def get_mocked_downloads() -> str: ) # fmt: skip -def get_python_dirs(requirements: Optional[str]) -> Optional[Tuple[str, str]]: +def get_python_dirs(requirements: str) -> Tuple[str, str]: """ Creates a virtual environment for each requirements file that a task needs. The virtual environment is hashed based on the requirements file contents, and the system details. This way a virtual environment will be re-used between docker environments. """ - if not requirements: - return None system_details = "-".join( [ diff --git a/tests/test_merge_corpus.py b/tests/test_merge_corpus.py new file mode 100644 index 00000000..ce9d3b70 --- /dev/null +++ b/tests/test_merge_corpus.py @@ -0,0 +1,219 @@ +import json +import pytest +from fixtures import DataDir + +from pipeline.common.downloads import read_lines + +ada = [ + ("ADA 1", "АДА 1"), + ("ADA 2", "АДА 2"), + ("ADA 3", "АДА 3"), + ("SHARED 1", "ШАРЕД 1"), + ("SHARED 2", "ШАРЕД 2"), + ("ADA 4", "АДА 4"), + ("ADA 5", "АДА 5"), +] +wiki = [ + ("WIKI 1", "WИКИ 1"), + ("WIKI 2", "WИКИ 2"), + ("SHARED 3", "ШАРЕД 3"), + ("SHARED 4", "ШАРЕД 4"), + ("WIKI 3", "WИКИ 3"), + ("SHARED 1", "ШАРЕД 1"), + ("WIKI 4", "WИКИ 4"), +] +web_acquired = [ + ("WEB_ACQUIRED 1", "WЕБ_АЦQУИРЕД 1"), + ("WEB_ACQUIRED 2", "WЕБ_АЦQУИРЕД 2"), + ("SHARED 3", "ШАРЕД 3"), + ("SHARED 4", "ШАРЕД 4"), + ("WEB_ACQUIRED 3", "WЕБ_АЦQУИРЕД 3"), + ("SHARED 2", "ШАРЕД 2"), + ("WEB_ACQUIRED 4", "WЕБ_АЦQУИРЕД 4"), +] + + +def build_dataset_contents(lines: list[tuple[str, str]], index): + return "\n".join([line[index] for line in lines]) + "\n" + + +@pytest.fixture(scope="function") +def data_dir(): + data_dir = DataDir("test_merge_corpus") + data_dir.mkdir("artifacts") + data_dir.create_zst("ada83_v1.en.zst", build_dataset_contents(ada, 0)) + data_dir.create_zst("ada83_v1.ru.zst", build_dataset_contents(ada, 1)) + data_dir.create_zst("ELRC-3075-wikipedia_health_v1.en.zst", build_dataset_contents(wiki, 0)) + data_dir.create_zst("ELRC-3075-wikipedia_health_v1.ru.zst", build_dataset_contents(wiki, 1)) + data_dir.create_zst("ELRC-web_acquired_data.en.zst", build_dataset_contents(web_acquired, 0)) + data_dir.create_zst("ELRC-web_acquired_data.ru.zst", build_dataset_contents(web_acquired, 1)) + return data_dir + + +def assert_dataset(data_dir: DataDir, path: str, sorted_lines: list[str]): + with 
read_lines(data_dir.join(path)) as lines_iter: + # Sort the dataset, as the sorted lines are easier to scan and reason with. + # The datasets is still checked to be shuffled by comparing the original + # and sorted lines. + corpus_lines = list(lines_iter) + corpus_lines_sorted = list(corpus_lines) + corpus_lines_sorted.sort() + + assert corpus_lines_sorted == sorted_lines + assert corpus_lines != corpus_lines_sorted, "The results are shuffled." + + +@pytest.mark.parametrize( + "name", + ["corpus", "devset"], +) +def test_merge_corpus(data_dir, name): + data_dir.run_task( + # Tasks merge-corpus-en-ru, and merge-devset-en-ru. + f"merge-{name}-en-ru", + ) + data_dir.print_tree() + assert_dataset( + data_dir, + f"artifacts/{name}.en.zst", + sorted_lines=[ + "ADA 1\n", + "ADA 2\n", + "ADA 3\n", + "ADA 4\n", + "ADA 5\n", + "SHARED 1\n", + "SHARED 2\n", + "SHARED 3\n", + "SHARED 4\n", + "WEB_ACQUIRED 1\n", + "WEB_ACQUIRED 2\n", + "WEB_ACQUIRED 3\n", + "WEB_ACQUIRED 4\n", + "WIKI 1\n", + "WIKI 2\n", + "WIKI 3\n", + "WIKI 4\n", + ], + ) + + assert_dataset( + data_dir, + f"artifacts/{name}.ru.zst", + sorted_lines=[ + "WЕБ_АЦQУИРЕД 1\n", + "WЕБ_АЦQУИРЕД 2\n", + "WЕБ_АЦQУИРЕД 3\n", + "WЕБ_АЦQУИРЕД 4\n", + "WИКИ 1\n", + "WИКИ 2\n", + "WИКИ 3\n", + "WИКИ 4\n", + "АДА 1\n", + "АДА 2\n", + "АДА 3\n", + "АДА 4\n", + "АДА 5\n", + "ШАРЕД 1\n", + "ШАРЕД 2\n", + "ШАРЕД 3\n", + "ШАРЕД 4\n", + ], + ) + + assert json.loads(data_dir.load(f"artifacts/{name}.stats.json")) == { + "parallel_corpus": { + "description": "The parallel corpora are merged and deduplicated", + "filtered": 4, + "kept": 17, + "visited": 21, + }, + "final_truncated": { + "description": "The final result can be truncated by max_lines", + "filtered": 0, + "kept": 17, + "visited": 17, + }, + "datasets": [ + { + "description": "ELRC-3075-wikipedia_health_v1", + "filtered": 0, + "kept": 7, + "visited": 7, + }, + {"description": "ELRC-web_acquired_data", "filtered": 2, "kept": 5, "visited": 7}, + {"description": "ada83_v1", "filtered": 2, "kept": 5, "visited": 7}, + ], + } + + +@pytest.mark.parametrize( + "name", + ["corpus", "devset"], +) +def test_merge_devset_trimmed(data_dir, name): + data_dir.run_task( + # Tasks merge-corpus-en-ru, and merge-devset-en-ru. + f"merge-{name}-en-ru", + # Replace the max_sentences. 
+ replace_args=[("None", "10")], + ) + data_dir.print_tree() + assert_dataset( + data_dir, + f"artifacts/{name}.en.zst", + sorted_lines=[ + "ADA 1\n", + "ADA 3\n", + "ADA 4\n", + "ADA 5\n", + "SHARED 1\n", + "SHARED 2\n", + "SHARED 3\n", + "WIKI 1\n", + "WIKI 2\n", + "WIKI 4\n", + ], + ) + + assert_dataset( + data_dir, + f"artifacts/{name}.ru.zst", + sorted_lines=[ + "WИКИ 1\n", + "WИКИ 2\n", + "WИКИ 4\n", + "АДА 1\n", + "АДА 3\n", + "АДА 4\n", + "АДА 5\n", + "ШАРЕД 1\n", + "ШАРЕД 2\n", + "ШАРЕД 3\n", + ], + ) + + assert json.loads(data_dir.load(f"artifacts/{name}.stats.json")) == { + "parallel_corpus": { + "description": "The parallel corpora are merged and deduplicated", + "filtered": 4, + "kept": 17, + "visited": 21, + }, + "final_truncated": { + "description": "The final result can be truncated by max_lines", + "filtered": 7, + "kept": 10, + "visited": 17, + }, + "datasets": [ + { + "description": "ELRC-3075-wikipedia_health_v1", + "filtered": 0, + "kept": 7, + "visited": 7, + }, + {"description": "ELRC-web_acquired_data", "filtered": 2, "kept": 5, "visited": 7}, + {"description": "ada83_v1", "filtered": 2, "kept": 5, "visited": 7}, + ], + } diff --git a/tests/test_merge_mono.py b/tests/test_merge_mono.py index fe7d2b2b..4778d65f 100644 --- a/tests/test_merge_mono.py +++ b/tests/test_merge_mono.py @@ -103,7 +103,9 @@ def test_merge_mono(task: str): assert mono_lines != mono_lines_sorted, "The results are shuffled." - with read_lines(data_dir.join(f"artifacts/mono.{locale}.sample.txt")) as lines_iter: + with read_lines( + data_dir.join(f"artifacts/mono.{locale}.sample.txt"), encoding="utf-8-sig" + ) as lines_iter: samples = list(lines_iter) assert len(samples) == sample_size, "There are the expected number of samples" assert len(set(samples)) == sample_size, "All of the samples are are unique."