Stop parsing op contents in DeltaManager - runtime will do it (#22750)
A long time ago (5acfef448f
) we added
support in ContaineRuntime to parse op contents if it's a string. The
intention was to stop parsing in DeltaManager once that saturated. This
is that long overdue follow-up.
Taking this opportunity to make a few things hopefully clearer in
ContainerRuntime too:
* Highlighting where/how the serialization/deserialization of `contents`
happens
* Highlighting the different treatment/expectations for runtime v.
non-runtime messages during `process` flow
## Deprecations:
Deprecating use of `contents` on the event arg `op` for
`batchBegin`/`batchEnd` events, they're in for a surprise. I added a
changeset for this case.
This commit is contained in:
Родитель
5ca6cd2257
Коммит
de6928b528
|
@ -0,0 +1,14 @@
|
|||
---
|
||||
"@fluidframework/container-runtime": minor
|
||||
---
|
||||
---
|
||||
"section": other
|
||||
---
|
||||
|
||||
ContainerRuntime's `batchBegin`/`batchEnd` events: Deprecating the `contents` member on event arg `op`
|
||||
|
||||
The `batchBegin`/`batchEnd` events on ContainerRuntime indicate when a batch is beginning/finishing being processed.
|
||||
The events include an argument of type `ISequencedDocumentMessage` which is the first or last message of the batch.
|
||||
|
||||
The `contents` should not be used when reasoning over the begin/end of a batch.
|
||||
If you want to look at the `contents` of an op, wait for the `op` event.
|
|
@ -1031,15 +1031,6 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
|
|||
});
|
||||
}
|
||||
|
||||
// TODO: AB#12052: Stop parsing message.contents here, let the downstream handlers do it
|
||||
if (
|
||||
typeof message.contents === "string" &&
|
||||
message.contents !== "" &&
|
||||
message.type !== MessageType.ClientLeave
|
||||
) {
|
||||
message.contents = JSON.parse(message.contents);
|
||||
}
|
||||
|
||||
// Validate client sequence number has no gap. Decrement the noOpCount by gap
|
||||
// If the count ends up negative, that means we have a real gap and throw error
|
||||
if (
|
||||
|
|
|
@ -125,7 +125,9 @@ export class ProtocolOpHandler implements IProtocolHandler {
|
|||
}
|
||||
|
||||
case MessageType.Propose: {
|
||||
// This should become unconditional once DeltaManager.processInboundMessage() stops parsing content (ADO #12052)
|
||||
// DeltaManager is no longer parsing the contents, so we expect this to be a string here
|
||||
// (However, switching this to an assert caused some tests to fail, apparently due to the same
|
||||
// message object being processed multiple times - So we will retain the typeof check for now)
|
||||
if (typeof message.contents === "string") {
|
||||
message.contents = JSON.parse(message.contents);
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ import {
|
|||
OpSplitter,
|
||||
Outbox,
|
||||
RemoteMessageProcessor,
|
||||
serializeOpContents,
|
||||
} from "./opLifecycle/index.js";
|
||||
import { pkgVersion } from "./packageVersion.js";
|
||||
import {
|
||||
|
@ -762,7 +763,7 @@ function lastMessageFromMetadata(metadata: IContainerRuntimeMetadata | undefined
|
|||
* to understand if/when it is hit.
|
||||
* We only want to log this once, to avoid spamming telemetry if we are wrong and these cases are hit commonly.
|
||||
*/
|
||||
let getSingleUseLegacyLogCallback = (logger: ITelemetryLoggerExt, type: string) => {
|
||||
export let getSingleUseLegacyLogCallback = (logger: ITelemetryLoggerExt, type: string) => {
|
||||
return (codePath: string) => {
|
||||
logger.sendTelemetryEvent({
|
||||
eventName: "LegacyMessageFormat",
|
||||
|
@ -2710,8 +2711,13 @@ export class ContainerRuntime
|
|||
const savedOp = (messageCopy.metadata as ISavedOpMetadata)?.savedOp;
|
||||
const logLegacyCase = getSingleUseLegacyLogCallback(this.logger, messageCopy.type);
|
||||
|
||||
let runtimeBatch: boolean =
|
||||
hasModernRuntimeMessageEnvelope || isUnpackedRuntimeMessage(messageCopy);
|
||||
if (runtimeBatch) {
|
||||
// We expect runtime messages to have JSON contents - deserialize it in place.
|
||||
ensureContentsDeserialized(messageCopy, hasModernRuntimeMessageEnvelope, logLegacyCase);
|
||||
ensureContentsDeserialized(messageCopy);
|
||||
}
|
||||
|
||||
if (hasModernRuntimeMessageEnvelope) {
|
||||
// If the message has the modern message envelope, then process it here.
|
||||
// Here we unpack the message (decompress, unchunk, and/or ungroup) into a batch of messages with ContainerMessageType
|
||||
|
@ -2749,7 +2755,6 @@ export class ContainerRuntime
|
|||
}
|
||||
}
|
||||
|
||||
let runtimeBatch: boolean = true;
|
||||
// Reach out to PendingStateManager, either to zip localOpMetadata into the *local* message list,
|
||||
// or to check to ensure the *remote* messages don't match the batchId of a pending local batch.
|
||||
// This latter case would indicate that the container has forked - two copies are trying to persist the same local changes.
|
||||
|
@ -2807,12 +2812,23 @@ export class ContainerRuntime
|
|||
runtimeBatch,
|
||||
);
|
||||
} else {
|
||||
if (!runtimeBatch) {
|
||||
// The DeltaManager used to do this, but doesn't anymore as of Loader v2.4
|
||||
// Anyone listening to our "op" event would expect the contents to be parsed per this same logic
|
||||
if (
|
||||
typeof messageCopy.contents === "string" &&
|
||||
messageCopy.contents !== "" &&
|
||||
messageCopy.type !== MessageType.ClientLeave
|
||||
) {
|
||||
messageCopy.contents = JSON.parse(messageCopy.contents);
|
||||
}
|
||||
}
|
||||
this.processInboundMessages(
|
||||
[{ message: messageCopy, localOpMetadata: undefined }],
|
||||
{ batchStart: true, batchEnd: true }, // Single message
|
||||
local,
|
||||
savedOp,
|
||||
isUnpackedRuntimeMessage(messageCopy) /* runtimeBatch */,
|
||||
runtimeBatch,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -4212,7 +4228,7 @@ export class ContainerRuntime
|
|||
contents: idRange,
|
||||
};
|
||||
const idAllocationBatchMessage: BatchMessage = {
|
||||
contents: JSON.stringify(idAllocationMessage),
|
||||
contents: serializeOpContents(idAllocationMessage),
|
||||
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
|
||||
};
|
||||
this.outbox.submitIdAllocation(idAllocationBatchMessage);
|
||||
|
@ -4275,13 +4291,13 @@ export class ContainerRuntime
|
|||
contents: schemaChangeMessage,
|
||||
};
|
||||
this.outbox.submit({
|
||||
contents: JSON.stringify(msg),
|
||||
contents: serializeOpContents(msg),
|
||||
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
|
||||
});
|
||||
}
|
||||
|
||||
const message: BatchMessage = {
|
||||
contents: JSON.stringify(containerRuntimeMessage) /* serialized content */,
|
||||
contents: serializeOpContents(containerRuntimeMessage),
|
||||
metadata,
|
||||
localOpMetadata,
|
||||
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
|
||||
|
|
|
@ -14,7 +14,7 @@ export {
|
|||
} from "./batchManager.js";
|
||||
export { BatchMessage, IBatch, IBatchCheckpoint, IChunkedOp } from "./definitions.js";
|
||||
export { DuplicateBatchDetector } from "./duplicateBatchDetector.js";
|
||||
export { Outbox, getLongStack } from "./outbox.js";
|
||||
export { Outbox, getLongStack, serializeOpContents } from "./outbox.js";
|
||||
export { OpCompressor } from "./opCompressor.js";
|
||||
export { OpDecompressor } from "./opDecompressor.js";
|
||||
export { OpSplitter, splitOp, isChunkedMessage } from "./opSplitter.js";
|
||||
|
|
|
@ -15,6 +15,7 @@ import {
|
|||
} from "@fluidframework/telemetry-utils/internal";
|
||||
|
||||
import { ICompressionRuntimeOptions } from "../containerRuntime.js";
|
||||
import { OutboundContainerRuntimeMessage } from "../messageTypes.js";
|
||||
import { PendingMessageResubmitData, PendingStateManager } from "../pendingStateManager.js";
|
||||
|
||||
import {
|
||||
|
@ -28,6 +29,8 @@ import { BatchMessage, IBatch, IBatchCheckpoint } from "./definitions.js";
|
|||
import { OpCompressor } from "./opCompressor.js";
|
||||
import { OpGroupingManager } from "./opGroupingManager.js";
|
||||
import { OpSplitter } from "./opSplitter.js";
|
||||
// eslint-disable-next-line unused-imports/no-unused-imports -- Used by "@link" comment annotation below
|
||||
import { ensureContentsDeserialized } from "./remoteMessageProcessor.js";
|
||||
|
||||
export interface IOutboxConfig {
|
||||
readonly compressionOptions: ICompressionRuntimeOptions;
|
||||
|
@ -54,6 +57,14 @@ export interface IOutboxParameters {
|
|||
readonly closeContainer: (error?: ICriticalContainerError) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Before submitting an op to the Outbox, its contents must be serialized using this function.
|
||||
* @remarks - The deserialization on process happens via the function {@link ensureContentsDeserialized}.
|
||||
*/
|
||||
export function serializeOpContents(contents: OutboundContainerRuntimeMessage): string {
|
||||
return JSON.stringify(contents);
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporarily increase the stack limit while executing the provided action.
|
||||
* If a negative value is provided for `length`, no stack frames will be collected.
|
||||
|
|
|
@ -20,6 +20,8 @@ import { asBatchMetadata } from "../metadata.js";
|
|||
import { OpDecompressor } from "./opDecompressor.js";
|
||||
import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js";
|
||||
import { OpSplitter, isChunkedMessage } from "./opSplitter.js";
|
||||
// eslint-disable-next-line unused-imports/no-unused-imports -- Used by "@link" comment annotation below
|
||||
import { serializeOpContents } from "./outbox.js";
|
||||
|
||||
/** Info about the batch we learn when we process the first message */
|
||||
export interface BatchStartInfo {
|
||||
|
@ -236,32 +238,16 @@ export class RemoteMessageProcessor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Takes an incoming message and if the contents is a string, JSON.parse's it in place
|
||||
* Takes an incoming runtime message JSON.parse's its contents in place, if needed (old Loader does this for us).
|
||||
* Only to be used for runtine messages.
|
||||
* @remarks - Serialization during submit happens via {@link serializeOpContents}
|
||||
* @param mutableMessage - op message received
|
||||
* @param hasModernRuntimeMessageEnvelope - false if the message does not contain the modern op envelop where message.type is MessageType.Operation
|
||||
* @param logLegacyCase - callback to log when legacy op is encountered
|
||||
*/
|
||||
export function ensureContentsDeserialized(
|
||||
mutableMessage: ISequencedDocumentMessage,
|
||||
hasModernRuntimeMessageEnvelope: boolean,
|
||||
logLegacyCase: (codePath: string) => void,
|
||||
): void {
|
||||
// This should become unconditional once (Loader LTS) DeltaManager.processInboundMessage() stops parsing content (ADO #12052)
|
||||
// Note: Until that change is made in the loader, this case will never be hit.
|
||||
// Then there will be a long time of needing both cases, until LTS catches up to the change.
|
||||
let didParseJsonContents: boolean;
|
||||
export function ensureContentsDeserialized(mutableMessage: ISequencedDocumentMessage): void {
|
||||
// This should become unconditional once Loader LTS reaches 2.4 or later.
|
||||
// There will be a long time of needing both cases, until LTS advances to that point.
|
||||
if (typeof mutableMessage.contents === "string" && mutableMessage.contents !== "") {
|
||||
mutableMessage.contents = JSON.parse(mutableMessage.contents);
|
||||
didParseJsonContents = true;
|
||||
} else {
|
||||
didParseJsonContents = false;
|
||||
}
|
||||
|
||||
// The DeltaManager parses the contents of the message as JSON if it is a string,
|
||||
// so we should never end up parsing it here.
|
||||
// Let's observe if we are wrong about this to learn about these cases.
|
||||
if (didParseJsonContents) {
|
||||
logLegacyCase("ensureContentsDeserialized_foundJsonContents");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -353,11 +353,10 @@ export class SummaryCollection extends TypedEventEmitter<ISummaryCollectionOpEve
|
|||
}
|
||||
|
||||
private parseContent(op: ISequencedDocumentMessage) {
|
||||
// This should become unconditional once (Loader LTS) DeltaManager.processInboundMessage() stops parsing content (ADO #12052)
|
||||
// Note: Until that change is made in the loader, this case will never be hit.
|
||||
// Then there will be a long time of needing both cases, until LTS catches up to the change.
|
||||
// This should become unconditional once (Loader LTS) reaches 2.4 or later
|
||||
// There will be a long time of needing both cases, until LTS catches up to the change.
|
||||
// That said, we may instead move to listen for "op" events from ContainerRuntime,
|
||||
// and parsing may not be required at all if ContainerRuntime.process() would parse it for all types of ops.
|
||||
// and parsing may not be required at all if ContainerRuntime.process() continues to parse it for all types of ops.
|
||||
if (typeof op.contents === "string") {
|
||||
op.contents = JSON.parse(op.contents);
|
||||
}
|
||||
|
|
|
@ -64,6 +64,7 @@ import {
|
|||
IContainerRuntimeOptions,
|
||||
IPendingRuntimeState,
|
||||
defaultPendingOpsWaitTimeoutMs,
|
||||
getSingleUseLegacyLogCallback,
|
||||
} from "../containerRuntime.js";
|
||||
import {
|
||||
ContainerMessageType,
|
||||
|
@ -2550,40 +2551,25 @@ describe("Runtime", () => {
|
|||
|
||||
it("Only log legacy codepath once", async () => {
|
||||
const mockLogger = new MockLogger();
|
||||
const containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext({ logger: mockLogger }) as IContainerContext,
|
||||
registryEntries: [],
|
||||
existing: false,
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
});
|
||||
mockLogger.clear();
|
||||
|
||||
const json = JSON.stringify({ hello: "world" });
|
||||
const messageBase = { contents: json, clientId: "CLIENT_ID" };
|
||||
|
||||
// This message won't trigger the legacy op log
|
||||
containerRuntime.process(
|
||||
{
|
||||
...messageBase,
|
||||
contents: {},
|
||||
sequenceNumber: 1,
|
||||
} as unknown as ISequencedDocumentMessage,
|
||||
false /* local */,
|
||||
let legacyLogger = getSingleUseLegacyLogCallback(
|
||||
createChildLogger({ logger: mockLogger }),
|
||||
"someType",
|
||||
);
|
||||
assert.equal(mockLogger.events.length, 0, "Expected no event logged");
|
||||
|
||||
// This message should trigger the legacy op log
|
||||
containerRuntime.process(
|
||||
{ ...messageBase, sequenceNumber: 2 } as unknown as ISequencedDocumentMessage,
|
||||
false /* local */,
|
||||
legacyLogger = getSingleUseLegacyLogCallback(
|
||||
createChildLogger({ logger: mockLogger }),
|
||||
"someType",
|
||||
);
|
||||
legacyLogger("codePath1");
|
||||
mockLogger.assertMatch([{ eventName: "LegacyMessageFormat" }]);
|
||||
|
||||
// This message would trigger the legacy op log, except we already logged once
|
||||
containerRuntime.process(
|
||||
{ ...messageBase, sequenceNumber: 3 } as unknown as ISequencedDocumentMessage,
|
||||
false /* local */,
|
||||
legacyLogger = getSingleUseLegacyLogCallback(
|
||||
createChildLogger({ logger: mockLogger }),
|
||||
"someType",
|
||||
);
|
||||
legacyLogger("codePath2");
|
||||
assert.equal(mockLogger.events.length, 0, "Expected no more events logged");
|
||||
});
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ describe("RemoteMessageProcessor", () => {
|
|||
referenceSequenceNumber: message.referenceSequenceNumber,
|
||||
} as ISequencedDocumentMessage;
|
||||
|
||||
ensureContentsDeserialized(inboundMessage, true, () => {});
|
||||
ensureContentsDeserialized(inboundMessage);
|
||||
const result = messageProcessor.process(inboundMessage, () => {});
|
||||
switch (result?.type) {
|
||||
case "fullBatch":
|
||||
|
@ -486,7 +486,7 @@ describe("RemoteMessageProcessor", () => {
|
|||
metadata: { meta: "data" },
|
||||
};
|
||||
const documentMessage = message as ISequencedDocumentMessage;
|
||||
ensureContentsDeserialized(documentMessage, true, () => {});
|
||||
ensureContentsDeserialized(documentMessage);
|
||||
const processResult = messageProcessor.process(documentMessage, () => {});
|
||||
|
||||
assert.equal(
|
||||
|
|
|
@ -95,12 +95,16 @@ export interface IContainerRuntimeBase extends IEventProvider<IContainerRuntimeB
|
|||
// @alpha @sealed (undocumented)
|
||||
export interface IContainerRuntimeBaseEvents extends IEvent {
|
||||
// (undocumented)
|
||||
(event: "batchBegin", listener: (op: ISequencedDocumentMessage) => void): any;
|
||||
(event: "batchBegin", listener: (op: Omit<ISequencedDocumentMessage, "contents"> & {
|
||||
contents: unknown;
|
||||
}) => void): any;
|
||||
// (undocumented)
|
||||
(event: "batchEnd", listener: (error: any, op: Omit<ISequencedDocumentMessage, "contents"> & {
|
||||
contents: unknown;
|
||||
}) => void): any;
|
||||
// (undocumented)
|
||||
(event: "op", listener: (op: ISequencedDocumentMessage, runtimeMessage?: boolean) => void): any;
|
||||
// (undocumented)
|
||||
(event: "batchEnd", listener: (error: any, op: ISequencedDocumentMessage) => void): any;
|
||||
// (undocumented)
|
||||
(event: "signal", listener: (message: IInboundSignalMessage, local: boolean) => void): any;
|
||||
// (undocumented)
|
||||
(event: "dispose", listener: () => void): any;
|
||||
|
|
|
@ -122,13 +122,30 @@ export type VisibilityState = (typeof VisibilityState)[keyof typeof VisibilitySt
|
|||
* @sealed
|
||||
*/
|
||||
export interface IContainerRuntimeBaseEvents extends IEvent {
|
||||
(event: "batchBegin", listener: (op: ISequencedDocumentMessage) => void);
|
||||
(
|
||||
event: "batchBegin",
|
||||
listener: (
|
||||
op: Omit<ISequencedDocumentMessage, "contents"> & {
|
||||
/** @deprecated Use the "op" event to get details about the message contents */
|
||||
contents: unknown;
|
||||
},
|
||||
) => void,
|
||||
);
|
||||
(
|
||||
event: "batchEnd",
|
||||
listener: (
|
||||
error: any,
|
||||
op: Omit<ISequencedDocumentMessage, "contents"> & {
|
||||
/** @deprecated Use the "op" event to get details about the message contents */
|
||||
contents: unknown;
|
||||
},
|
||||
) => void,
|
||||
);
|
||||
/**
|
||||
* @param runtimeMessage - tells if op is runtime op. If it is, it was unpacked, i.e. it's type and content
|
||||
* @param runtimeMessage - tells if op is runtime op. If it is, it was unpacked, i.e. its type and content
|
||||
* represent internal container runtime type / content.
|
||||
*/
|
||||
(event: "op", listener: (op: ISequencedDocumentMessage, runtimeMessage?: boolean) => void);
|
||||
(event: "batchEnd", listener: (error: any, op: ISequencedDocumentMessage) => void);
|
||||
(event: "signal", listener: (message: IInboundSignalMessage, local: boolean) => void);
|
||||
(event: "dispose", listener: () => void);
|
||||
}
|
||||
|
|
|
@ -639,7 +639,8 @@ for (const testOpts of testMatrix) {
|
|||
const message = call.firstArg as ISequencedDocumentMessage;
|
||||
if (
|
||||
message.type === MessageType.Operation &&
|
||||
(message.contents as { type: string }).type === "groupedBatch"
|
||||
typeof message.contents === "string" &&
|
||||
(JSON.parse(message.contents) as { type?: unknown }).type === "groupedBatch"
|
||||
) {
|
||||
groupedBatchCount++;
|
||||
}
|
||||
|
|
|
@ -299,7 +299,8 @@ describeCompat("blobs", "NoCompat", (getTestObjectProvider, apis) => {
|
|||
const attachOpP = new Promise<void>((resolve, reject) =>
|
||||
container1.on("op", (op) => {
|
||||
if (
|
||||
(op.contents as { type?: unknown } | undefined)?.type ===
|
||||
typeof op.contents === "string" &&
|
||||
(JSON.parse(op.contents) as { type?: unknown })?.type ===
|
||||
ContainerMessageType.BlobAttach
|
||||
) {
|
||||
if ((op.metadata as { blobId?: unknown } | undefined)?.blobId) {
|
||||
|
|
|
@ -579,7 +579,8 @@ describeCompat("Message size", "NoCompat", (getTestObjectProvider, apis) => {
|
|||
const secondConnection = reconnectAfterOpProcessing(
|
||||
remoteContainer,
|
||||
(op) =>
|
||||
(op.contents as { type?: unknown } | undefined)?.type ===
|
||||
typeof op.contents === "string" &&
|
||||
(JSON.parse(op.contents) as { type?: unknown })?.type ===
|
||||
ContainerMessageType.ChunkedOp,
|
||||
2,
|
||||
);
|
||||
|
@ -593,7 +594,8 @@ describeCompat("Message size", "NoCompat", (getTestObjectProvider, apis) => {
|
|||
const secondConnection = reconnectAfterOpProcessing(
|
||||
remoteContainer,
|
||||
(op) => {
|
||||
const contents = op.contents as any | undefined;
|
||||
const contents =
|
||||
typeof op.contents === "string" ? JSON.parse(op.contents) : undefined;
|
||||
return (
|
||||
contents?.type === ContainerMessageType.ChunkedOp &&
|
||||
contents?.contents?.chunkId === contents?.contents?.totalChunks
|
||||
|
|
|
@ -337,7 +337,7 @@ describeCompat("Stamped v2 ops", "NoCompat", (getTestObjectProvider, apis) => {
|
|||
|
||||
// Check if the shim2 received the op
|
||||
const op2 = await opSent;
|
||||
const env = op2.contents as any;
|
||||
const env = JSON.parse(op2.contents as string);
|
||||
assert.equal(
|
||||
env.contents.address,
|
||||
testObj2.id,
|
||||
|
|
|
@ -121,9 +121,7 @@ export class ProtocolOpHandler implements IProtocolHandler {
|
|||
break;
|
||||
|
||||
case MessageType.Propose:
|
||||
// This should become unconditional once (Loader LTS) DeltaManager.processInboundMessage() stops parsing content (ADO #12052)
|
||||
// Note: Until that change is made in the loader, this case will never be hit.
|
||||
// Then there will be a long time of needing both cases, until LTS catches up to the change.
|
||||
// TODO: Update callers to stop parsing the contents and do it here unconditionally
|
||||
if (typeof message.contents === "string") {
|
||||
message.contents = JSON.parse(message.contents);
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче