Merge pull request #3829 from asgerf/asgerf/jsonl-parser2

Faster streaming 'jsonl' parser
Asger F 2024-11-19 11:03:45 +01:00 committed by GitHub
Parents a5fb820f67 53a6f00ad1
Commit 93dacb042e
No key matching this signature was found
GPG key ID: B5690EEEBB952194
2 changed files with 109 additions and 48 deletions

View file

@@ -1,6 +1,7 @@
import { statSync } from "fs";
import { stat } from "fs/promises";
import { createReadStream } from "fs-extra";
import { createInterface } from "readline";
const doubleLineBreakRegexp = /\n\r?\n/;
/**
* Read a file consisting of multiple JSON objects. Each object is separated from the previous one
@@ -14,53 +15,40 @@ export async function readJsonlFile<T>(
handler: (value: T) => Promise<void>,
logger?: { log: (message: string) => void },
): Promise<void> {
function parseJsonFromCurrentLines() {
try {
return JSON.parse(currentLineSequence.join("\n")) as T;
} catch (e) {
void logger?.log(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
`Error: Failed to parse at line ${lineCount} of ${path} as JSON: ${(e as any)?.message ?? "UNKNOWN REASON"}. Problematic line below:\n${JSON.stringify(currentLineSequence, null, 2)}`,
);
throw e;
}
}
function logProgress() {
void logger?.log(
`Processed ${lineCount} lines with ${parseCounts} parses...`,
);
}
void logger?.log(
`Parsing ${path} (${statSync(path).size / 1024 / 1024} MB)...`,
`Parsing ${path} (${(await stat(path)).size / 1024 / 1024} MB)...`,
);
const fileStream = createReadStream(path, "utf8");
const rl = createInterface({
input: fileStream,
crlfDelay: Infinity,
return new Promise((resolve, reject) => {
const stream = createReadStream(path, { encoding: "utf8" });
let buffer = "";
stream.on("data", async (chunk: string) => {
const parts = (buffer + chunk).split(doubleLineBreakRegexp);
buffer = parts.pop()!;
if (parts.length > 0) {
try {
stream.pause();
for (const part of parts) {
await handler(JSON.parse(part));
}
stream.resume();
} catch (e) {
stream.destroy();
reject(e);
}
}
});
stream.on("end", async () => {
if (buffer.trim().length > 0) {
try {
await handler(JSON.parse(buffer));
} catch (e) {
reject(e);
return;
}
}
void logger?.log(`Finishing parsing ${path}`);
resolve();
});
stream.on("error", reject);
});
let lineCount = 0;
let parseCounts = 0;
let currentLineSequence: string[] = [];
for await (const line of rl) {
if (line === "") {
// as mentioned above: a double newline sequence indicates the end of the current JSON object, so we parse it and pass it to the handler
await handler(parseJsonFromCurrentLines());
parseCounts++;
currentLineSequence = [];
} else {
currentLineSequence.push(line);
}
lineCount++;
if (lineCount % 1000000 === 0) {
logProgress();
}
}
// in case the file is not newline-terminated, we need to handle the last JSON object
if (currentLineSequence.length > 0) {
await handler(parseJsonFromCurrentLines());
}
logProgress();
}
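
For reference, a minimal usage sketch of the readJsonlFile API changed above (the SummaryEvent shape, the relative import path, and the caller are illustrative assumptions, not part of this change):

import { readJsonlFile } from "./jsonl-reader";

// Hypothetical event shape; real callers supply their own type parameter.
interface SummaryEvent {
  predicateName?: string;
}

async function countPredicateEvents(path: string): Promise<number> {
  let count = 0;
  // The handler is awaited for each parsed object; the underlying stream is paused while it runs.
  await readJsonlFile<SummaryEvent>(
    path,
    async (event) => {
      if (event.predicateName !== undefined) {
        count++;
      }
    },
    console, // any object with `log(message: string)` satisfies the optional logger
  );
  return count;
}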

View file

@@ -0,0 +1,73 @@
import { readFile } from "fs-extra";
import { readJsonlFile } from "../../src/common/jsonl-reader";
import { performance } from "perf_hooks";
import { join } from "path";
/** An "obviously correct" implementation to test against. */
async function readJsonlReferenceImpl<T>(
path: string,
handler: (value: T) => Promise<void>,
): Promise<void> {
const logSummary = await readFile(path, "utf-8");
// Remove newline delimiters because summary is in .jsonl format.
const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
for (const obj of jsonSummaryObjects) {
const jsonObj = JSON.parse(obj) as T;
await handler(jsonObj);
}
}
type ParserFn = (
text: string,
callback: (v: unknown) => Promise<void>,
) => Promise<void>;
const parsers: Record<string, ParserFn> = {
readJsonlReferenceImpl,
readJsonlFile,
};
async function main() {
const args = process.argv.slice(2);
const file =
args.length > 0
? args[0]
: join(
__dirname,
"../unit-tests/data/evaluator-log-summaries/bad-join-order.jsonl",
);
const numTrials = args.length > 1 ? Number(args[1]) : 100;
const referenceValues: any[] = [];
await readJsonlReferenceImpl(file, async (event) => {
referenceValues.push(event);
});
const referenceValueString = JSON.stringify(referenceValues);
// Do warm-up runs and check against reference implementation
for (const [name, parser] of Object.entries(parsers)) {
const values: unknown[] = [];
await parser(file, async (event) => {
values.push(event);
});
if (JSON.stringify(values) !== referenceValueString) {
console.error(`${name}: failed to match reference implementation`);
}
}
for (const [name, parser] of Object.entries(parsers)) {
const startTime = performance.now();
for (let i = 0; i < numTrials; ++i) {
await Promise.all([
parser(file, async () => {}),
parser(file, async () => {}),
]);
}
const duration = performance.now() - startTime;
const durationPerTrial = duration / numTrials;
console.log(`${name}: ${durationPerTrial.toFixed(1)} ms`);
}
}
main().catch((err: unknown) => {
console.error(err);
});
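
As a quick sanity check of the blank-line-delimited '.jsonl' framing that both implementations rely on, here is a hedged smoke-test sketch (the temp-file handling and the object contents are assumptions):

import { writeFile, mkdtemp, rm } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import { readJsonlFile } from "../../src/common/jsonl-reader";

async function smokeTest(): Promise<void> {
  // Write two JSON objects separated by a blank line, as the parser expects.
  const dir = await mkdtemp(join(tmpdir(), "jsonl-smoke-"));
  const file = join(dir, "two-objects.jsonl");
  await writeFile(file, '{"a": 1}\n\n{"b": [2, 3]}\n', "utf8");
  try {
    const seen: unknown[] = [];
    await readJsonlFile(file, async (value) => {
      seen.push(value);
    });
    console.log(JSON.stringify(seen)); // expected: [{"a":1},{"b":[2,3]}]
  } finally {
    await rm(dir, { recursive: true, force: true });
  }
}

void smokeTest();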