Merge corpus rewrite to python (#851)

This commit is contained in:
Greg Tatum 2024-10-17 13:54:31 -05:00 committed by GitHub
Parent db413298b7
Commit f1668c1a1c
No known key found for this signature
GPG key ID: B5690EEEBB952194
15 changed files: 736 additions and 212 deletions


@@ -0,0 +1,318 @@
"""
Merges multiple corpora into a single "source" language file and a single "target"
language file.
For instance:
dataset1.en.zst dataset1.ru.zst
dataset2.en.zst dataset2.ru.zst
dataset3.en.zst dataset3.ru.zst
Gets merged into:
corpus.en.zst
corpus.ru.zst
"""
import argparse
from contextlib import ExitStack
from glob import glob
from pathlib import Path
from typing import Generator, Optional
from pipeline.common.datasets import (
FilteringStep,
Statistics,
WeakStringSet,
shuffle_with_max_lines,
)
from pipeline.common.downloads import get_human_readable_file_size, read_lines, write_lines
from pipeline.common.logging import get_logger
logger = get_logger(__file__)
# TODO(CJK) - Issue #424
MAX_WORDS_IN_SENTENCE = 100
class FilteringStatistics(Statistics):
"""
Gather statistics about the filtering process.
"""
def __init__(self, dataset_path: Path) -> None:
super().__init__(dataset_path)
self.parallel_corpus = FilteringStep(
"The parallel corpora are merged and deduplicated",
)
self.final_truncated = FilteringStep("The final result can be truncated by max_lines")
self.datasets = []
def add_parallel_dataset(self, location: str):
# e.g. /path/to/ada83_v1.en.zst
path = Path(location)
# e.g. ada83_v1
dataset_stem = Path(path.stem).stem
step = FilteringStep(dataset_stem)
self.datasets.append(step)
return step
def log_dataset(location: str):
logger.info(f"Reading dataset {location}")
class DeduplicateCorpus:
def __init__(
self,
datasets_src: list[Path],
datasets_trg: list[Path],
src_outpath: Path,
trg_outpath: Path,
stats: FilteringStatistics,
) -> None:
self.datasets_src: list[Path] = datasets_src
self.datasets_trg: list[Path] = datasets_trg
self.src_outpath: Path = src_outpath
self.trg_outpath: Path = trg_outpath
self.stats: FilteringStatistics = stats
self.dataset_stats: Optional[FilteringStep] = None
def run(
self,
total_corpus_bytes: int,
max_lines: Optional[int],
):
stats = self.stats
with ExitStack() as stack:
src_outfile = stack.enter_context(write_lines(self.src_outpath))
trg_outfile = stack.enter_context(write_lines(self.trg_outpath))
if max_lines:
for line in shuffle_with_max_lines(
line_stream=self.yield_lines_string(stack),
seed=38540735095,
max_lines=max_lines,
max_words_in_sentence=MAX_WORDS_IN_SENTENCE,
total_byte_size=total_corpus_bytes,
):
src_line, trg_line = line.split("\t")
src_outfile.write(src_line)
trg_outfile.write(trg_line)
stats.final_truncated.visited = stats.parallel_corpus.kept
stats.final_truncated.kept = min(max_lines, stats.parallel_corpus.kept)
else:
for src_line, trg_line in self.yield_lines_tuple(stack):
src_outfile.write(src_line)
trg_outfile.write(trg_line)
stats.final_truncated.kept = stats.parallel_corpus.kept
stats.final_truncated.visited = stats.parallel_corpus.kept
def yield_lines_tuple(self, stack: ExitStack) -> Generator[tuple[str, str], None, None]:
strings_seen = WeakStringSet()
stats = self.stats
src_lines: Generator[str, None, None] = stack.enter_context(
read_lines(self.datasets_src, on_enter_location=self.on_enter_location)
)
trg_lines: Generator[str, None, None] = stack.enter_context(
read_lines(self.datasets_trg, on_enter_location=log_dataset)
)
for src_line, trg_line in zip(src_lines, trg_lines):
# No separator is needed as the newline is included.
line = src_line + trg_line
if line in strings_seen:
stats.parallel_corpus.filtered += 1
self.dataset_stats.filtered += 1
else:
stats.parallel_corpus.kept += 1
self.dataset_stats.kept += 1
strings_seen.add(line)
yield src_line, trg_line
def yield_lines_string(self, stack: ExitStack) -> Generator[str, None, None]:
for src_line, trg_line in self.yield_lines_tuple(stack):
if "\t" in src_line or "\t" in trg_line:
logger.error("A line contained a tab character, skipping:")
logger.error(f" src: {src_line}")
logger.error(f"  trg: {trg_line}")
else:
yield f"{src_line}\t{trg_line}"
def on_enter_location(self, location):
log_dataset(location)
self.dataset_stats = self.stats.add_parallel_dataset(location)
def sample_corpus(
artifacts: Path, name: str, sample_size: int, src_outpath: Path, trg_outpath: Path
):
"""
Generate a sample of the corpus data with the following format:
e.g.
> cat artifacts/corpus.sample.txt
Sentence 1 in source language
Sentence 1 in target language
Sentence 2 in source language
Sentence 2 in target language
Sentence 3 in source language
Sentence 3 in target language
...
"""
total_byte_size = src_outpath.stat().st_size + trg_outpath.stat().st_size
with ExitStack() as stack:
sample_path = artifacts / f"{name}.sample.txt"
src_lines = stack.enter_context(read_lines(src_outpath))
trg_lines = stack.enter_context(read_lines(trg_outpath))
sample_outfile = stack.enter_context(
write_lines(
sample_path,
# The browser won't know the encoding when viewing this sample without including
# a "byte order mark", which python can do via this encoding.
encoding="utf-8-sig",
)
)
def join_src_trg():
for src_line, trg_line in zip(src_lines, trg_lines):
# The src and trg line each have a newline at the end. This means that
# each sentence pair will be separated by a blank line to make for easy
# scanning of datasets.
yield f"{src_line}{trg_line}\n"
logger.info("Stream in:")
logger.info(f" - {src_outpath}")
logger.info(f" - {trg_outpath}")
logger.info(f"Write a {sample_size:,} line sample of the merged corpus:")
logger.info(f" - {sample_path}")
for line in shuffle_with_max_lines(
line_stream=join_src_trg(),
seed=9834523434,
max_lines=sample_size,
max_words_in_sentence=MAX_WORDS_IN_SENTENCE,
total_byte_size=total_byte_size,
):
sample_outfile.write(line)
def get_datasets(src: str, trg: str, datasets_glob: str):
dataset_paths: list[str] = glob(datasets_glob)
datasets_src: list[Path] = []
datasets_trg: list[Path] = []
dataset_paths.sort()
total_corpus_bytes = 0
for dataset in dataset_paths:
path = Path(dataset)
if dataset.endswith(f"{src}.zst"):
datasets_src.append(path)
elif dataset.endswith(f"{trg}.zst"):
datasets_trg.append(path)
else:
raise Exception(f"Dataset does not match naming scheme: {dataset}")
formatted_size, bytes = get_human_readable_file_size(path)
logger.info(f" - {path} ({formatted_size})")
total_corpus_bytes += bytes
return datasets_src, datasets_trg, total_corpus_bytes
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__,
# Preserves whitespace in the help text.
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--src",
type=str,
help="The source locale",
)
parser.add_argument(
"--trg",
type=str,
help="The target locale",
)
parser.add_argument(
"--datasets_glob",
type=str,
help="A glob-style path to the mono datasets, e.g. /path/to/*.zst",
)
parser.add_argument(
"--max_lines",
type=str,
default="None",
help="The (optionally) maximum number of sentences that will be merged.",
)
parser.add_argument(
"--sample_size", type=int, default=10_000, help="Generate a random sample of sentences."
)
parser.add_argument(
"--artifacts",
type=Path,
help="The path to the artifacts directory.",
)
parser.add_argument(
"--name",
type=str,
help='The final corpus name, e.g. "corpus" will output a "corpus.en.zst" file.',
)
args = parser.parse_args()
datasets_src, datasets_trg, total_corpus_bytes = get_datasets(
args.src, args.trg, args.datasets_glob
)
logger.info("Parallel datasets:")
src_outpath = args.artifacts / f"{args.name}.{args.src}.zst"
trg_outpath = args.artifacts / f"{args.name}.{args.trg}.zst"
stats = FilteringStatistics(args.artifacts / args.name)
max_lines: Optional[int] = None
if args.max_lines != "None":
max_lines = int(args.max_lines)
deduplicate_corpus = DeduplicateCorpus(
datasets_src,
datasets_trg,
src_outpath,
trg_outpath,
stats,
)
deduplicate_corpus.run(total_corpus_bytes, max_lines)
sample_corpus(
artifacts=args.artifacts,
name=args.name,
sample_size=args.sample_size,
src_outpath=src_outpath,
trg_outpath=trg_outpath,
)
stats.save_json()
if __name__ == "__main__":
main()
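For readers skimming the diff, here is a minimal standalone sketch of the merge logic above: parallel lines are deduplicated on the concatenated src+trg pair, and the result is optionally shuffled and truncated. A plain set and random.Random stand in for the pipeline's WeakStringSet and shuffle_with_max_lines helpers, and the function name is made up, so treat this as an illustration of the flow rather than the actual implementation.

import random
from typing import Iterable, Optional


def merge_and_deduplicate(
    pairs: Iterable[tuple[str, str]],
    max_lines: Optional[int] = None,
    seed: int = 38540735095,
) -> list[tuple[str, str]]:
    """Deduplicate (src, trg) pairs on the full pair, then optionally shuffle and truncate."""
    seen: set[str] = set()  # stands in for WeakStringSet
    kept: list[tuple[str, str]] = []
    for src_line, trg_line in pairs:
        key = src_line + trg_line  # the trailing newline acts as the separator
        if key in seen:
            continue
        seen.add(key)
        kept.append((src_line, trg_line))
    if max_lines is not None:
        # stands in for shuffle_with_max_lines, which shuffles within a memory budget
        random.Random(seed).shuffle(kept)
        kept = kept[:max_lines]
    return kept


# The duplicated SHARED pair is dropped, and the result is capped at 3 lines.
pairs = [
    ("ADA 1\n", "АДА 1\n"),
    ("SHARED 1\n", "ШАРЕД 1\n"),
    ("SHARED 1\n", "ШАРЕД 1\n"),
    ("WIKI 1\n", "WИКИ 1\n"),
]
print(merge_and_deduplicate(pairs, max_lines=3))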


@@ -1,42 +0,0 @@
#!/bin/bash
##
# Merges and deduplicates parallel datasets
#
set -x
set -euo pipefail
echo "###### Merging parallel datasets"
test -v SRC
test -v TRG
test -v BIN
output_prefix=$1
inputs=( "${@:2}" )
tmp="${output_prefix}/merge"
mkdir -p "${tmp}"
echo "### Merging"
if [[ "${inputs[0]}" == *.zst ]]; then
cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${SRC}.zst" | tr '\n' ' '` >"${tmp}/corpus.${SRC}.dup.zst"
cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${TRG}.zst" | tr '\n' ' '` >"${tmp}/corpus.${TRG}.dup.zst"
else
cat "${inputs[@]/%/.${SRC}.zst}" >"${tmp}/corpus.${SRC}.dup.zst"
cat "${inputs[@]/%/.${TRG}.zst}" >"${tmp}/corpus.${TRG}.dup.zst"
fi
# See pipeline/translate/merge-corpus.sh for more information on the deduplication step.
echo "### Deduplication"
paste <(zstdmt -dc "${tmp}/corpus.${SRC}.dup.zst") <(zstdmt -dc "${tmp}/corpus.${TRG}.dup.zst") |
${BIN}/dedupe |
zstdmt >"${tmp}.${SRC}${TRG}.zst"
zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f1 | zstdmt > "${output_prefix}.${SRC}.zst"
zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f2 | zstdmt > "${output_prefix}.${TRG}.zst"
rm -rf "${tmp}"
echo "###### Done: Merging parallel datasets"


@@ -150,7 +150,12 @@ def filter_and_write_monolingual_data(
log_memory(gc_collect=True)
sample_path = output_path.parent / f"{output_path.stem}.sample.txt"
logger.info(f"Write a 10,000 line sample of the final: {sample_path}")
with write_lines(sample_path) as outfile:
with write_lines(
sample_path,
# The browser won't know the encoding when viewing this sample without including
# a "byte order mark", which python can do via this encoding.
encoding="utf-8-sig",
) as outfile:
for line in shuffle_with_max_lines(
line_stream=final_lines,
seed=9834523434,
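A quick standard-library check of what utf-8-sig does here: it prepends the UTF-8 byte order mark (EF BB BF) when writing, which is the hint browsers use to detect the encoding of these plain-text samples, and it strips the mark again when reading.

import tempfile
from pathlib import Path

sample = Path(tempfile.mkdtemp()) / "sample.txt"
sample.write_text("Пример\n", encoding="utf-8-sig")

# The utf-8-sig codec writes a byte order mark before the UTF-8 payload...
assert sample.read_bytes().startswith(b"\xef\xbb\xbf")
# ...and transparently strips it again when reading with the same codec.
assert sample.read_text(encoding="utf-8-sig") == "Пример\n"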


@@ -336,8 +336,9 @@ class DownloadChunkStreamer(io.IOBase):
@contextmanager
def _read_lines_multiple_files(
files: list[Union[str, Path]],
encoding: str,
path_in_archive: Optional[str],
on_enter_location: Optional[Callable[[], None]] = None,
on_enter_location: Optional[Callable[[str], None]] = None,
) -> Generator[str, None, None]:
"""
Iterates through each line in multiple files, combining it into a single stream.
@@ -346,7 +347,9 @@ def _read_lines_multiple_files(
def iter(stack: ExitStack):
for file_path in files:
logger.info(f"Reading lines from: {file_path}")
lines = stack.enter_context(read_lines(file_path, path_in_archive, on_enter_location))
lines = stack.enter_context(
read_lines(file_path, path_in_archive, on_enter_location, encoding=encoding)
)
yield from lines
stack.close()
@@ -360,8 +363,9 @@ def _read_lines_multiple_files(
@contextmanager
def _read_lines_single_file(
location: Union[Path, str],
encoding: str,
path_in_archive: Optional[str] = None,
on_enter_location: Optional[Callable[[], None]] = None,
on_enter_location: Optional[Callable[[str], None]] = None,
):
"""
A smart function to efficiently stream lines from a local or remote file.
@@ -375,7 +379,7 @@ def _read_lines_single_file(
"""
location = str(location)
if on_enter_location:
on_enter_location()
on_enter_location(location)
if location.startswith("http://") or location.startswith("https://"):
# If this is mocked for a test, use the locally mocked path.
@@ -415,12 +419,12 @@ def _read_lines_single_file(
else: # noqa: PLR5501
# This is a local file.
if location.endswith(".gz") or location.endswith(".gzip"):
yield stack.enter_context(gzip.open(location, "rt", encoding="utf-8"))
yield stack.enter_context(gzip.open(location, "rt", encoding=encoding))
elif location.endswith(".zst"):
input_file = stack.enter_context(open(location, "rb"))
zst_reader = stack.enter_context(ZstdDecompressor().stream_reader(input_file))
yield stack.enter_context(io.TextIOWrapper(zst_reader, encoding="utf-8"))
yield stack.enter_context(io.TextIOWrapper(zst_reader, encoding=encoding))
elif location.endswith(".zip"):
if not path_in_archive:
@@ -428,11 +432,11 @@ def _read_lines_single_file(
zip = stack.enter_context(ZipFile(location, "r"))
if path_in_archive not in zip.namelist():
raise Exception(f"Path did not exist in the zip file: {path_in_archive}")
file = stack.enter_context(zip.open(path_in_archive, "r"))
yield stack.enter_context(io.TextIOWrapper(file, encoding="utf-8"))
file = stack.enter_context(zip.open(path_in_archive, "r", encoding=encoding))
yield stack.enter_context(io.TextIOWrapper(file, encoding=encoding))
else:
# Treat as plain text.
yield stack.enter_context(open(location, "rt", encoding="utf-8"))
yield stack.enter_context(open(location, "rt", encoding=encoding))
finally:
stack.close()
@@ -440,7 +444,8 @@ def _read_lines_single_file(
def read_lines(
location_or_locations: Union[Path, str, list[Union[str, Path]]],
path_in_archive: Optional[str] = None,
on_enter_location: Optional[Callable[[], None]] = None,
on_enter_location: Optional[Callable[[str], None]] = None,
encoding="utf-8",
) -> Generator[str, None, None]:
"""
A smart function to efficiently stream lines from a local or remote file.
@@ -468,17 +473,21 @@ def read_lines(
if isinstance(location_or_locations, list):
return _read_lines_multiple_files(
location_or_locations, path_in_archive, on_enter_location
location_or_locations, encoding, path_in_archive, on_enter_location
)
return _read_lines_single_file(location_or_locations, path_in_archive, on_enter_location)
return _read_lines_single_file(
location_or_locations, encoding, path_in_archive, on_enter_location
)
@contextmanager
def write_lines(path: Path | str):
def write_lines(path: Path | str, encoding="utf-8"):
"""
A smart function to create a context to write lines to a file. It works on .zst, .gz, and
raw text files. It reads the extension to determine the file type.
raw text files. It reads the extension to determine the file type. If writing out a raw
text file, for instance a sample of a dataset that is just used for viewing, include a
"byte order mark" so that the browser can properly detect the encoding.
with write_lines("output.txt.gz") as output:
output.write("writing a line\n")
@@ -492,11 +501,11 @@ def write_lines(path: Path | str):
if path.endswith(".zst"):
file = stack.enter_context(open(path, "wb"))
compressor = stack.enter_context(ZstdCompressor().stream_writer(file))
yield stack.enter_context(io.TextIOWrapper(compressor, encoding="utf-8"))
yield stack.enter_context(io.TextIOWrapper(compressor, encoding=encoding))
elif path.endswith(".gz"):
yield stack.enter_context(gzip.open(path, "wt", encoding="utf-8"))
yield stack.enter_context(gzip.open(path, "wt", encoding=encoding))
else:
yield stack.enter_context(open(path, "wt", encoding="utf-8"))
yield stack.enter_context(open(path, "wt", encoding=encoding))
finally:
stack.close()
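Based on the updated signatures in this diff, read_lines and write_lines now thread an encoding argument through every branch (gzip, zstd, zip, and plain text), and on_enter_location receives the location being opened. A usage sketch; the file paths are hypothetical:

from pipeline.common.downloads import read_lines, write_lines

# Write a plain-text sample with a byte order mark so browsers detect the encoding.
with write_lines("artifacts/sample.txt", encoding="utf-8-sig") as outfile:
    outfile.write("a line of text\n")


def on_enter_location(location: str) -> None:
    # The callback is now handed the location it is entering.
    print(f"Reading dataset {location}")


# read_lines accepts a single path or a list of paths; encoding defaults to utf-8.
with read_lines(
    ["dataset1.en.zst", "dataset2.en.zst"],
    on_enter_location=on_enter_location,
    encoding="utf-8",
) as lines:
    for line in lines:
        pass  # process each line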


@@ -71,7 +71,7 @@ class FilteringStatistics(Statistics):
"How many lines were actually written. Smaller lines will be combined together.",
)
def count_shards_visited(self):
def count_shards_visited(self, *_args):
self.shards.filtered -= 1
self.shards.kept += 1
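This counter appears to be wired up as an on_enter_location-style callback, and since those callbacks are now handed the location (see the downloads.py change above), the extra positional argument is swallowed with *_args. A minimal standalone illustration; every name here other than count_shards_visited is made up:

from typing import Callable


def open_shard(on_enter_location: Callable[[str], None]) -> None:
    # Callers now always pass the location they are entering.
    on_enter_location("/path/to/shard.zst")


class ShardStats:
    def __init__(self) -> None:
        self.kept = 0

    def count_shards_visited(self, *_args) -> None:
        # *_args absorbs the location argument this counter does not need.
        self.kept += 1


stats = ShardStats()
open_shard(stats.count_shards_visited)  # would raise TypeError without *_args
assert stats.kept == 1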


@@ -10,11 +10,11 @@ experiment:
teacher-ensemble: 1
mono-max-sentences-src:
total: 10000
per-dataset: 10000
total: 1000
per-dataset: 500
mono-max-sentences-trg:
total: 10000
per-dataset: 10000
total: 1000
per-dataset: 500
spm-sample-size: 1000
spm-vocab-size: 1000
@@ -23,6 +23,7 @@ experiment:
use-opuscleaner: "true"
opuscleaner-mode: "custom"
teacher-mode: "two-stage"
corpus-max-sentences: 1000
bicleaner:
default-threshold: 0.5


@@ -17,80 +17,81 @@ kind-dependencies:
- bicleaner
- toolchain
task-defaults:
attributes:
src_locale: "{src_locale}"
trg_locale: "{trg_locale}"
cache:
resources:
- pipeline/clean/merge-corpus.sh
task-context:
from-parameters:
src_locale: training_config.experiment.src
trg_locale: training_config.experiment.trg
substitution-fields:
- name
- label
- description
- worker.env
- dependencies
- fetches
- attributes
- run.command
upstreams-config:
upstream-artifacts:
- "{dataset_sanitized}.{src_locale}.zst"
- "{dataset_sanitized}.{trg_locale}.zst"
worker-type: b-cpu-largedisk
worker:
docker-image: {"in-tree": "train"}
max-run-time: 86400
artifacts:
- name: public/build
path: /builds/worker/artifacts
type: directory
env:
SRC: "{src_locale}"
TRG: "{trg_locale}"
# 128 happens when cloning this repository fails
retry-exit-status: [128]
# Don't run unless explicitly scheduled
run-on-tasks-for: []
run:
using: run-task
command:
- bash
- -c
# Arguments are:
# 1) output prefix
# 2) input files
- >-
export BIN=$MOZ_FETCHES_DIR &&
$VCS_PATH/pipeline/clean/merge-corpus.sh
$TASK_WORKDIR/artifacts/{artifact_prefix}
$MOZ_FETCHES_DIR/*.zst
fetches:
toolchain:
- preprocess
tasks:
merge-corpus:
label: merge-corpus-{src_locale}-{trg_locale}
description: merge corpus for {src_locale}-{trg_locale}
worker-type: b-cpu-largedisk
worker:
docker-image: {"in-tree": "train"}
max-run-time: 86400
artifacts:
- name: public/build
path: /builds/worker/artifacts
type: directory
env:
SRC: "{src_locale}"
TRG: "{trg_locale}"
# 128 happens when cloning this repository fails
retry-exit-status: [128]
# Don't run unless explicitly scheduled
run-on-tasks-for: []
attributes:
src_locale: "{src_locale}"
trg_locale: "{trg_locale}"
dataset-category: train
stage: merge-corpus
cache:
type: merge-corpus
from-parameters:
max_sentences:
- training_config.experiment.corpus-max-sentences
resources:
- pipeline/clean/merge-corpus.py
- pipeline/clean/requirements/merge.txt
task-context:
from-parameters:
src_locale: training_config.experiment.src
trg_locale: training_config.experiment.trg
max_sentences: training_config.experiment.corpus-max-sentences
substitution-fields:
- name
- label
- description
- worker.env
- dependencies
- fetches
- attributes
- run.command
upstreams-config:
upstream-artifacts:
- "{dataset_sanitized}.{src_locale}.zst"
- "{dataset_sanitized}.{trg_locale}.zst"
upstream-task-attributes:
cleaning-type:
by-cleaning-type:
bicleaner-ai: bicleaner-ai
task-context:
from-object:
artifact_prefix: corpus
run:
using: run-task
command:
- bash
- -c
- >-
pip install -r $VCS_PATH/pipeline/clean/requirements/merge.txt &&
export PYTHONPATH=$PYTHONPATH:$VCS_PATH &&
python3 $VCS_PATH/pipeline/clean/merge-corpus.py
--src {src_locale}
--trg {trg_locale}
--artifacts $TASK_WORKDIR/artifacts
--name corpus
--max_lines {max_sentences}
--datasets_glob "$MOZ_FETCHES_DIR/*.zst"
fetches:
toolchain:
- preprocess


@@ -17,78 +17,79 @@ kind-dependencies:
- dataset
- toolchain
task-defaults:
attributes:
src_locale: "{src_locale}"
trg_locale: "{trg_locale}"
cache:
resources:
- pipeline/clean/merge-corpus.sh
task-context:
from-parameters:
src_locale: training_config.experiment.src
trg_locale: training_config.experiment.trg
substitution-fields:
- name
- label
- description
- worker.env
- dependencies
- fetches
- attributes
- run.command
upstreams-config:
upstream-artifacts:
- "{dataset_sanitized}.{src_locale}.zst"
- "{dataset_sanitized}.{trg_locale}.zst"
worker-type: b-cpu
worker:
docker-image: {"in-tree": "train"}
max-run-time: 86400
artifacts:
- name: public/build
path: /builds/worker/artifacts
type: directory
env:
SRC: "{src_locale}"
TRG: "{trg_locale}"
# 128 happens when cloning this repository fails
retry-exit-status: [128]
# Don't run unless explicitly scheduled
run-on-tasks-for: []
run:
using: run-task
command:
- bash
- -c
# Arguments are:
# 1) output prefix
# 2) input files
- >-
export BIN=$MOZ_FETCHES_DIR &&
$VCS_PATH/pipeline/clean/merge-corpus.sh
$TASK_WORKDIR/artifacts/{artifact_prefix}
$MOZ_FETCHES_DIR/*.zst
fetches:
toolchain:
- preprocess
tasks:
merge-devset:
label: merge-devset-{src_locale}-{trg_locale}
description: merge devset for {src_locale}-{trg_locale}
worker-type: b-cpu
worker:
docker-image: {"in-tree": "train"}
max-run-time: 86400
artifacts:
- name: public/build
path: /builds/worker/artifacts
type: directory
env:
SRC: "{src_locale}"
TRG: "{trg_locale}"
# 128 happens when cloning this repository fails
retry-exit-status: [128]
# Don't run unless explicitly scheduled
run-on-tasks-for: []
attributes:
src_locale: "{src_locale}"
trg_locale: "{trg_locale}"
dataset-category: devtest
stage: merge-devset
cache:
type: merge-devset
from-parameters:
max_sentences:
- training_config.experiment.corpus-max-sentences
resources:
- pipeline/clean/merge-corpus.py
- pipeline/clean/requirements/merge.txt
task-context:
from-parameters:
src_locale: training_config.experiment.src
trg_locale: training_config.experiment.trg
max_sentences: training_config.experiment.corpus-max-sentences
substitution-fields:
- name
- label
- description
- worker.env
- dependencies
- fetches
- attributes
- run.command
upstreams-config:
upstream-artifacts:
- "{dataset_sanitized}.{src_locale}.zst"
- "{dataset_sanitized}.{trg_locale}.zst"
upstream-task-attributes:
kind: dataset
task-context:
from-object:
artifact_prefix: devset
run:
using: run-task
command:
- bash
- -c
- >-
pip install -r $VCS_PATH/pipeline/clean/requirements/merge.txt &&
export PYTHONPATH=$PYTHONPATH:$VCS_PATH &&
python3 $VCS_PATH/pipeline/clean/merge-corpus.py
--src {src_locale}
--trg {trg_locale}
--artifacts $TASK_WORKDIR/artifacts
--name devset
--max_lines {max_sentences}
--datasets_glob "$MOZ_FETCHES_DIR/*.zst"
fetches:
toolchain:
- preprocess


@@ -26,7 +26,7 @@ task-defaults:
type: merge-mono
resources:
- pipeline/clean/merge-mono.py
- pipeline/clean/requirements/merge-mono.txt
- pipeline/clean/requirements/merge.txt
task-context:
from-parameters:
@@ -77,7 +77,7 @@ task-defaults:
# 2) max_sentences
# 3) datasets
- >-
pip install -r $VCS_PATH/pipeline/clean/requirements/merge-mono.txt &&
pip install -r $VCS_PATH/pipeline/clean/requirements/merge.txt &&
export PYTHONPATH=$PYTHONPATH:$VCS_PATH &&
python3 $VCS_PATH/pipeline/clean/merge-mono.py
--parallel_corpus $MOZ_FETCHES_DIR/corpus/corpus.{locale}.zst


@@ -37,6 +37,7 @@ extend_parameters_schema(
Required("trg"): str,
Required("teacher-ensemble"): int,
Required("teacher-mode"): str,
Optional("corpus-max-sentences"): int,
Required("mono-max-sentences-trg"): {
Required("total"): int,
Required("per-dataset"): int,

53
tests/fixtures/__init__.py vendored

@@ -124,6 +124,7 @@ class DataDir:
fetches_dir: Optional[str] = None,
env: dict[str, str] = {},
extra_args: List[str] = None,
replace_args: List[str] = None,
):
"""
Runs a task from the taskgraph. See artifacts/full-task-graph.json after running a
@@ -163,6 +164,12 @@ class DataDir:
if extra_args:
command_parts_split.extend(extra_args)
if replace_args:
for arg_from, arg_to in replace_args:
for index, arg in enumerate(command_parts_split):
if arg == arg_from:
command_parts_split[index] = arg_to
final_env = {
# The following are set by the Taskcluster server.
"TASK_ID": "fake_id",
@@ -206,26 +213,30 @@ class DataDir:
command_parts_split[index] = part
# If using a venv, prepend the binary directory to the path so it is used.
python_bin_dir, venv_dir = get_python_dirs(requirements)
if python_bin_dir:
final_env = {**final_env, "PATH": f'{python_bin_dir}:{os.environ.get("PATH", "")}'}
if command_parts_split[0].endswith(".py"):
# This script is relying on a shebang, add the python3 from the executable instead.
command_parts_split.insert(0, os.path.join(python_bin_dir, "python3"))
elif command_parts_split[0].endswith(".py"):
# This script does not require a virtual environment.
command_parts_split.insert(0, "python3")
if requirements:
python_bin_dir, venv_dir = get_python_dirs(requirements)
if python_bin_dir:
final_env = {
**final_env,
"PATH": f'{python_bin_dir}:{os.environ.get("PATH", "")}',
}
if command_parts_split[0].endswith(".py"):
# This script is relying on a shebang, add the python3 from the executable instead.
command_parts_split.insert(0, os.path.join(python_bin_dir, "python3"))
elif command_parts_split[0].endswith(".py"):
# This script does not require a virtual environment.
command_parts_split.insert(0, "python3")
# We have to set the path to the C++ lib before the process is started
# https://github.com/Helsinki-NLP/opus-fast-mosestokenizer/issues/6
with open(requirements) as f:
reqs_txt = f.read()
if "opus-fast-mosestokenizer" in reqs_txt:
lib_path = os.path.join(
venv_dir, "lib/python3.10/site-packages/mosestokenizer/lib"
)
print(f"Setting LD_LIBRARY_PATH to {lib_path}")
final_env["LD_LIBRARY_PATH"] = lib_path
# We have to set the path to the C++ lib before the process is started
# https://github.com/Helsinki-NLP/opus-fast-mosestokenizer/issues/6
with open(requirements) as f:
reqs_txt = f.read()
if venv_dir and "opus-fast-mosestokenizer" in reqs_txt:
lib_path = os.path.join(
venv_dir, "lib/python3.10/site-packages/mosestokenizer/lib"
)
print(f"Setting LD_LIBRARY_PATH to {lib_path}")
final_env["LD_LIBRARY_PATH"] = lib_path
print("┌──────────────────────────────────────────────────────────")
print("│ run_task:", " ".join(command_parts_split))
@@ -512,14 +523,12 @@ def get_mocked_downloads() -> str:
) # fmt: skip
def get_python_dirs(requirements: Optional[str]) -> Optional[Tuple[str, str]]:
def get_python_dirs(requirements: str) -> Tuple[str, str]:
"""
Creates a virtual environment for each requirements file that a task needs. The virtual
environment is hashed based on the requirements file contents, and the system details. This
way a virtual environment will be re-used between docker environments.
"""
if not requirements:
return None
system_details = "-".join(
[
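The new replace_args hook rewrites exact argument matches in the task's command before it runs; the tests below rely on it to turn --max_lines None into --max_lines 10. A standalone sketch of the substitution, followed by the fixture call it corresponds to:

# A standalone sketch of the replace_args substitution performed in run_task above.
def apply_replace_args(
    command_parts: list[str], replace_args: list[tuple[str, str]]
) -> list[str]:
    parts = list(command_parts)
    for arg_from, arg_to in replace_args:
        for index, arg in enumerate(parts):
            if arg == arg_from:
                parts[index] = arg_to
    return parts


command = ["python3", "merge-corpus.py", "--max_lines", "None", "--name", "corpus"]
assert apply_replace_args(command, [("None", "10")]) == [
    "python3", "merge-corpus.py", "--max_lines", "10", "--name", "corpus",
]

# In a test this is driven through the fixture, for example:
#   data_dir.run_task("merge-corpus-en-ru", replace_args=[("None", "10")])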

219
tests/test_merge_corpus.py Normal file

@@ -0,0 +1,219 @@
import json
import pytest
from fixtures import DataDir
from pipeline.common.downloads import read_lines
ada = [
("ADA 1", "АДА 1"),
("ADA 2", "АДА 2"),
("ADA 3", "АДА 3"),
("SHARED 1", "ШАРЕД 1"),
("SHARED 2", "ШАРЕД 2"),
("ADA 4", "АДА 4"),
("ADA 5", "АДА 5"),
]
wiki = [
("WIKI 1", "WИКИ 1"),
("WIKI 2", "WИКИ 2"),
("SHARED 3", "ШАРЕД 3"),
("SHARED 4", "ШАРЕД 4"),
("WIKI 3", "WИКИ 3"),
("SHARED 1", "ШАРЕД 1"),
("WIKI 4", "WИКИ 4"),
]
web_acquired = [
("WEB_ACQUIRED 1", "WЕБ_АЦQУИРЕД 1"),
("WEB_ACQUIRED 2", "WЕБ_АЦQУИРЕД 2"),
("SHARED 3", "ШАРЕД 3"),
("SHARED 4", "ШАРЕД 4"),
("WEB_ACQUIRED 3", "WЕБ_АЦQУИРЕД 3"),
("SHARED 2", "ШАРЕД 2"),
("WEB_ACQUIRED 4", "WЕБ_АЦQУИРЕД 4"),
]
def build_dataset_contents(lines: list[tuple[str, str]], index):
return "\n".join([line[index] for line in lines]) + "\n"
@pytest.fixture(scope="function")
def data_dir():
data_dir = DataDir("test_merge_corpus")
data_dir.mkdir("artifacts")
data_dir.create_zst("ada83_v1.en.zst", build_dataset_contents(ada, 0))
data_dir.create_zst("ada83_v1.ru.zst", build_dataset_contents(ada, 1))
data_dir.create_zst("ELRC-3075-wikipedia_health_v1.en.zst", build_dataset_contents(wiki, 0))
data_dir.create_zst("ELRC-3075-wikipedia_health_v1.ru.zst", build_dataset_contents(wiki, 1))
data_dir.create_zst("ELRC-web_acquired_data.en.zst", build_dataset_contents(web_acquired, 0))
data_dir.create_zst("ELRC-web_acquired_data.ru.zst", build_dataset_contents(web_acquired, 1))
return data_dir
def assert_dataset(data_dir: DataDir, path: str, sorted_lines: list[str]):
with read_lines(data_dir.join(path)) as lines_iter:
# Sort the dataset, as the sorted lines are easier to scan and reason with.
# The dataset is still checked to be shuffled by comparing the original
# and sorted lines.
corpus_lines = list(lines_iter)
corpus_lines_sorted = list(corpus_lines)
corpus_lines_sorted.sort()
assert corpus_lines_sorted == sorted_lines
assert corpus_lines != corpus_lines_sorted, "The results are shuffled."
@pytest.mark.parametrize(
"name",
["corpus", "devset"],
)
def test_merge_corpus(data_dir, name):
data_dir.run_task(
# Tasks merge-corpus-en-ru, and merge-devset-en-ru.
f"merge-{name}-en-ru",
)
data_dir.print_tree()
assert_dataset(
data_dir,
f"artifacts/{name}.en.zst",
sorted_lines=[
"ADA 1\n",
"ADA 2\n",
"ADA 3\n",
"ADA 4\n",
"ADA 5\n",
"SHARED 1\n",
"SHARED 2\n",
"SHARED 3\n",
"SHARED 4\n",
"WEB_ACQUIRED 1\n",
"WEB_ACQUIRED 2\n",
"WEB_ACQUIRED 3\n",
"WEB_ACQUIRED 4\n",
"WIKI 1\n",
"WIKI 2\n",
"WIKI 3\n",
"WIKI 4\n",
],
)
assert_dataset(
data_dir,
f"artifacts/{name}.ru.zst",
sorted_lines=[
"WЕБ_АЦQУИРЕД 1\n",
"WЕБ_АЦQУИРЕД 2\n",
"WЕБ_АЦQУИРЕД 3\n",
"WЕБ_АЦQУИРЕД 4\n",
"WИКИ 1\n",
"WИКИ 2\n",
"WИКИ 3\n",
"WИКИ 4\n",
"АДА 1\n",
"АДА 2\n",
"АДА 3\n",
"АДА 4\n",
"АДА 5\n",
"ШАРЕД 1\n",
"ШАРЕД 2\n",
"ШАРЕД 3\n",
"ШАРЕД 4\n",
],
)
assert json.loads(data_dir.load(f"artifacts/{name}.stats.json")) == {
"parallel_corpus": {
"description": "The parallel corpora are merged and deduplicated",
"filtered": 4,
"kept": 17,
"visited": 21,
},
"final_truncated": {
"description": "The final result can be truncated by max_lines",
"filtered": 0,
"kept": 17,
"visited": 17,
},
"datasets": [
{
"description": "ELRC-3075-wikipedia_health_v1",
"filtered": 0,
"kept": 7,
"visited": 7,
},
{"description": "ELRC-web_acquired_data", "filtered": 2, "kept": 5, "visited": 7},
{"description": "ada83_v1", "filtered": 2, "kept": 5, "visited": 7},
],
}
@pytest.mark.parametrize(
"name",
["corpus", "devset"],
)
def test_merge_trimmed(data_dir, name):
data_dir.run_task(
# Tasks merge-corpus-en-ru, and merge-devset-en-ru.
f"merge-{name}-en-ru",
# Replace the max_sentences.
replace_args=[("None", "10")],
)
data_dir.print_tree()
assert_dataset(
data_dir,
f"artifacts/{name}.en.zst",
sorted_lines=[
"ADA 1\n",
"ADA 3\n",
"ADA 4\n",
"ADA 5\n",
"SHARED 1\n",
"SHARED 2\n",
"SHARED 3\n",
"WIKI 1\n",
"WIKI 2\n",
"WIKI 4\n",
],
)
assert_dataset(
data_dir,
f"artifacts/{name}.ru.zst",
sorted_lines=[
"WИКИ 1\n",
"WИКИ 2\n",
"WИКИ 4\n",
"АДА 1\n",
"АДА 3\n",
"АДА 4\n",
"АДА 5\n",
"ШАРЕД 1\n",
"ШАРЕД 2\n",
"ШАРЕД 3\n",
],
)
assert json.loads(data_dir.load(f"artifacts/{name}.stats.json")) == {
"parallel_corpus": {
"description": "The parallel corpora are merged and deduplicated",
"filtered": 4,
"kept": 17,
"visited": 21,
},
"final_truncated": {
"description": "The final result can be truncated by max_lines",
"filtered": 7,
"kept": 10,
"visited": 17,
},
"datasets": [
{
"description": "ELRC-3075-wikipedia_health_v1",
"filtered": 0,
"kept": 7,
"visited": 7,
},
{"description": "ELRC-web_acquired_data", "filtered": 2, "kept": 5, "visited": 7},
{"description": "ada83_v1", "filtered": 2, "kept": 5, "visited": 7},
],
}


@@ -103,7 +103,9 @@ def test_merge_mono(task: str):
assert mono_lines != mono_lines_sorted, "The results are shuffled."
with read_lines(data_dir.join(f"artifacts/mono.{locale}.sample.txt")) as lines_iter:
with read_lines(
data_dir.join(f"artifacts/mono.{locale}.sample.txt"), encoding="utf-8-sig"
) as lines_iter:
samples = list(lines_iter)
assert len(samples) == sample_size, "There are the expected number of samples"
assert len(set(samples)) == sample_size, "All of the samples are unique."