PendingStateManager: Light refactoring/renaming (prep for BatchId) (#21738)

Some light refactoring:
* Rename `IPendingBatchMessage` to `PendingMessageResubmitData` to
better reflect its purpose
* Rename `IBatch.content` to `IBatch.messages`
* Add/use helper for strongly typed access to batch metadata
* Remove redundant `reSubmit` callback for PSM's use; it can/should
always use `reSubmitBatch`.
* Everything is always batches in Turn-Based mode, and there's no harm
in treating it the same for Immediate mode.
* There's no functional change here in this PR, thanks to [this bug
fix](https://github.com/markfields/FluidFramework/blob/main/packages/runtime/container-runtime/src/containerRuntime.ts#L1555)
a few months back.
This commit is contained in:
Mark Fields 2024-07-02 14:20:28 -07:00 коммит произвёл GitHub
Родитель 1dbe736b4c
Коммит 68feb6c6f2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
17 изменённых файлов: 168 добавлений и 143 удалений

Просмотреть файл

@ -181,7 +181,7 @@ import {
} from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
import {
IPendingBatchMessage,
PendingMessageResubmitData,
IPendingLocalState,
PendingStateManager,
} from "./pendingStateManager.js";
@ -668,7 +668,7 @@ export const makeLegacySendBatchFn =
(batch: IBatch) => {
// Default to negative one to match Container.submitBatch behavior
let clientSequenceNumber: number = -1;
for (const message of batch.content) {
for (const message of batch.messages) {
clientSequenceNumber = submitFn(
MessageType.Operation,
// For back-compat (submitFn only works on deserialized content)
@ -1549,10 +1549,6 @@ export class ContainerRuntime
clientId: () => this.clientId,
close: this.closeFn,
connected: () => this.connected,
reSubmit: (message: IPendingBatchMessage) => {
this.reSubmit(message);
this.flush();
},
reSubmitBatch: this.reSubmitBatch.bind(this),
isActiveConnection: () => this.innerDeltaManager.active,
isAttached: () => this.attachState !== AttachState.Detached,
@ -2909,7 +2905,7 @@ export class ContainerRuntime
// Note: we are not touching any batches other than mainBatch here, for two reasons:
// 1. It would not help, as other batches are flushed independently from main batch.
// 2. There is no way to undo process of data store creation, blob creation, ID compressor ops, or other things tracked by other batches.
checkpoint = this.outbox.checkpoint().mainBatch;
checkpoint = this.outbox.getBatchCheckpoints().mainBatch;
}
try {
this._orderSequentiallyCalls++;
@ -3052,7 +3048,7 @@ export class ContainerRuntime
}
/**
* Are we in the middle of batching ops together?
* Typically ops are batched and later flushed together, but in some cases we want to flush immediately.
*/
private currentlyBatching() {
return this.flushMode !== FlushMode.Immediate || this._orderSequentiallyCalls !== 0;
@ -4032,7 +4028,9 @@ export class ContainerRuntime
this.outbox.submit(message);
}
if (!this.currentlyBatching()) {
// Note: Technically, the system "always" batches - if this case is true we'll just have a single-message batch.
const flushImmediatelyOnSubmit = !this.currentlyBatching();
if (flushImmediatelyOnSubmit) {
this.flush();
} else {
this.scheduleFlush();
@ -4113,7 +4111,7 @@ export class ContainerRuntime
}
}
private reSubmitBatch(batch: IPendingBatchMessage[]) {
private reSubmitBatch(batch: PendingMessageResubmitData[]) {
this.orderSequentially(() => {
for (const message of batch) {
this.reSubmit(message);
@ -4122,7 +4120,7 @@ export class ContainerRuntime
this.flush();
}
private reSubmit(message: IPendingBatchMessage) {
private reSubmit(message: PendingMessageResubmitData) {
// Need to parse from string for back-compat
const containerRuntimeMessage = this.parseLocalOpContent(message.content);
this.reSubmitCore(containerRuntimeMessage, message.localOpMetadata, message.opMetadata);

Просмотреть файл

@ -3,6 +3,20 @@
* Licensed under the MIT License.
*/
/**
* Does the metadata object look like batch metadata?
*/
export function isBatchMetadata(metadata: any): metadata is IBatchMetadata {
return typeof metadata?.batch === "boolean";
}
/**
* Cast the given metadata object to IBatchMetadata if it is so, otherwise yield undefined
*/
export function asBatchMetadata(metadata: unknown): IBatchMetadata | undefined {
return isBatchMetadata(metadata) ? metadata : undefined;
}
/**
* Batching makes assumptions about what might be on the metadata. This interface codifies those assumptions, but does not validate them.
*/

Просмотреть файл

@ -99,7 +99,7 @@ export class BatchManager {
public popBatch(): IBatch {
const batch: IBatch = {
content: this.pendingBatch,
messages: this.pendingBatch,
contentSizeInBytes: this.batchContentSize,
referenceSequenceNumber: this.referenceSequenceNumber,
hasReentrantOps: this.hasReentrantOps,
@ -134,13 +134,13 @@ export class BatchManager {
}
const addBatchMetadata = (batch: IBatch): IBatch => {
if (batch.content.length > 1) {
batch.content[0].metadata = {
...batch.content[0].metadata,
if (batch.messages.length > 1) {
batch.messages[0].metadata = {
...batch.messages[0].metadata,
batch: true,
};
batch.content[batch.content.length - 1].metadata = {
...batch.content[batch.content.length - 1].metadata,
batch.messages[batch.messages.length - 1].metadata = {
...batch.messages[batch.messages.length - 1].metadata,
batch: false,
};
}
@ -157,7 +157,7 @@ const addBatchMetadata = (batch: IBatch): IBatch => {
* @returns An estimate of the payload size in bytes which will be produced when the batch is sent over the wire
*/
export const estimateSocketSize = (batch: IBatch): number => {
return batch.contentSizeInBytes + opOverhead * batch.content.length;
return batch.contentSizeInBytes + opOverhead * batch.messages.length;
};
export const sequenceNumbersMatch = (

Просмотреть файл

@ -28,7 +28,7 @@ export interface IBatch<TMessages extends BatchMessage[] = BatchMessage[]> {
/**
* All the messages in the batch
*/
readonly content: TMessages;
readonly messages: TMessages;
/**
* The reference sequence number for the batch
*/

Просмотреть файл

@ -35,7 +35,7 @@ export class OpCompressor {
*/
public compressBatch(batch: IBatch): IBatch {
assert(
batch.contentSizeInBytes > 0 && batch.content.length > 0,
batch.contentSizeInBytes > 0 && batch.messages.length > 0,
0x5a4 /* Batch should not be empty */,
);
@ -47,14 +47,14 @@ export class OpCompressor {
const messages: BatchMessage[] = [];
messages.push({
...batch.content[0],
...batch.messages[0],
contents: JSON.stringify({ packedContents: compressedContent }),
metadata: batch.content[0].metadata,
metadata: batch.messages[0].metadata,
compression: CompressionAlgorithms.lz4,
});
// Add empty placeholder messages to reserve the sequence numbers
for (const message of batch.content.slice(1)) {
for (const message of batch.messages.slice(1)) {
messages.push({
localOpMetadata: message.localOpMetadata,
metadata: message.metadata,
@ -64,7 +64,7 @@ export class OpCompressor {
const compressedBatch: IBatch = {
contentSizeInBytes: compressedContent.length,
content: messages,
messages,
referenceSequenceNumber: batch.referenceSequenceNumber,
};
@ -74,7 +74,7 @@ export class OpCompressor {
duration,
sizeBeforeCompression: batch.contentSizeInBytes,
sizeAfterCompression: compressedBatch.contentSizeInBytes,
opCount: compressedBatch.content.length,
opCount: compressedBatch.messages.length,
socketSize: estimateSocketSize(compressedBatch),
});
}
@ -88,7 +88,7 @@ export class OpCompressor {
private serializeBatchContents(batch: IBatch): string {
try {
// Yields a valid JSON array, since each message.contents is already serialized to JSON
return `[${batch.content.map((message) => message.contents).join(",")}]`;
return `[${batch.messages.map(({ contents }) => contents).join(",")}]`;
} catch (e: any) {
if (e.message === "Invalid string length") {
// This is how JSON.stringify signals that
@ -98,7 +98,7 @@ export class OpCompressor {
{
eventName: "BatchTooLarge",
size: batch.contentSizeInBytes,
length: batch.content.length,
length: batch.messages.length,
},
error,
);

Просмотреть файл

@ -59,17 +59,17 @@ export class OpGroupingManager {
public groupBatch(batch: IBatch): IBatch<[BatchMessage]> {
assert(this.shouldGroup(batch), 0x946 /* cannot group the provided batch */);
if (batch.content.length >= 1000) {
if (batch.messages.length >= 1000) {
this.logger.sendTelemetryEvent({
eventName: "GroupLargeBatch",
length: batch.content.length,
length: batch.messages.length,
threshold: this.config.opCountThreshold,
reentrant: batch.hasReentrantOps,
referenceSequenceNumber: batch.content[0].referenceSequenceNumber,
referenceSequenceNumber: batch.messages[0].referenceSequenceNumber,
});
}
for (const message of batch.content) {
for (const message of batch.messages) {
if (message.metadata) {
const keys = Object.keys(message.metadata);
assert(keys.length < 2, 0x5dd /* cannot group ops with metadata */);
@ -79,7 +79,7 @@ export class OpGroupingManager {
const serializedContent = JSON.stringify({
type: OpGroupingManager.groupedBatchOp,
contents: batch.content.map<IGroupedMessage>((message) => ({
contents: batch.messages.map<IGroupedMessage>((message) => ({
contents: message.contents === undefined ? undefined : JSON.parse(message.contents),
metadata: message.metadata,
compression: message.compression,
@ -88,10 +88,10 @@ export class OpGroupingManager {
const groupedBatch: IBatch<[BatchMessage]> = {
...batch,
content: [
messages: [
{
metadata: undefined,
referenceSequenceNumber: batch.content[0].referenceSequenceNumber,
referenceSequenceNumber: batch.messages[0].referenceSequenceNumber,
contents: serializedContent,
},
],
@ -118,7 +118,7 @@ export class OpGroupingManager {
// Grouped batching must be enabled
this.config.groupedBatchingEnabled &&
// The number of ops in the batch must surpass the configured threshold
batch.content.length >= this.config.opCountThreshold &&
batch.messages.length >= this.config.opCountThreshold &&
// Support for reentrant batches must be explicitly enabled
(this.config.reentrantBatchGroupingEnabled || batch.hasReentrantOps !== true)
);

Просмотреть файл

@ -121,7 +121,7 @@ export class OpSplitter {
public splitFirstBatchMessage(batch: IBatch): IBatch {
assert(this.isBatchChunkingEnabled, 0x513 /* Chunking needs to be enabled */);
assert(
batch.contentSizeInBytes > 0 && batch.content.length > 0,
batch.contentSizeInBytes > 0 && batch.messages.length > 0,
0x514 /* Batch needs to be non-empty */,
);
assert(
@ -134,13 +134,13 @@ export class OpSplitter {
0x516 /* Chunk size needs to be smaller than the max batch size */,
);
const firstMessage = batch.content[0]; // we expect this to be the large compressed op, which needs to be split
const firstMessage = batch.messages[0]; // we expect this to be the large compressed op, which needs to be split
assert(
(firstMessage.contents?.length ?? 0) >= this.chunkSizeInBytes,
0x518 /* First message in the batch needs to be chunkable */,
);
const restOfMessages = batch.content.slice(1); // we expect these to be empty ops, created to reserve sequence numbers
const restOfMessages = batch.messages.slice(1); // we expect these to be empty ops, created to reserve sequence numbers
const socketSize = estimateSocketSize(batch);
const chunks = splitOp(
firstMessage,
@ -171,7 +171,7 @@ export class OpSplitter {
this.logger.sendPerformanceEvent({
// Used to be "Chunked compressed batch"
eventName: "CompressedChunkedBatch",
length: batch.content.length,
length: batch.messages.length,
sizeInBytes: batch.contentSizeInBytes,
chunks: chunks.length,
chunkSizeInBytes: this.chunkSizeInBytes,
@ -179,7 +179,7 @@ export class OpSplitter {
});
return {
content: [lastChunk, ...restOfMessages],
messages: [lastChunk, ...restOfMessages],
contentSizeInBytes: lastChunk.contents?.length ?? 0,
referenceSequenceNumber: batch.referenceSequenceNumber,
};

Просмотреть файл

@ -15,7 +15,7 @@ import {
} from "@fluidframework/telemetry-utils/internal";
import { ICompressionRuntimeOptions } from "../containerRuntime.js";
import { IPendingBatchMessage, PendingStateManager } from "../pendingStateManager.js";
import { PendingMessageResubmitData, PendingStateManager } from "../pendingStateManager.js";
import {
BatchManager,
@ -48,7 +48,7 @@ export interface IOutboxParameters {
readonly logger: ITelemetryBaseLogger;
readonly groupingManager: OpGroupingManager;
readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers;
readonly reSubmit: (message: IPendingBatchMessage) => void;
readonly reSubmit: (message: PendingMessageResubmitData) => void;
readonly opReentrancy: () => boolean;
readonly closeContainer: (error?: ICriticalContainerError) => void;
}
@ -87,6 +87,13 @@ export function getLongStack<T>(action: () => T, length: number = 50): T {
}
}
/**
* The Outbox collects messages submitted by the ContainerRuntime into a batch,
* and then flushes the batch when requested.
*
* @remarks There are actually multiple independent batches (some are for a specific message type),
* to support slight variation in semantics for each batch (e.g. support for rebasing or grouping).
*/
export class Outbox {
private readonly mc: MonitoringContext;
private readonly mainBatch: BatchManager;
@ -269,7 +276,7 @@ export class Outbox {
clientSequenceNumber = this.sendBatch(processedBatch);
}
this.params.pendingStateManager.onFlushBatch(rawBatch.content, clientSequenceNumber);
this.params.pendingStateManager.onFlushBatch(rawBatch.messages, clientSequenceNumber);
}
/**
@ -283,7 +290,7 @@ export class Outbox {
assert(batchManager.options.canRebase, 0x9a7 /* BatchManager does not support rebase */);
this.rebasing = true;
for (const message of rawBatch.content) {
for (const message of rawBatch.messages) {
this.params.reSubmit({
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
content: message.contents!,
@ -296,7 +303,7 @@ export class Outbox {
this.mc.logger.sendTelemetryEvent(
{
eventName: "BatchRebase",
length: rawBatch.content.length,
length: rawBatch.messages.length,
referenceSequenceNumber: rawBatch.referenceSequenceNumber,
},
new UsageError("BatchRebase"),
@ -323,7 +330,7 @@ export class Outbox {
*/
private compressBatch(batch: IBatch): IBatch {
if (
batch.content.length === 0 ||
batch.messages.length === 0 ||
this.params.config.compressionOptions === undefined ||
this.params.config.compressionOptions.minimumBatchSizeInBytes >
batch.contentSizeInBytes ||
@ -345,7 +352,7 @@ export class Outbox {
throw new GenericError("BatchTooLarge", /* error */ undefined, {
batchSize: batch.contentSizeInBytes,
compressedBatchSize: compressedBatch.contentSizeInBytes,
count: compressedBatch.content.length,
count: compressedBatch.messages.length,
limit: this.params.config.maxBatchSizeInBytes,
chunkingEnabled: this.params.splitter.isBatchChunkingEnabled,
compressionOptions: JSON.stringify(this.params.config.compressionOptions),
@ -363,7 +370,7 @@ export class Outbox {
* @returns the clientSequenceNumber of the start of the batch, or undefined if nothing was sent
*/
private sendBatch(batch: IBatch) {
const length = batch.content.length;
const length = batch.messages.length;
if (length === 0) {
return undefined; // Nothing submitted
}
@ -372,7 +379,7 @@ export class Outbox {
if (socketSize >= this.params.config.maxBatchSizeInBytes) {
this.mc.logger.sendPerformanceEvent({
eventName: "LargeBatch",
length: batch.content.length,
length: batch.messages.length,
sizeInBytes: batch.contentSizeInBytes,
socketSize,
});
@ -383,7 +390,7 @@ export class Outbox {
// Legacy path - supporting old loader versions. Can be removed only when LTS moves above
// version that has support for batches (submitBatchFn)
assert(
batch.content[0].compression === undefined,
batch.messages[0].compression === undefined,
0x5a6 /* Compression should not have happened if the loader does not support it */,
);
@ -391,7 +398,7 @@ export class Outbox {
} else {
assert(batch.referenceSequenceNumber !== undefined, 0x58e /* Batch must not be empty */);
clientSequenceNumber = this.params.submitBatchFn(
batch.content.map((message) => ({
batch.messages.map<IBatchMessage>((message) => ({
contents: message.contents,
metadata: message.metadata,
compression: message.compression,
@ -407,7 +414,10 @@ export class Outbox {
return clientSequenceNumber;
}
public checkpoint() {
/**
* @returns A checkpoint object per batch that facilitates iterating over the batch messages when rolling back.
*/
public getBatchCheckpoints() {
// This variable is declared with a specific type so that we have a standard import of the IBatchCheckpoint type.
// When the type is inferred, the generated .d.ts uses a dynamic import which doesn't resolve.
const mainBatch: IBatchCheckpoint = this.mainBatch.checkpoint();

Просмотреть файл

@ -16,6 +16,7 @@ import {
type InboundSequencedContainerRuntimeMessageOrSystemMessage,
type InboundSequencedRecentlyAddedContainerRuntimeMessage,
} from "../messageTypes.js";
import { asBatchMetadata } from "../metadata.js";
import { OpDecompressor } from "./opDecompressor.js";
import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js";
@ -127,7 +128,7 @@ export class RemoteMessageProcessor {
* the batch tracking info (this.batchStartCsn) based on whether we're still mid-batch.
*/
private getAndUpdateBatchStartCsn(message: ISequencedDocumentMessage): number {
const batchMetadataFlag = (message.metadata as { batch: boolean | undefined })?.batch;
const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch;
if (this.batchStartCsn === undefined) {
// We are waiting for a new batch
assert(batchMetadataFlag !== false, "Unexpected batch end marker");

Просмотреть файл

@ -16,7 +16,7 @@ import {
import Deque from "double-ended-queue";
import { InboundSequencedContainerRuntimeMessage } from "./messageTypes.js";
import { IBatchMetadata } from "./metadata.js";
import { asBatchMetadata, IBatchMetadata } from "./metadata.js";
import type { BatchMessage } from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
@ -41,19 +41,18 @@ export interface IPendingLocalState {
pendingStates: IPendingMessage[];
}
export interface IPendingBatchMessage {
content: string;
localOpMetadata: unknown;
opMetadata: Record<string, unknown> | undefined;
}
/** Info needed to replay/resubmit a pending message */
export type PendingMessageResubmitData = Pick<
IPendingMessage,
"content" | "localOpMetadata" | "opMetadata"
>;
export interface IRuntimeStateHandler {
connected(): boolean;
clientId(): string | undefined;
close(error?: ICriticalContainerError): void;
applyStashedOp(content: string): Promise<unknown>;
reSubmit(message: IPendingBatchMessage): void;
reSubmitBatch(batch: IPendingBatchMessage[]): void;
reSubmitBatch(batch: PendingMessageResubmitData[]): void;
isActiveConnection: () => boolean;
isAttached: () => boolean;
}
@ -116,7 +115,8 @@ export class PendingStateManager implements IDisposable {
// the correct batch metadata.
private pendingBatchBeginMessage: ISequencedDocumentMessage | undefined;
private clientId: string | undefined;
/** Used to ensure we don't replay ops on the same connection twice */
private clientIdFromLastReplay: string | undefined;
/**
* The pending messages count. Includes `pendingMessages` and `initialMessages` to keep in sync with
@ -406,10 +406,10 @@ export class PendingStateManager implements IDisposable {
// This assert suggests we are about to send same ops twice, which will result in data loss.
assert(
this.clientId !== this.stateHandler.clientId(),
this.clientIdFromLastReplay !== this.stateHandler.clientId(),
0x173 /* "replayPendingStates called twice for same clientId!" */,
);
this.clientId = this.stateHandler.clientId();
this.clientIdFromLastReplay = this.stateHandler.clientId();
assert(
this.initialMessages.isEmpty(),
@ -426,54 +426,58 @@ export class PendingStateManager implements IDisposable {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let pendingMessage = this.pendingMessages.shift()!;
remainingPendingMessagesCount--;
assert(
pendingMessage.opMetadata?.batch !== false,
0x41b /* We cannot process batches in chunks */,
);
const batchMetadataFlag = asBatchMetadata(pendingMessage.opMetadata)?.batch;
assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */);
/**
* We want to ensure grouped messages get processed in a batch.
* We must preserve the distinct batches on resubmit.
* Note: It is not possible for the PendingStateManager to receive a partially acked batch. It will
* either receive the whole batch ack or nothing at all.
* either receive the whole batch ack or nothing at all. @see ScheduleManager for how this works.
*/
if (pendingMessage.opMetadata?.batch) {
assert(
remainingPendingMessagesCount > 0,
0x554 /* Last pending message cannot be a batch begin */,
);
const batch: IPendingBatchMessage[] = [];
// check is >= because batch end may be last pending message
while (remainingPendingMessagesCount >= 0) {
batch.push({
if (batchMetadataFlag === undefined) {
// Single-message batch
this.stateHandler.reSubmitBatch([
{
content: pendingMessage.content,
localOpMetadata: pendingMessage.localOpMetadata,
opMetadata: pendingMessage.opMetadata,
});
},
]);
continue;
}
// else: batchMetadataFlag === true (It's a typical multi-message batch)
if (pendingMessage.opMetadata?.batch === false) {
break;
}
assert(remainingPendingMessagesCount > 0, 0x555 /* No batch end found */);
assert(
remainingPendingMessagesCount > 0,
0x554 /* Last pending message cannot be a batch begin */,
);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pendingMessage = this.pendingMessages.shift()!;
remainingPendingMessagesCount--;
assert(
pendingMessage.opMetadata?.batch !== true,
0x556 /* Batch start needs a corresponding batch end */,
);
}
const batch: PendingMessageResubmitData[] = [];
this.stateHandler.reSubmitBatch(batch);
} else {
this.stateHandler.reSubmit({
// check is >= because batch end may be last pending message
while (remainingPendingMessagesCount >= 0) {
batch.push({
content: pendingMessage.content,
localOpMetadata: pendingMessage.localOpMetadata,
opMetadata: pendingMessage.opMetadata,
});
if (pendingMessage.opMetadata?.batch === false) {
break;
}
assert(remainingPendingMessagesCount > 0, 0x555 /* No batch end found */);
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pendingMessage = this.pendingMessages.shift()!;
remainingPendingMessagesCount--;
assert(
pendingMessage.opMetadata?.batch !== true,
0x556 /* Batch start needs a corresponding batch end */,
);
}
this.stateHandler.reSubmitBatch(batch);
}
// pending ops should no longer depend on previous sequenced local ops after resubmit

Просмотреть файл

@ -26,7 +26,7 @@ describe("OpGroupingManager", () => {
hasReentrantOps,
});
const messagesToBatch = (messages: BatchMessage[]): IBatch => ({
content: messages,
messages,
contentSizeInBytes: messages
.map((message) => JSON.stringify(message).length)
.reduce((a, b) => a + b),
@ -99,8 +99,8 @@ describe("OpGroupingManager", () => {
},
mockLogger,
).groupBatch(createBatch(5));
assert.strictEqual(result.content.length, 1);
assert.deepStrictEqual(result.content, [
assert.strictEqual(result.messages.length, 1);
assert.deepStrictEqual(result.messages, [
{
contents:
'{"type":"groupedBatch","contents":[{"contents":0},{"contents":0},{"contents":0},{"contents":0},{"contents":0}]}',

Просмотреть файл

@ -56,9 +56,9 @@ describe("BatchManager", () => {
);
const batch = batchManager.popBatch();
assert.equal(batch.content[0].metadata?.batch, true);
assert.equal(batch.content[1].metadata?.batch, undefined);
assert.equal(batch.content[2].metadata?.batch, false);
assert.equal(batch.messages[0].metadata?.batch, true);
assert.equal(batch.messages[1].metadata?.batch, undefined);
assert.equal(batch.messages[2].metadata?.batch, false);
assert.equal(
batchManager.push(
@ -68,7 +68,7 @@ describe("BatchManager", () => {
true,
);
const singleOpBatch = batchManager.popBatch();
assert.equal(singleOpBatch.content[0].metadata?.batch, undefined);
assert.equal(singleOpBatch.messages[0].metadata?.batch, undefined);
});
it("Batch content size is tracked correctly", () => {

Просмотреть файл

@ -21,7 +21,7 @@ describe("OpCompressor", () => {
const createBatch = (length: number, messageSize: number) =>
messagesToBatch(new Array(length).fill(createMessage(generateStringOfSize(messageSize))));
const messagesToBatch = (messages: BatchMessage[]): IBatch => ({
content: messages,
messages,
contentSizeInBytes: messages
.map((message) => JSON.stringify(message).length)
.reduce((a, b) => a + b),
@ -48,17 +48,17 @@ describe("OpCompressor", () => {
// large batch with small messages
createBatch(1000, 100 * 1024),
].forEach((batch) => {
it(`Batch of ${batch.content.length} ops of total size ${toMB(
it(`Batch of ${batch.messages.length} ops of total size ${toMB(
batch.contentSizeInBytes,
)} MB`, () => {
const compressedBatch = compressor.compressBatch(batch);
assert.strictEqual(compressedBatch.content.length, batch.content.length);
assert.strictEqual(compressedBatch.content[0].compression, "lz4");
assert.strictEqual(compressedBatch.content[0].metadata?.flag, true);
if (compressedBatch.content.length > 1) {
assert.strictEqual(compressedBatch.content[1].contents, undefined);
assert.strictEqual(compressedBatch.content[1].compression, undefined);
assert.strictEqual(compressedBatch.content[1].contents, undefined);
assert.strictEqual(compressedBatch.messages.length, batch.messages.length);
assert.strictEqual(compressedBatch.messages[0].compression, "lz4");
assert.strictEqual(compressedBatch.messages[0].metadata?.flag, true);
if (compressedBatch.messages.length > 1) {
assert.strictEqual(compressedBatch.messages[1].contents, undefined);
assert.strictEqual(compressedBatch.messages[1].compression, undefined);
assert.strictEqual(compressedBatch.messages[1].contents, undefined);
}
}).timeout(3000);
}));
@ -70,7 +70,7 @@ describe("OpCompressor", () => {
// small batch with large messages
createBatch(6, 100 * 1024 * 1024),
].forEach((batch) => {
it(`Not compressing batch of ${batch.content.length} ops of total size ${toMB(
it(`Not compressing batch of ${batch.messages.length} ops of total size ${toMB(
batch.contentSizeInBytes,
)} MB`, () => {
assert.throws(() => compressor.compressBatch(batch));
@ -78,7 +78,7 @@ describe("OpCompressor", () => {
{
eventName: "OpCompressor:BatchTooLarge",
category: "error",
length: batch.content.length,
length: batch.messages.length,
size: batch.contentSizeInBytes,
},
]);

Просмотреть файл

@ -239,7 +239,7 @@ describe("OpSplitter", () => {
// Empty batch
assert.throws(() =>
opSplitter.splitFirstBatchMessage({
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 0,
referenceSequenceNumber: 0,
}),
@ -248,7 +248,7 @@ describe("OpSplitter", () => {
// Empty batch
assert.throws(() =>
opSplitter.splitFirstBatchMessage({
content: [],
messages: [],
contentSizeInBytes: 1,
referenceSequenceNumber: 0,
}),
@ -257,7 +257,7 @@ describe("OpSplitter", () => {
// Batch is too small to be chunked
assert.throws(() =>
opSplitter.splitFirstBatchMessage({
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 1,
referenceSequenceNumber: 0,
}),
@ -266,7 +266,7 @@ describe("OpSplitter", () => {
// Batch is not compressed
assert.throws(() =>
opSplitter.splitFirstBatchMessage({
content: [regularMessage],
messages: [regularMessage],
contentSizeInBytes: 3,
referenceSequenceNumber: 0,
}),
@ -281,7 +281,7 @@ describe("OpSplitter", () => {
maxBatchSizeInBytes,
mockLogger,
).splitFirstBatchMessage({
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 3,
referenceSequenceNumber: 0,
}),
@ -291,7 +291,7 @@ describe("OpSplitter", () => {
assert.throws(() =>
new OpSplitter([], undefined, 0, maxBatchSizeInBytes, mockLogger).splitFirstBatchMessage(
{
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 3,
referenceSequenceNumber: 0,
},
@ -301,7 +301,7 @@ describe("OpSplitter", () => {
// Misconfigured op splitter
assert.throws(() =>
new OpSplitter([], mockSubmitBatchFn, 2, 1, mockLogger).splitFirstBatchMessage({
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 3,
referenceSequenceNumber: 0,
}),
@ -316,7 +316,7 @@ describe("OpSplitter", () => {
maxBatchSizeInBytes,
mockLogger,
).splitFirstBatchMessage({
content: [compressedMessage],
messages: [compressedMessage],
contentSizeInBytes: 3,
referenceSequenceNumber: 0,
}),
@ -340,7 +340,7 @@ describe("OpSplitter", () => {
const emptyMessage = generateChunkableOp(0);
const result = opSplitter.splitFirstBatchMessage({
content: [largeMessage, emptyMessage, emptyMessage, emptyMessage],
messages: [largeMessage, emptyMessage, emptyMessage, emptyMessage],
contentSizeInBytes: largeMessage.contents?.length ?? 0,
referenceSequenceNumber: 0,
});
@ -352,12 +352,13 @@ describe("OpSplitter", () => {
assert.equal(batch.referenceSequenceNumber, 0);
}
assert.equal(result.content.length, 4);
const lastChunk = JSON.parse(result.content[0].contents!).contents as IChunkedOp;
assert.equal(result.messages.length, 4);
const lastChunk = JSON.parse(result.messages[0].contents!).contents as IChunkedOp;
assert.equal(lastChunk.chunkId, lastChunk.totalChunks);
assert.deepStrictEqual(result.content.slice(1), new Array(3).fill(emptyMessage));
assert.deepStrictEqual(result.messages.slice(1), new Array(3).fill(emptyMessage));
assert.equal(
!extraOp || JSON.parse(result.content[0].contents!).contents?.contents?.length === 0,
!extraOp ||
JSON.parse(result.messages[0].contents!).contents?.contents?.length === 0,
true,
);
assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0);
@ -375,7 +376,7 @@ describe("OpSplitter", () => {
mockLogger.matchEvents([
{
eventName: "OpSplitter:CompressedChunkedBatch",
length: result.content.length,
length: result.messages.length,
chunks: 100 / 20 + 1 + (extraOp ? 1 : 0),
chunkSizeInBytes: 20,
},
@ -397,7 +398,7 @@ describe("OpSplitter", () => {
const largeMessage = generateChunkableOp(100);
const result = opSplitter.splitFirstBatchMessage({
content: [largeMessage],
messages: [largeMessage],
contentSizeInBytes: largeMessage.contents?.length ?? 0,
referenceSequenceNumber: 0,
});
@ -409,12 +410,13 @@ describe("OpSplitter", () => {
assert.equal(batch.referenceSequenceNumber, 0);
}
assert.equal(result.content.length, 1);
assert.equal(result.messages.length, 1);
assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0);
const lastChunk = JSON.parse(result.content[0].contents!).contents as IChunkedOp;
const lastChunk = JSON.parse(result.messages[0].contents!).contents as IChunkedOp;
assert.equal(lastChunk.chunkId, lastChunk.totalChunks);
assert.equal(
!extraOp || JSON.parse(result.content[0].contents!).contents?.contents?.length === 0,
!extraOp ||
JSON.parse(result.messages[0].contents!).contents?.contents?.length === 0,
true,
);
assert.notEqual(result.contentSizeInBytes, largeMessage.contents?.length ?? 0);
@ -432,7 +434,7 @@ describe("OpSplitter", () => {
mockLogger.matchEvents([
{
eventName: "OpSplitter:CompressedChunkedBatch",
length: result.content.length,
length: result.messages.length,
chunks: 100 / 20 + 1 + (extraOp ? 1 : 0),
chunkSizeInBytes: 20,
},

Просмотреть файл

@ -35,7 +35,7 @@ import {
Outbox,
} from "../../opLifecycle/index.js";
import {
IPendingBatchMessage,
PendingMessageResubmitData,
PendingStateManager,
type IPendingMessage,
} from "../../pendingStateManager.js";
@ -174,7 +174,7 @@ describe("Outbox", () => {
return messages;
};
const toBatch = (messages: BatchMessage[]): IBatch => ({
content: addBatchMetadata(messages),
messages: addBatchMetadata(messages),
contentSizeInBytes: messages
.map((message) => message.contents?.length ?? 0)
.reduce((a, b) => a + b, 0),
@ -228,7 +228,7 @@ describe("Outbox", () => {
mockLogger,
),
getCurrentSequenceNumbers: () => currentSeqNumbers,
reSubmit: (message: IPendingBatchMessage) => {
reSubmit: (message: PendingMessageResubmitData) => {
state.opsResubmitted++;
},
opReentrancy: () => state.isReentrant,

Просмотреть файл

@ -110,7 +110,7 @@ describe("RemoteMessageProcessor", () => {
let batch: IBatch = {
contentSizeInBytes: 1,
referenceSequenceNumber: Infinity,
content: [
messages: [
getOutboundMessage("a", true),
getOutboundMessage("b"),
getOutboundMessage("c"),
@ -154,7 +154,7 @@ describe("RemoteMessageProcessor", () => {
}
}
let startSeqNum = outboundMessages.length + 1;
outboundMessages.push(...batch.content);
outboundMessages.push(...batch.messages);
const messageProcessor = getMessageProcessor();
const actual: ISequencedDocumentMessage[] = [];

Просмотреть файл

@ -140,7 +140,6 @@ describe("Pending State Manager", () => {
clientId: () => "oldClientId",
close: (error?: ICriticalContainerError) => (closeError = error),
connected: () => true,
reSubmit: () => {},
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
@ -361,7 +360,6 @@ describe("Pending State Manager", () => {
clientId: () => undefined,
close: () => {},
connected: () => true,
reSubmit: () => {},
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
@ -450,7 +448,6 @@ describe("Pending State Manager", () => {
clientId: () => undefined,
close: () => {},
connected: () => true,
reSubmit: () => {},
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
@ -583,7 +580,6 @@ describe("Pending State Manager", () => {
clientId: () => "123",
close: () => {},
connected: () => true,
reSubmit: () => {},
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,