diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 8c6e2edf457..1f851f72c4f 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -181,7 +181,7 @@ import { } from "./opLifecycle/index.js"; import { pkgVersion } from "./packageVersion.js"; import { - IPendingBatchMessage, + PendingMessageResubmitData, IPendingLocalState, PendingStateManager, } from "./pendingStateManager.js"; @@ -668,7 +668,7 @@ export const makeLegacySendBatchFn = (batch: IBatch) => { // Default to negative one to match Container.submitBatch behavior let clientSequenceNumber: number = -1; - for (const message of batch.content) { + for (const message of batch.messages) { clientSequenceNumber = submitFn( MessageType.Operation, // For back-compat (submitFn only works on deserialized content) @@ -1549,10 +1549,6 @@ export class ContainerRuntime clientId: () => this.clientId, close: this.closeFn, connected: () => this.connected, - reSubmit: (message: IPendingBatchMessage) => { - this.reSubmit(message); - this.flush(); - }, reSubmitBatch: this.reSubmitBatch.bind(this), isActiveConnection: () => this.innerDeltaManager.active, isAttached: () => this.attachState !== AttachState.Detached, @@ -2909,7 +2905,7 @@ export class ContainerRuntime // Note: we are not touching any batches other than mainBatch here, for two reasons: // 1. It would not help, as other batches are flushed independently from main batch. // 2. There is no way to undo process of data store creation, blob creation, ID compressor ops, or other things tracked by other batches. - checkpoint = this.outbox.checkpoint().mainBatch; + checkpoint = this.outbox.getBatchCheckpoints().mainBatch; } try { this._orderSequentiallyCalls++; @@ -3052,7 +3048,7 @@ export class ContainerRuntime } /** - * Are we in the middle of batching ops together? + * Typically ops are batched and later flushed together, but in some cases we want to flush immediately. */ private currentlyBatching() { return this.flushMode !== FlushMode.Immediate || this._orderSequentiallyCalls !== 0; @@ -4032,7 +4028,9 @@ export class ContainerRuntime this.outbox.submit(message); } - if (!this.currentlyBatching()) { + // Note: Technically, the system "always" batches - if this case is true we'll just have a single-message batch. + const flushImmediatelyOnSubmit = !this.currentlyBatching(); + if (flushImmediatelyOnSubmit) { this.flush(); } else { this.scheduleFlush(); @@ -4113,7 +4111,7 @@ export class ContainerRuntime } } - private reSubmitBatch(batch: IPendingBatchMessage[]) { + private reSubmitBatch(batch: PendingMessageResubmitData[]) { this.orderSequentially(() => { for (const message of batch) { this.reSubmit(message); @@ -4122,7 +4120,7 @@ export class ContainerRuntime this.flush(); } - private reSubmit(message: IPendingBatchMessage) { + private reSubmit(message: PendingMessageResubmitData) { // Need to parse from string for back-compat const containerRuntimeMessage = this.parseLocalOpContent(message.content); this.reSubmitCore(containerRuntimeMessage, message.localOpMetadata, message.opMetadata); diff --git a/packages/runtime/container-runtime/src/metadata.ts b/packages/runtime/container-runtime/src/metadata.ts index 7e7fca33946..7e451310c8f 100644 --- a/packages/runtime/container-runtime/src/metadata.ts +++ b/packages/runtime/container-runtime/src/metadata.ts @@ -3,6 +3,20 @@ * Licensed under the MIT License. */ +/** + * Does the metadata object look like batch metadata? + */ +export function isBatchMetadata(metadata: any): metadata is IBatchMetadata { + return typeof metadata?.batch === "boolean"; +} + +/** + * Cast the given metadata object to IBatchMetadata if it is so, otherwise yield undefined + */ +export function asBatchMetadata(metadata: unknown): IBatchMetadata | undefined { + return isBatchMetadata(metadata) ? metadata : undefined; +} + /** * Batching makes assumptions about what might be on the metadata. This interface codifies those assumptions, but does not validate them. */ diff --git a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts index b05700e7baf..804e33965bd 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts @@ -99,7 +99,7 @@ export class BatchManager { public popBatch(): IBatch { const batch: IBatch = { - content: this.pendingBatch, + messages: this.pendingBatch, contentSizeInBytes: this.batchContentSize, referenceSequenceNumber: this.referenceSequenceNumber, hasReentrantOps: this.hasReentrantOps, @@ -134,13 +134,13 @@ export class BatchManager { } const addBatchMetadata = (batch: IBatch): IBatch => { - if (batch.content.length > 1) { - batch.content[0].metadata = { - ...batch.content[0].metadata, + if (batch.messages.length > 1) { + batch.messages[0].metadata = { + ...batch.messages[0].metadata, batch: true, }; - batch.content[batch.content.length - 1].metadata = { - ...batch.content[batch.content.length - 1].metadata, + batch.messages[batch.messages.length - 1].metadata = { + ...batch.messages[batch.messages.length - 1].metadata, batch: false, }; } @@ -157,7 +157,7 @@ const addBatchMetadata = (batch: IBatch): IBatch => { * @returns An estimate of the payload size in bytes which will be produced when the batch is sent over the wire */ export const estimateSocketSize = (batch: IBatch): number => { - return batch.contentSizeInBytes + opOverhead * batch.content.length; + return batch.contentSizeInBytes + opOverhead * batch.messages.length; }; export const sequenceNumbersMatch = ( diff --git a/packages/runtime/container-runtime/src/opLifecycle/definitions.ts b/packages/runtime/container-runtime/src/opLifecycle/definitions.ts index 6898c9b8fe5..4556bfe9285 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/definitions.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/definitions.ts @@ -28,7 +28,7 @@ export interface IBatch { /** * All the messages in the batch */ - readonly content: TMessages; + readonly messages: TMessages; /** * The reference sequence number for the batch */ diff --git a/packages/runtime/container-runtime/src/opLifecycle/opCompressor.ts b/packages/runtime/container-runtime/src/opLifecycle/opCompressor.ts index 5ab0ff1d8c9..6bf19bb5fc0 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opCompressor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opCompressor.ts @@ -35,7 +35,7 @@ export class OpCompressor { */ public compressBatch(batch: IBatch): IBatch { assert( - batch.contentSizeInBytes > 0 && batch.content.length > 0, + batch.contentSizeInBytes > 0 && batch.messages.length > 0, 0x5a4 /* Batch should not be empty */, ); @@ -47,14 +47,14 @@ export class OpCompressor { const messages: BatchMessage[] = []; messages.push({ - ...batch.content[0], + ...batch.messages[0], contents: JSON.stringify({ packedContents: compressedContent }), - metadata: batch.content[0].metadata, + metadata: batch.messages[0].metadata, compression: CompressionAlgorithms.lz4, }); // Add empty placeholder messages to reserve the sequence numbers - for (const message of batch.content.slice(1)) { + for (const message of batch.messages.slice(1)) { messages.push({ localOpMetadata: message.localOpMetadata, metadata: message.metadata, @@ -64,7 +64,7 @@ export class OpCompressor { const compressedBatch: IBatch = { contentSizeInBytes: compressedContent.length, - content: messages, + messages, referenceSequenceNumber: batch.referenceSequenceNumber, }; @@ -74,7 +74,7 @@ export class OpCompressor { duration, sizeBeforeCompression: batch.contentSizeInBytes, sizeAfterCompression: compressedBatch.contentSizeInBytes, - opCount: compressedBatch.content.length, + opCount: compressedBatch.messages.length, socketSize: estimateSocketSize(compressedBatch), }); } @@ -88,7 +88,7 @@ export class OpCompressor { private serializeBatchContents(batch: IBatch): string { try { // Yields a valid JSON array, since each message.contents is already serialized to JSON - return `[${batch.content.map((message) => message.contents).join(",")}]`; + return `[${batch.messages.map(({ contents }) => contents).join(",")}]`; } catch (e: any) { if (e.message === "Invalid string length") { // This is how JSON.stringify signals that @@ -98,7 +98,7 @@ export class OpCompressor { { eventName: "BatchTooLarge", size: batch.contentSizeInBytes, - length: batch.content.length, + length: batch.messages.length, }, error, ); diff --git a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts index b4f83e3b286..864917a182a 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts @@ -59,17 +59,17 @@ export class OpGroupingManager { public groupBatch(batch: IBatch): IBatch<[BatchMessage]> { assert(this.shouldGroup(batch), 0x946 /* cannot group the provided batch */); - if (batch.content.length >= 1000) { + if (batch.messages.length >= 1000) { this.logger.sendTelemetryEvent({ eventName: "GroupLargeBatch", - length: batch.content.length, + length: batch.messages.length, threshold: this.config.opCountThreshold, reentrant: batch.hasReentrantOps, - referenceSequenceNumber: batch.content[0].referenceSequenceNumber, + referenceSequenceNumber: batch.messages[0].referenceSequenceNumber, }); } - for (const message of batch.content) { + for (const message of batch.messages) { if (message.metadata) { const keys = Object.keys(message.metadata); assert(keys.length < 2, 0x5dd /* cannot group ops with metadata */); @@ -79,7 +79,7 @@ export class OpGroupingManager { const serializedContent = JSON.stringify({ type: OpGroupingManager.groupedBatchOp, - contents: batch.content.map((message) => ({ + contents: batch.messages.map((message) => ({ contents: message.contents === undefined ? undefined : JSON.parse(message.contents), metadata: message.metadata, compression: message.compression, @@ -88,10 +88,10 @@ export class OpGroupingManager { const groupedBatch: IBatch<[BatchMessage]> = { ...batch, - content: [ + messages: [ { metadata: undefined, - referenceSequenceNumber: batch.content[0].referenceSequenceNumber, + referenceSequenceNumber: batch.messages[0].referenceSequenceNumber, contents: serializedContent, }, ], @@ -118,7 +118,7 @@ export class OpGroupingManager { // Grouped batching must be enabled this.config.groupedBatchingEnabled && // The number of ops in the batch must surpass the configured threshold - batch.content.length >= this.config.opCountThreshold && + batch.messages.length >= this.config.opCountThreshold && // Support for reentrant batches must be explicitly enabled (this.config.reentrantBatchGroupingEnabled || batch.hasReentrantOps !== true) ); diff --git a/packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts b/packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts index 2b9ae72f228..766161ace86 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opSplitter.ts @@ -121,7 +121,7 @@ export class OpSplitter { public splitFirstBatchMessage(batch: IBatch): IBatch { assert(this.isBatchChunkingEnabled, 0x513 /* Chunking needs to be enabled */); assert( - batch.contentSizeInBytes > 0 && batch.content.length > 0, + batch.contentSizeInBytes > 0 && batch.messages.length > 0, 0x514 /* Batch needs to be non-empty */, ); assert( @@ -134,13 +134,13 @@ export class OpSplitter { 0x516 /* Chunk size needs to be smaller than the max batch size */, ); - const firstMessage = batch.content[0]; // we expect this to be the large compressed op, which needs to be split + const firstMessage = batch.messages[0]; // we expect this to be the large compressed op, which needs to be split assert( (firstMessage.contents?.length ?? 0) >= this.chunkSizeInBytes, 0x518 /* First message in the batch needs to be chunkable */, ); - const restOfMessages = batch.content.slice(1); // we expect these to be empty ops, created to reserve sequence numbers + const restOfMessages = batch.messages.slice(1); // we expect these to be empty ops, created to reserve sequence numbers const socketSize = estimateSocketSize(batch); const chunks = splitOp( firstMessage, @@ -171,7 +171,7 @@ export class OpSplitter { this.logger.sendPerformanceEvent({ // Used to be "Chunked compressed batch" eventName: "CompressedChunkedBatch", - length: batch.content.length, + length: batch.messages.length, sizeInBytes: batch.contentSizeInBytes, chunks: chunks.length, chunkSizeInBytes: this.chunkSizeInBytes, @@ -179,7 +179,7 @@ export class OpSplitter { }); return { - content: [lastChunk, ...restOfMessages], + messages: [lastChunk, ...restOfMessages], contentSizeInBytes: lastChunk.contents?.length ?? 0, referenceSequenceNumber: batch.referenceSequenceNumber, }; diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 76b78f6d49c..0b5cd292f06 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -15,7 +15,7 @@ import { } from "@fluidframework/telemetry-utils/internal"; import { ICompressionRuntimeOptions } from "../containerRuntime.js"; -import { IPendingBatchMessage, PendingStateManager } from "../pendingStateManager.js"; +import { PendingMessageResubmitData, PendingStateManager } from "../pendingStateManager.js"; import { BatchManager, @@ -48,7 +48,7 @@ export interface IOutboxParameters { readonly logger: ITelemetryBaseLogger; readonly groupingManager: OpGroupingManager; readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers; - readonly reSubmit: (message: IPendingBatchMessage) => void; + readonly reSubmit: (message: PendingMessageResubmitData) => void; readonly opReentrancy: () => boolean; readonly closeContainer: (error?: ICriticalContainerError) => void; } @@ -87,6 +87,13 @@ export function getLongStack(action: () => T, length: number = 50): T { } } +/** + * The Outbox collects messages submitted by the ContainerRuntime into a batch, + * and then flushes the batch when requested. + * + * @remarks There are actually multiple independent batches (some are for a specific message type), + * to support slight variation in semantics for each batch (e.g. support for rebasing or grouping). + */ export class Outbox { private readonly mc: MonitoringContext; private readonly mainBatch: BatchManager; @@ -269,7 +276,7 @@ export class Outbox { clientSequenceNumber = this.sendBatch(processedBatch); } - this.params.pendingStateManager.onFlushBatch(rawBatch.content, clientSequenceNumber); + this.params.pendingStateManager.onFlushBatch(rawBatch.messages, clientSequenceNumber); } /** @@ -283,7 +290,7 @@ export class Outbox { assert(batchManager.options.canRebase, 0x9a7 /* BatchManager does not support rebase */); this.rebasing = true; - for (const message of rawBatch.content) { + for (const message of rawBatch.messages) { this.params.reSubmit({ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion content: message.contents!, @@ -296,7 +303,7 @@ export class Outbox { this.mc.logger.sendTelemetryEvent( { eventName: "BatchRebase", - length: rawBatch.content.length, + length: rawBatch.messages.length, referenceSequenceNumber: rawBatch.referenceSequenceNumber, }, new UsageError("BatchRebase"), @@ -323,7 +330,7 @@ export class Outbox { */ private compressBatch(batch: IBatch): IBatch { if ( - batch.content.length === 0 || + batch.messages.length === 0 || this.params.config.compressionOptions === undefined || this.params.config.compressionOptions.minimumBatchSizeInBytes > batch.contentSizeInBytes || @@ -345,7 +352,7 @@ export class Outbox { throw new GenericError("BatchTooLarge", /* error */ undefined, { batchSize: batch.contentSizeInBytes, compressedBatchSize: compressedBatch.contentSizeInBytes, - count: compressedBatch.content.length, + count: compressedBatch.messages.length, limit: this.params.config.maxBatchSizeInBytes, chunkingEnabled: this.params.splitter.isBatchChunkingEnabled, compressionOptions: JSON.stringify(this.params.config.compressionOptions), @@ -363,7 +370,7 @@ export class Outbox { * @returns the clientSequenceNumber of the start of the batch, or undefined if nothing was sent */ private sendBatch(batch: IBatch) { - const length = batch.content.length; + const length = batch.messages.length; if (length === 0) { return undefined; // Nothing submitted } @@ -372,7 +379,7 @@ export class Outbox { if (socketSize >= this.params.config.maxBatchSizeInBytes) { this.mc.logger.sendPerformanceEvent({ eventName: "LargeBatch", - length: batch.content.length, + length: batch.messages.length, sizeInBytes: batch.contentSizeInBytes, socketSize, }); @@ -383,7 +390,7 @@ export class Outbox { // Legacy path - supporting old loader versions. Can be removed only when LTS moves above // version that has support for batches (submitBatchFn) assert( - batch.content[0].compression === undefined, + batch.messages[0].compression === undefined, 0x5a6 /* Compression should not have happened if the loader does not support it */, ); @@ -391,7 +398,7 @@ export class Outbox { } else { assert(batch.referenceSequenceNumber !== undefined, 0x58e /* Batch must not be empty */); clientSequenceNumber = this.params.submitBatchFn( - batch.content.map((message) => ({ + batch.messages.map((message) => ({ contents: message.contents, metadata: message.metadata, compression: message.compression, @@ -407,7 +414,10 @@ export class Outbox { return clientSequenceNumber; } - public checkpoint() { + /** + * @returns A checkpoint object per batch that facilitates iterating over the batch messages when rolling back. + */ + public getBatchCheckpoints() { // This variable is declared with a specific type so that we have a standard import of the IBatchCheckpoint type. // When the type is inferred, the generated .d.ts uses a dynamic import which doesn't resolve. const mainBatch: IBatchCheckpoint = this.mainBatch.checkpoint(); diff --git a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts index fc19b73c146..1a7e118779d 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/remoteMessageProcessor.ts @@ -16,6 +16,7 @@ import { type InboundSequencedContainerRuntimeMessageOrSystemMessage, type InboundSequencedRecentlyAddedContainerRuntimeMessage, } from "../messageTypes.js"; +import { asBatchMetadata } from "../metadata.js"; import { OpDecompressor } from "./opDecompressor.js"; import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js"; @@ -127,7 +128,7 @@ export class RemoteMessageProcessor { * the batch tracking info (this.batchStartCsn) based on whether we're still mid-batch. */ private getAndUpdateBatchStartCsn(message: ISequencedDocumentMessage): number { - const batchMetadataFlag = (message.metadata as { batch: boolean | undefined })?.batch; + const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch; if (this.batchStartCsn === undefined) { // We are waiting for a new batch assert(batchMetadataFlag !== false, "Unexpected batch end marker"); diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 0fe54af6c3d..085b9aef06c 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -16,7 +16,7 @@ import { import Deque from "double-ended-queue"; import { InboundSequencedContainerRuntimeMessage } from "./messageTypes.js"; -import { IBatchMetadata } from "./metadata.js"; +import { asBatchMetadata, IBatchMetadata } from "./metadata.js"; import type { BatchMessage } from "./opLifecycle/index.js"; import { pkgVersion } from "./packageVersion.js"; @@ -41,19 +41,18 @@ export interface IPendingLocalState { pendingStates: IPendingMessage[]; } -export interface IPendingBatchMessage { - content: string; - localOpMetadata: unknown; - opMetadata: Record | undefined; -} +/** Info needed to replay/resubmit a pending message */ +export type PendingMessageResubmitData = Pick< + IPendingMessage, + "content" | "localOpMetadata" | "opMetadata" +>; export interface IRuntimeStateHandler { connected(): boolean; clientId(): string | undefined; close(error?: ICriticalContainerError): void; applyStashedOp(content: string): Promise; - reSubmit(message: IPendingBatchMessage): void; - reSubmitBatch(batch: IPendingBatchMessage[]): void; + reSubmitBatch(batch: PendingMessageResubmitData[]): void; isActiveConnection: () => boolean; isAttached: () => boolean; } @@ -116,7 +115,8 @@ export class PendingStateManager implements IDisposable { // the correct batch metadata. private pendingBatchBeginMessage: ISequencedDocumentMessage | undefined; - private clientId: string | undefined; + /** Used to ensure we don't replay ops on the same connection twice */ + private clientIdFromLastReplay: string | undefined; /** * The pending messages count. Includes `pendingMessages` and `initialMessages` to keep in sync with @@ -406,10 +406,10 @@ export class PendingStateManager implements IDisposable { // This assert suggests we are about to send same ops twice, which will result in data loss. assert( - this.clientId !== this.stateHandler.clientId(), + this.clientIdFromLastReplay !== this.stateHandler.clientId(), 0x173 /* "replayPendingStates called twice for same clientId!" */, ); - this.clientId = this.stateHandler.clientId(); + this.clientIdFromLastReplay = this.stateHandler.clientId(); assert( this.initialMessages.isEmpty(), @@ -426,54 +426,58 @@ export class PendingStateManager implements IDisposable { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion let pendingMessage = this.pendingMessages.shift()!; remainingPendingMessagesCount--; - assert( - pendingMessage.opMetadata?.batch !== false, - 0x41b /* We cannot process batches in chunks */, - ); + + const batchMetadataFlag = asBatchMetadata(pendingMessage.opMetadata)?.batch; + assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */); /** - * We want to ensure grouped messages get processed in a batch. + * We must preserve the distinct batches on resubmit. * Note: It is not possible for the PendingStateManager to receive a partially acked batch. It will - * either receive the whole batch ack or nothing at all. + * either receive the whole batch ack or nothing at all. @see ScheduleManager for how this works. */ - if (pendingMessage.opMetadata?.batch) { - assert( - remainingPendingMessagesCount > 0, - 0x554 /* Last pending message cannot be a batch begin */, - ); - - const batch: IPendingBatchMessage[] = []; - - // check is >= because batch end may be last pending message - while (remainingPendingMessagesCount >= 0) { - batch.push({ + if (batchMetadataFlag === undefined) { + // Single-message batch + this.stateHandler.reSubmitBatch([ + { content: pendingMessage.content, localOpMetadata: pendingMessage.localOpMetadata, opMetadata: pendingMessage.opMetadata, - }); + }, + ]); + continue; + } + // else: batchMetadataFlag === true (It's a typical multi-message batch) - if (pendingMessage.opMetadata?.batch === false) { - break; - } - assert(remainingPendingMessagesCount > 0, 0x555 /* No batch end found */); + assert( + remainingPendingMessagesCount > 0, + 0x554 /* Last pending message cannot be a batch begin */, + ); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - pendingMessage = this.pendingMessages.shift()!; - remainingPendingMessagesCount--; - assert( - pendingMessage.opMetadata?.batch !== true, - 0x556 /* Batch start needs a corresponding batch end */, - ); - } + const batch: PendingMessageResubmitData[] = []; - this.stateHandler.reSubmitBatch(batch); - } else { - this.stateHandler.reSubmit({ + // check is >= because batch end may be last pending message + while (remainingPendingMessagesCount >= 0) { + batch.push({ content: pendingMessage.content, localOpMetadata: pendingMessage.localOpMetadata, opMetadata: pendingMessage.opMetadata, }); + + if (pendingMessage.opMetadata?.batch === false) { + break; + } + assert(remainingPendingMessagesCount > 0, 0x555 /* No batch end found */); + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + pendingMessage = this.pendingMessages.shift()!; + remainingPendingMessagesCount--; + assert( + pendingMessage.opMetadata?.batch !== true, + 0x556 /* Batch start needs a corresponding batch end */, + ); } + + this.stateHandler.reSubmitBatch(batch); } // pending ops should no longer depend on previous sequenced local ops after resubmit diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/OpGroupingManager.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/OpGroupingManager.spec.ts index e8c435757a6..c2c4d7be55b 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/OpGroupingManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/OpGroupingManager.spec.ts @@ -26,7 +26,7 @@ describe("OpGroupingManager", () => { hasReentrantOps, }); const messagesToBatch = (messages: BatchMessage[]): IBatch => ({ - content: messages, + messages, contentSizeInBytes: messages .map((message) => JSON.stringify(message).length) .reduce((a, b) => a + b), @@ -99,8 +99,8 @@ describe("OpGroupingManager", () => { }, mockLogger, ).groupBatch(createBatch(5)); - assert.strictEqual(result.content.length, 1); - assert.deepStrictEqual(result.content, [ + assert.strictEqual(result.messages.length, 1); + assert.deepStrictEqual(result.messages, [ { contents: '{"type":"groupedBatch","contents":[{"contents":0},{"contents":0},{"contents":0},{"contents":0},{"contents":0}]}', diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts index 9be4e938f17..a46d033339e 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/batchManager.spec.ts @@ -56,9 +56,9 @@ describe("BatchManager", () => { ); const batch = batchManager.popBatch(); - assert.equal(batch.content[0].metadata?.batch, true); - assert.equal(batch.content[1].metadata?.batch, undefined); - assert.equal(batch.content[2].metadata?.batch, false); + assert.equal(batch.messages[0].metadata?.batch, true); + assert.equal(batch.messages[1].metadata?.batch, undefined); + assert.equal(batch.messages[2].metadata?.batch, false); assert.equal( batchManager.push( @@ -68,7 +68,7 @@ describe("BatchManager", () => { true, ); const singleOpBatch = batchManager.popBatch(); - assert.equal(singleOpBatch.content[0].metadata?.batch, undefined); + assert.equal(singleOpBatch.messages[0].metadata?.batch, undefined); }); it("Batch content size is tracked correctly", () => { diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/opCompressor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/opCompressor.spec.ts index 6bf26a5efd6..8ded4fc76ca 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/opCompressor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/opCompressor.spec.ts @@ -21,7 +21,7 @@ describe("OpCompressor", () => { const createBatch = (length: number, messageSize: number) => messagesToBatch(new Array(length).fill(createMessage(generateStringOfSize(messageSize)))); const messagesToBatch = (messages: BatchMessage[]): IBatch => ({ - content: messages, + messages, contentSizeInBytes: messages .map((message) => JSON.stringify(message).length) .reduce((a, b) => a + b), @@ -48,17 +48,17 @@ describe("OpCompressor", () => { // large batch with small messages createBatch(1000, 100 * 1024), ].forEach((batch) => { - it(`Batch of ${batch.content.length} ops of total size ${toMB( + it(`Batch of ${batch.messages.length} ops of total size ${toMB( batch.contentSizeInBytes, )} MB`, () => { const compressedBatch = compressor.compressBatch(batch); - assert.strictEqual(compressedBatch.content.length, batch.content.length); - assert.strictEqual(compressedBatch.content[0].compression, "lz4"); - assert.strictEqual(compressedBatch.content[0].metadata?.flag, true); - if (compressedBatch.content.length > 1) { - assert.strictEqual(compressedBatch.content[1].contents, undefined); - assert.strictEqual(compressedBatch.content[1].compression, undefined); - assert.strictEqual(compressedBatch.content[1].contents, undefined); + assert.strictEqual(compressedBatch.messages.length, batch.messages.length); + assert.strictEqual(compressedBatch.messages[0].compression, "lz4"); + assert.strictEqual(compressedBatch.messages[0].metadata?.flag, true); + if (compressedBatch.messages.length > 1) { + assert.strictEqual(compressedBatch.messages[1].contents, undefined); + assert.strictEqual(compressedBatch.messages[1].compression, undefined); + assert.strictEqual(compressedBatch.messages[1].contents, undefined); } }).timeout(3000); })); @@ -70,7 +70,7 @@ describe("OpCompressor", () => { // small batch with large messages createBatch(6, 100 * 1024 * 1024), ].forEach((batch) => { - it(`Not compressing batch of ${batch.content.length} ops of total size ${toMB( + it(`Not compressing batch of ${batch.messages.length} ops of total size ${toMB( batch.contentSizeInBytes, )} MB`, () => { assert.throws(() => compressor.compressBatch(batch)); @@ -78,7 +78,7 @@ describe("OpCompressor", () => { { eventName: "OpCompressor:BatchTooLarge", category: "error", - length: batch.content.length, + length: batch.messages.length, size: batch.contentSizeInBytes, }, ]); diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/opSplitter.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/opSplitter.spec.ts index 8d5cee7e2ef..015967a649c 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/opSplitter.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/opSplitter.spec.ts @@ -239,7 +239,7 @@ describe("OpSplitter", () => { // Empty batch assert.throws(() => opSplitter.splitFirstBatchMessage({ - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 0, referenceSequenceNumber: 0, }), @@ -248,7 +248,7 @@ describe("OpSplitter", () => { // Empty batch assert.throws(() => opSplitter.splitFirstBatchMessage({ - content: [], + messages: [], contentSizeInBytes: 1, referenceSequenceNumber: 0, }), @@ -257,7 +257,7 @@ describe("OpSplitter", () => { // Batch is too small to be chunked assert.throws(() => opSplitter.splitFirstBatchMessage({ - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 1, referenceSequenceNumber: 0, }), @@ -266,7 +266,7 @@ describe("OpSplitter", () => { // Batch is not compressed assert.throws(() => opSplitter.splitFirstBatchMessage({ - content: [regularMessage], + messages: [regularMessage], contentSizeInBytes: 3, referenceSequenceNumber: 0, }), @@ -281,7 +281,7 @@ describe("OpSplitter", () => { maxBatchSizeInBytes, mockLogger, ).splitFirstBatchMessage({ - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 3, referenceSequenceNumber: 0, }), @@ -291,7 +291,7 @@ describe("OpSplitter", () => { assert.throws(() => new OpSplitter([], undefined, 0, maxBatchSizeInBytes, mockLogger).splitFirstBatchMessage( { - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 3, referenceSequenceNumber: 0, }, @@ -301,7 +301,7 @@ describe("OpSplitter", () => { // Misconfigured op splitter assert.throws(() => new OpSplitter([], mockSubmitBatchFn, 2, 1, mockLogger).splitFirstBatchMessage({ - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 3, referenceSequenceNumber: 0, }), @@ -316,7 +316,7 @@ describe("OpSplitter", () => { maxBatchSizeInBytes, mockLogger, ).splitFirstBatchMessage({ - content: [compressedMessage], + messages: [compressedMessage], contentSizeInBytes: 3, referenceSequenceNumber: 0, }), @@ -340,7 +340,7 @@ describe("OpSplitter", () => { const emptyMessage = generateChunkableOp(0); const result = opSplitter.splitFirstBatchMessage({ - content: [largeMessage, emptyMessage, emptyMessage, emptyMessage], + messages: [largeMessage, emptyMessage, emptyMessage, emptyMessage], contentSizeInBytes: largeMessage.contents?.length ?? 0, referenceSequenceNumber: 0, }); @@ -352,12 +352,13 @@ describe("OpSplitter", () => { assert.equal(batch.referenceSequenceNumber, 0); } - assert.equal(result.content.length, 4); - const lastChunk = JSON.parse(result.content[0].contents!).contents as IChunkedOp; + assert.equal(result.messages.length, 4); + const lastChunk = JSON.parse(result.messages[0].contents!).contents as IChunkedOp; assert.equal(lastChunk.chunkId, lastChunk.totalChunks); - assert.deepStrictEqual(result.content.slice(1), new Array(3).fill(emptyMessage)); + assert.deepStrictEqual(result.messages.slice(1), new Array(3).fill(emptyMessage)); assert.equal( - !extraOp || JSON.parse(result.content[0].contents!).contents?.contents?.length === 0, + !extraOp || + JSON.parse(result.messages[0].contents!).contents?.contents?.length === 0, true, ); assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0); @@ -375,7 +376,7 @@ describe("OpSplitter", () => { mockLogger.matchEvents([ { eventName: "OpSplitter:CompressedChunkedBatch", - length: result.content.length, + length: result.messages.length, chunks: 100 / 20 + 1 + (extraOp ? 1 : 0), chunkSizeInBytes: 20, }, @@ -397,7 +398,7 @@ describe("OpSplitter", () => { const largeMessage = generateChunkableOp(100); const result = opSplitter.splitFirstBatchMessage({ - content: [largeMessage], + messages: [largeMessage], contentSizeInBytes: largeMessage.contents?.length ?? 0, referenceSequenceNumber: 0, }); @@ -409,12 +410,13 @@ describe("OpSplitter", () => { assert.equal(batch.referenceSequenceNumber, 0); } - assert.equal(result.content.length, 1); + assert.equal(result.messages.length, 1); assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0); - const lastChunk = JSON.parse(result.content[0].contents!).contents as IChunkedOp; + const lastChunk = JSON.parse(result.messages[0].contents!).contents as IChunkedOp; assert.equal(lastChunk.chunkId, lastChunk.totalChunks); assert.equal( - !extraOp || JSON.parse(result.content[0].contents!).contents?.contents?.length === 0, + !extraOp || + JSON.parse(result.messages[0].contents!).contents?.contents?.length === 0, true, ); assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0); @@ -432,7 +434,7 @@ describe("OpSplitter", () => { mockLogger.matchEvents([ { eventName: "OpSplitter:CompressedChunkedBatch", - length: result.content.length, + length: result.messages.length, chunks: 100 / 20 + 1 + (extraOp ? 1 : 0), chunkSizeInBytes: 20, }, diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 8610aa1c10e..b1d964c4327 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -35,7 +35,7 @@ import { Outbox, } from "../../opLifecycle/index.js"; import { - IPendingBatchMessage, + PendingMessageResubmitData, PendingStateManager, type IPendingMessage, } from "../../pendingStateManager.js"; @@ -174,7 +174,7 @@ describe("Outbox", () => { return messages; }; const toBatch = (messages: BatchMessage[]): IBatch => ({ - content: addBatchMetadata(messages), + messages: addBatchMetadata(messages), contentSizeInBytes: messages .map((message) => message.contents?.length ?? 0) .reduce((a, b) => a + b, 0), @@ -228,7 +228,7 @@ describe("Outbox", () => { mockLogger, ), getCurrentSequenceNumbers: () => currentSeqNumbers, - reSubmit: (message: IPendingBatchMessage) => { + reSubmit: (message: PendingMessageResubmitData) => { state.opsResubmitted++; }, opReentrancy: () => state.isReentrant, diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts index 5c3e618b83e..f945a722ba4 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/remoteMessageProcessor.spec.ts @@ -110,7 +110,7 @@ describe("RemoteMessageProcessor", () => { let batch: IBatch = { contentSizeInBytes: 1, referenceSequenceNumber: Infinity, - content: [ + messages: [ getOutboundMessage("a", true), getOutboundMessage("b"), getOutboundMessage("c"), @@ -154,7 +154,7 @@ describe("RemoteMessageProcessor", () => { } } let startSeqNum = outboundMessages.length + 1; - outboundMessages.push(...batch.content); + outboundMessages.push(...batch.messages); const messageProcessor = getMessageProcessor(); const actual: ISequencedDocumentMessage[] = []; diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index ad5ffadbab7..0b5a17aefb5 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -140,7 +140,6 @@ describe("Pending State Manager", () => { clientId: () => "oldClientId", close: (error?: ICriticalContainerError) => (closeError = error), connected: () => true, - reSubmit: () => {}, reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, @@ -361,7 +360,6 @@ describe("Pending State Manager", () => { clientId: () => undefined, close: () => {}, connected: () => true, - reSubmit: () => {}, reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, @@ -450,7 +448,6 @@ describe("Pending State Manager", () => { clientId: () => undefined, close: () => {}, connected: () => true, - reSubmit: () => {}, reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, @@ -583,7 +580,6 @@ describe("Pending State Manager", () => { clientId: () => "123", close: () => {}, connected: () => true, - reSubmit: () => {}, reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true,