Share code for splitting records from pseudo-JSONL
Parent: e8d5029912
Commit: 3c57597a19
log-insights/jsonl-reader.ts (new file):

@@ -0,0 +1,23 @@
+import * as fs from 'fs-extra';
+
+/**
+ * Read a file consisting of multiple JSON objects. Each object is separated from the previous one
+ * by a double newline sequence. This is basically a more human-readable form of JSONL.
+ *
+ * The current implementation reads the entire text of the document into memory, but in the future
+ * it will stream the document to improve the performance with large documents.
+ *
+ * @param path The path to the file.
+ * @param handler Callback to be invoked for each top-level JSON object in order.
+ */
+export async function readJsonlFile(path: string, handler: (value: any) => Promise<void>): Promise<void> {
+  const logSummary = await fs.readFile(path, 'utf-8');
+
+  // Split on the double-newline delimiters that separate the JSON objects in the summary.
+  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
+
+  for (const obj of jsonSummaryObjects) {
+    const jsonObj = JSON.parse(obj);
+    await handler(jsonObj);
+  }
+}
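For orientation, here is a sketch of the format this reader accepts and a call site. The two-record file content, the field names in it, and the countRecords helper are made up for illustration; only readJsonlFile and its import path come from the diff above.

// A pseudo-JSONL summary: JSON objects separated by a blank line, where each
// object may span several lines (unlike strict JSONL's one-object-per-line rule).
// Illustrative content only:
//
//   { "predicateName": "foo",
//     "millis": 12 }
//
//   { "predicateName": "bar",
//     "millis": 7 }

import { readJsonlFile } from './log-insights/jsonl-reader';

// Count the records in a summary file; readJsonlFile awaits the handler once per record, in order.
async function countRecords(path: string): Promise<number> {
  let count = 0;
  await readJsonlFile(path, async () => {
    count++;
  });
  return count;
}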
pure/log-summary-parser.ts:

@@ -1,3 +1,5 @@
+import { readJsonlFile } from '../log-insights/jsonl-reader';
+
 // TODO(angelapwen): Only load in necessary information and
 // location in bytes for this log to save memory.
 export interface EvalLogData {
@@ -13,14 +15,10 @@ export interface EvalLogData {
  * an array of EvalLogData objects.
  *
  */
-export function parseViewerData(logSummary: string): EvalLogData[] {
-  // Remove newline delimiters because summary is in .jsonl format.
-  const jsonSummaryObjects: string[] = logSummary.split(/\r?\n\r?\n/g);
+export async function parseViewerData(jsonSummaryPath: string): Promise<EvalLogData[]> {
   const viewerData: EvalLogData[] = [];
 
-  for (const obj of jsonSummaryObjects) {
-    const jsonObj = JSON.parse(obj);
-
+  await readJsonlFile(jsonSummaryPath, async jsonObj => {
     // Only convert log items that have an RA and millis field
     if (jsonObj.ra !== undefined && jsonObj.millis !== undefined) {
       const newLogData: EvalLogData = {
@@ -31,6 +29,7 @@ export function parseViewerData(logSummary: string): EvalLogData[] {
       };
       viewerData.push(newLogData);
     }
-  }
+  });
+
   return viewerData;
 }
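Since parseViewerData is now async and takes the summary path rather than the already-loaded text, callers shrink to a single await. A minimal sketch of a hypothetical caller (the function name and log message are not from the commit; the import matches the one used in query-history.ts below):

import { EvalLogData, parseViewerData } from './pure/log-summary-parser';

// Load viewer rows for a query's JSON evaluator-log summary.
async function loadViewerRows(jsonSummaryPath: string): Promise<EvalLogData[]> {
  // File I/O now happens inside the parser; callers just pass the path and await.
  const rows = await parseViewerData(jsonSummaryPath);
  console.log(`Parsed ${rows.length} records that carry RA and timing data`);
  return rows;
}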
query-history.ts:

@@ -18,7 +18,6 @@ import {
   window,
   workspace,
 } from 'vscode';
-import * as JsonlParser from 'stream-json/jsonl/Parser';
 import { QueryHistoryConfig } from './config';
 import {
   showAndLogErrorMessage,
@@ -53,6 +52,7 @@ import { EvalLogData, parseViewerData } from './pure/log-summary-parser';
 import { PipelineInfo, SummarySymbols } from './log-insights/summary-parser';
 import { DiagnosticSeverity } from 'vscode-languageclient';
 import { EvaluationLogProblemReporter, EvaluationLogScannerProvider } from './log-insights/log-scanner';
+import { readJsonlFile } from './log-insights/jsonl-reader';
 
 /**
  * query-history.ts
@@ -983,7 +983,7 @@ export class QueryHistoryManager extends DisposableObject {
     }
 
     // Summary log file doesn't exist.
-    if (finalSingleItem.evalLogLocation && fs.pathExists(finalSingleItem.evalLogLocation)) {
+    if (finalSingleItem.evalLogLocation && await fs.pathExists(finalSingleItem.evalLogLocation)) {
       // If raw log does exist, then the summary log is still being generated.
       this.warnInProgressEvalLogSummary();
     } else {
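The added await above is the actual fix: fs-extra's pathExists resolves to a boolean but returns a Promise, and a Promise object is always truthy, so the unawaited form took the "summary still being generated" branch even when the raw log was absent. A small illustration (the path literal is made up):

import * as fs from 'fs-extra';

async function demo(): Promise<void> {
  // Bug shape: the condition tests a pending Promise<boolean>, which is truthy,
  // so this branch runs whether or not the file exists.
  if (fs.pathExists('/no/such/file')) {
    console.log('always reached');
  }

  // Fixed shape: the condition tests the resolved boolean.
  if (await fs.pathExists('/no/such/file')) {
    console.log('reached only when the file exists');
  }
}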
@@ -1008,14 +1008,13 @@ export class QueryHistoryManager extends DisposableObject {
     }
 
-    // TODO(angelapwen): Stream the file in.
-    void fs.readFile(finalSingleItem.jsonEvalLogSummaryLocation, async (err, buffer) => {
-      if (err) {
-        throw new Error(`Could not read evaluator log summary JSON file to generate viewer data at ${finalSingleItem.jsonEvalLogSummaryLocation}.`);
-      }
-      const evalLogData: EvalLogData[] = parseViewerData(buffer.toString());
+    try {
+      const evalLogData: EvalLogData[] = await parseViewerData(finalSingleItem.jsonEvalLogSummaryLocation);
       const evalLogTreeBuilder = new EvalLogTreeBuilder(finalSingleItem.getQueryName(), evalLogData);
       this.evalLogViewer.updateRoots(await evalLogTreeBuilder.getRoots());
-    });
+    } catch (e) {
+      throw new Error(`Could not read evaluator log summary JSON file to generate viewer data at ${finalSingleItem.jsonEvalLogSummaryLocation}.`);
+    }
   }
 
   /**
@@ -1027,12 +1026,12 @@ export class QueryHistoryManager extends DisposableObject {
     query: LocalQueryInfo
   ): Promise<void> {
     this.diagnosticCollection.clear();
-    if (query.evalLogJsonSummaryLocation) {
-      const diagnostics = await this.scanLog(query.evalLogJsonSummaryLocation, query.evalLogSummarySymbolsLocation);
+    if (query.jsonEvalLogSummaryLocation) {
+      const diagnostics = await this.scanLog(query.jsonEvalLogSummaryLocation, query.evalLogSummarySymbolsLocation);
       const uri = Uri.file(query.evalLogSummaryLocation!);
       this.diagnosticCollection.set(uri, diagnostics);
     } else {
-      this.warnNoEvalLog();
+      this.warnNoEvalLogs();
     }
   }
@@ -1244,17 +1243,10 @@ export class QueryHistoryManager extends DisposableObject {
 
     const scanners = [...this.scannerProviders.values()].map(p => p.createScanner(problemReporter));
 
-    const stream = fs.createReadStream(jsonSummaryLocation)
-      .pipe(JsonlParser.parser())
-      .on('data', ({ value }) => {
-        scanners.forEach(scanner => {
-          scanner.onEvent(value);
-        });
-      });
-
-    await new Promise(function(resolve, reject) {
-      stream.on('end', resolve);
-      stream.on('error', reject);
+    await readJsonlFile(jsonSummaryLocation, async obj => {
+      scanners.forEach(scanner => {
+        scanner.onEvent(obj);
+      });
     });
 
     scanners.forEach(scanner => scanner.onDone());
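As a usage sketch of the new shape of scanLog: a trivial scanner-like consumer replayed through readJsonlFile. The counting scanner below is hypothetical, not the extension's EvaluationLogScanner interface; only the onEvent/onDone call pattern is taken from the diff.

import { readJsonlFile } from './log-insights/jsonl-reader';

// Hypothetical scanner: counts summary events and reports when replay is done.
const countingScanner = {
  count: 0,
  onEvent(_event: any): void {
    this.count++;
  },
  onDone(): void {
    console.log(`Scanned ${this.count} summary events`);
  },
};

async function scanSummary(jsonSummaryLocation: string): Promise<void> {
  // Same pattern as the new scanLog body: feed every record to the scanner,
  // then signal completion once the whole file has been read.
  await readJsonlFile(jsonSummaryLocation, async obj => {
    countingScanner.onEvent(obj);
  });
  countingScanner.onDone();
}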