Remove ability to reject reentrant ops (#20621)

Remove failed experiment from repo.
It has been proven that pursuing this direction is not feasible / too expensive and does not provide good programming model.
While it does not close venue for some misuse, we have covered some aspects of it (properly tracking referenceSequence number / rebasing for ops sent via re-entrancy).
See https://dev.azure.com/fluidframework/internal/_workitems/edit/2322#12286846 for some details.

Key changes:
1. Remove IContainerRuntimeOptions.enableOpReentryCheck and code associated with this option
2. Remove ensureNoDataModelChanges() from API surfaces where it's safe. Some interfaces can be cleaned only in the future (due to N/N-1 compat promises)
3. Instead of relying on DDS to use ensureNoDataModelChanges to catch re-entrnacy, runtime will call it from process(), thus casting wider net and also not requiring each DDS to do so.
   - the side-effect of it - it will not catch re-entrant local changes, but such changes should not be rebased, so that's Ok.

Please note that opReentrancy.spec.ts has plenty of test coverage here, including UTs like "Eventual consistency with op reentry..." that uses SharedString to validate proper rebasing happens when dealing with reentrancy.
This commit is contained in:
Vlad Sudzilouski 2024-04-19 20:26:05 -04:00 коммит произвёл GitHub
Родитель 55b0f70003
Коммит c9d156264b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
20 изменённых файлов: 61 добавлений и 368 удалений

Просмотреть файл

@ -585,19 +585,7 @@ export abstract class SharedObjectCore<TEvent extends ISharedObjectEvents = ISha
*/
public emit(event: EventEmitterEventType, ...args: any[]): boolean {
return this.callbacksHelper.measure(() => {
// Creating ops while handling a DDS event can lead
// to undefined behavior and events observed in the wrong order.
// For example, we have two callbacks registered for a DDS, A and B.
// Then if on change #1 callback A creates change #2, the invocation flow will be:
//
// A because of #1
// A because of #2
// B because of #2
// B because of #1
//
// The runtime must enforce op coherence by not allowing any ops to be created during
// the event handler
return this.runtime.ensureNoDataModelChanges(() => super.emit(event, ...args));
return super.emit(event, ...args);
});
}

Просмотреть файл

@ -723,7 +723,6 @@ export interface IContainerRuntimeOptions {
readonly chunkSizeInBytes?: number;
readonly compressionOptions?: ICompressionRuntimeOptions;
readonly enableGroupedBatching?: boolean;
readonly enableOpReentryCheck?: boolean;
readonly enableRuntimeIdCompressor?: IdCompressorMode;
readonly explicitSchemaControl?: boolean;
readonly flushMode?: FlushMode;

Просмотреть файл

@ -168,6 +168,7 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext {
getAudience: (...args) => {
return context.getAudience(...args);
},
// back-compat, to be removed in 2.0
ensureNoDataModelChanges: (...args) => {
return context.ensureNoDataModelChanges(...args);
},

Просмотреть файл

@ -159,7 +159,6 @@ import {
OpSplitter,
Outbox,
RemoteMessageProcessor,
getLongStack,
} from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
import {
@ -459,16 +458,6 @@ export interface IContainerRuntimeOptions {
*/
readonly enableRuntimeIdCompressor?: IdCompressorMode;
/**
* If enabled, the runtime will block all attempts to send an op inside the
* {@link ContainerRuntime#ensureNoDataModelChanges} callback. The callback is used by
* {@link @fluidframework/shared-object-base#SharedObjectCore} for event handlers so enabling this
* will disallow modifying DDSes while handling DDS events.
*
* By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableOpReentryCheck`
* can be used to disable it at runtime.
*/
readonly enableOpReentryCheck?: boolean;
/**
* If enabled, the runtime will group messages within a batch into a single
* message to be sent to the service.
@ -808,7 +797,6 @@ export class ContainerRuntime
maxBatchSizeInBytes = defaultMaxBatchSizeInBytes,
enableRuntimeIdCompressor,
chunkSizeInBytes = defaultChunkSizeInBytes,
enableOpReentryCheck = false,
enableGroupedBatching = false,
explicitSchemaControl = false,
} = runtimeOptions;
@ -1021,7 +1009,6 @@ export class ContainerRuntime
chunkSizeInBytes,
// Requires<> drops undefined from IdCompressorType
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
enableOpReentryCheck,
enableGroupedBatching,
explicitSchemaControl,
},
@ -1224,14 +1211,6 @@ export class ContainerRuntime
private ensureNoDataModelChangesCalls = 0;
/**
* Tracks the number of detected reentrant ops to report,
* in order to self-throttle the telemetry events.
*
* This should be removed as part of ADO:2322
*/
private opReentryCallsToReport = 5;
/**
* Invokes the given callback and expects that no ops are submitted
* until execution finishes. If an op is submitted, an error will be raised.
@ -1265,7 +1244,6 @@ export class ContainerRuntime
private dirtyContainer: boolean;
private emitDirtyDocumentEvent = true;
private readonly enableOpReentryCheck: boolean;
private readonly disableAttachReorder: boolean | undefined;
private readonly closeSummarizerDelayMs: number;
/**
@ -1549,14 +1527,6 @@ export class ContainerRuntime
this.validateSummaryHeuristicConfiguration(this.summaryConfiguration);
}
const disableOpReentryCheck = this.mc.config.getBoolean(
"Fluid.ContainerRuntime.DisableOpReentryCheck",
);
this.enableOpReentryCheck =
runtimeOptions.enableOpReentryCheck === true &&
// Allow for a break-glass config to override the options
disableOpReentryCheck !== true;
this.summariesDisabled = this.isSummariesDisabled();
this.maxOpsSinceLastSummary = this.getMaxOpsSinceLastSummary();
this.initialSummarizerDelayMs = this.getInitialSummarizerDelayMs();
@ -1868,7 +1838,6 @@ export class ContainerRuntime
idCompressorMode: this.idCompressorMode,
featureGates: JSON.stringify({
...featureGatesForTelemetry,
disableOpReentryCheck,
disableChunking,
disableAttachReorder: this.disableAttachReorder,
disablePartialFlush,
@ -2550,8 +2519,8 @@ export class ContainerRuntime
// but will not modify the contents object (likely it will replace it on the message).
const messageCopy = { ...messageArg };
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
if (modernRuntimeMessage) {
this.processCore({
const msg: MessageWithContext = modernRuntimeMessage
? {
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
@ -2559,11 +2528,16 @@ export class ContainerRuntime
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
});
} else {
// Unrecognized message will be ignored.
this.processCore({ message, local, modernRuntimeMessage });
}
: // Unrecognized message will be ignored.
{
message,
local,
modernRuntimeMessage,
};
// ensure that we observe any re-entrancy, and if needed, rebase ops
this.ensureNoDataModelChanges(() => this.processCore(msg));
}
}
@ -3884,7 +3858,6 @@ export class ContainerRuntime
metadata?: { localId: string; blobId?: string },
): void {
this.verifyNotClosed();
this.verifyCanSubmitOps();
// There should be no ops in detached container state!
assert(
@ -4039,37 +4012,6 @@ export class ContainerRuntime
}
}
private verifyCanSubmitOps() {
if (this.ensureNoDataModelChangesCalls > 0) {
const errorMessage =
"Op was submitted from within a `ensureNoDataModelChanges` callback";
if (this.opReentryCallsToReport > 0) {
this.mc.logger.sendTelemetryEvent(
{ eventName: "OpReentry" },
// We need to capture the call stack in order to inspect the source of this usage pattern
getLongStack(() => new UsageError(errorMessage)),
);
this.opReentryCallsToReport--;
}
// Creating ops while processing ops can lead
// to undefined behavior and events observed in the wrong order.
// For example, we have two callbacks registered for a DDS, A and B.
// Then if on change #1 callback A creates change #2, the invocation flow will be:
//
// A because of #1
// A because of #2
// B because of #2
// B because of #1
//
// The runtime must enforce op coherence by not allowing ops to be submitted
// while ops are being processed.
if (this.enableOpReentryCheck) {
throw new UsageError(errorMessage);
}
}
}
private reSubmitBatch(batch: IPendingBatchMessage[]) {
this.orderSequentially(() => {
for (const message of batch) {

Просмотреть файл

@ -219,6 +219,7 @@ export abstract class FluidDataStoreContext
return this._containerRuntime;
}
// back-compat, to be removed in 2.0
public ensureNoDataModelChanges<T>(callback: () => T): T {
return this.parentContext.ensureNoDataModelChanges(callback);
}

Просмотреть файл

@ -542,151 +542,6 @@ describe("Runtime", () => {
});
}));
describe("Op reentry enforcement", () => {
let containerRuntime: ContainerRuntime;
it("By default, don't enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
assert.ok(
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
return true;
}),
);
assert.ok(
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
return true;
}),
),
),
);
});
it("If option enabled, enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
runtimeOptions: {
enableOpReentryCheck: true,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
);
assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
),
),
);
});
it("If option enabled but disabled via feature gate, don't enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext({
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
}) as IContainerContext,
registryEntries: [],
runtimeOptions: {
enableOpReentryCheck: true,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
);
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
),
);
});
it("Report at most 5 reentrant ops", async () => {
const mockLogger = new MockLogger();
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext({}, mockLogger) as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
mockLogger.clear();
containerRuntime.ensureNoDataModelChanges(() => {
for (let i = 0; i < 10; i++) {
submitDataStoreOp(containerRuntime, "id", "test");
}
});
// We expect only 5 events
mockLogger.assertMatchStrict(
Array.from(Array(5).keys()).map(() => ({
eventName: "ContainerRuntime:OpReentry",
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
})),
);
});
it("Can't call flush() inside ensureNoDataModelChanges's callback", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
runtimeOptions: {
flushMode: FlushMode.Immediate,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() => {
containerRuntime.orderSequentially(() => {});
}),
);
});
it("Can't create an infinite ensureNoDataModelChanges recursive call ", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});
const callback = () => {
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
callback();
});
};
assert.throws(() => callback());
});
});
describe("orderSequentially with rollback", () =>
[
FlushMode.TurnBased,
@ -1569,7 +1424,6 @@ describe("Runtime", () => {
maxBatchSizeInBytes: 700 * 1024,
chunkSizeInBytes: 204800,
enableRuntimeIdCompressor: undefined,
enableOpReentryCheck: false,
enableGroupedBatching: false,
explicitSchemaControl: false,
} satisfies IContainerRuntimeOptions;
@ -1598,7 +1452,6 @@ describe("Runtime", () => {
const featureGates = {
"Fluid.ContainerRuntime.CompressionDisabled": true,
"Fluid.ContainerRuntime.CompressionChunkingDisabled": true,
"Fluid.ContainerRuntime.DisableOpReentryCheck": false,
"Fluid.ContainerRuntime.IdCompressorEnabled": true,
"Fluid.ContainerRuntime.DisableGroupedBatching": true,
};
@ -1619,7 +1472,6 @@ describe("Runtime", () => {
featureGates: JSON.stringify({
disableGroupedBatching: true,
disableCompression: true,
disableOpReentryCheck: false,
disableChunking: true,
}),
groupedBatchingEnabled: false,

Просмотреть файл

@ -101,7 +101,6 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
createChannel(id: string | undefined, type: string): IChannel;
// (undocumented)
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
ensureNoDataModelChanges<T>(callback: () => T): T;
readonly entryPoint: IFluidHandle<FluidObject>;
getAudience(): IAudience;
getChannel(id: string): Promise<IChannel>;

Просмотреть файл

@ -72,16 +72,6 @@ export interface IFluidDataStoreRuntime
*/
getChannel(id: string): Promise<IChannel>;
/**
* Invokes the given callback and expects that no ops are submitted
* until execution finishes. If an op is submitted, an error will be raised.
*
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
*
* @param callback - the callback to be invoked
*/
ensureNoDataModelChanges<T>(callback: () => T): T;
/**
* Creates a new channel of the given type.
* @param id - ID of the channel to be created. A unique ID will be generated if left undefined.

Просмотреть файл

@ -69,7 +69,6 @@ export class FluidDataStoreRuntime extends TypedEventEmitter<IFluidDataStoreRunt
dispose(): void;
// (undocumented)
get disposed(): boolean;
ensureNoDataModelChanges<T>(callback: () => T): T;
readonly entryPoint: IFluidHandle<FluidObject>;
getAttachGCData(telemetryContext?: ITelemetryContext): IGarbageCollectionData;
// (undocumented)

Просмотреть файл

@ -146,7 +146,8 @@
"typeValidation": {
"broken": {
"ClassDeclaration_FluidDataStoreRuntime": {
"forwardCompat": false
"forwardCompat": false,
"backCompat": false
}
}
}

Просмотреть файл

@ -202,21 +202,6 @@ export class FluidDataStoreRuntime
*/
private localChangesTelemetryCount: number;
/**
* Invokes the given callback and expects that no ops are submitted
* until execution finishes. If an op is submitted, an error will be raised.
*
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
*
* @param callback - the callback to be invoked
*/
public ensureNoDataModelChanges<T>(callback: () => T): T {
// back-compat ADO:2309
return this.dataStoreContext.ensureNoDataModelChanges === undefined
? callback()
: this.dataStoreContext.ensureNoDataModelChanges(callback);
}
/**
* Create an instance of a DataStore runtime.
*

Просмотреть файл

@ -70,6 +70,7 @@ declare function get_current_ClassDeclaration_FluidDataStoreRuntime():
declare function use_old_ClassDeclaration_FluidDataStoreRuntime(
use: TypeOnly<old.FluidDataStoreRuntime>): void;
use_old_ClassDeclaration_FluidDataStoreRuntime(
// @ts-expect-error compatibility expected to be broken
get_current_ClassDeclaration_FluidDataStoreRuntime());
/*

Просмотреть файл

@ -257,6 +257,7 @@ export interface IFluidParentContext extends IProvideFluidHandleContext, Partial
deleteChildSummarizerNode(id: string): void;
// (undocumented)
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
// @deprecated
ensureNoDataModelChanges<T>(callback: () => T): T;
// (undocumented)
readonly gcThrowOnTombstoneUsage: boolean;

Просмотреть файл

@ -437,6 +437,9 @@ export interface IFluidParentContext
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
*
* @param callback - the callback to be invoked
*
* @deprecated
* // back-compat: to be removed in 2.0
*/
ensureNoDataModelChanges<T>(callback: () => T): T;

Просмотреть файл

@ -466,8 +466,6 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat
// (undocumented)
readonly documentId: string;
// (undocumented)
ensureNoDataModelChanges<T>(callback: () => T): T;
// (undocumented)
readonly entryPoint: IFluidHandle<FluidObject>;
// (undocumented)
readonly existing: boolean;

Просмотреть файл

@ -766,10 +766,6 @@ export class MockFluidDataStoreRuntime
return deltaConnection;
}
public ensureNoDataModelChanges<T>(callback: () => T): T {
return callback();
}
public get absolutePath() {
return `/${this.id}`;
}

Просмотреть файл

@ -86,6 +86,7 @@ export class MockFluidDataStoreContext implements IFluidDataStoreContext {
throw new Error("Method not implemented.");
}
// back-compat: to be removed in 2.0
public ensureNoDataModelChanges<T>(callback: () => T): T {
return callback();
}

Просмотреть файл

@ -168,14 +168,7 @@ describe("Container create with feature flags", () => {
beforeEach("createAzureClient", () => {
mockLogger = new MockLogger();
client = createAzureClient(
undefined,
undefined,
mockLogger,
configProvider({
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
}),
);
client = createAzureClient(undefined, undefined, mockLogger, configProvider({}));
schema = {
initialObjects: {
map1: SharedMap,
@ -193,6 +186,6 @@ describe("Container create with feature flags", () => {
const event = mockLogger.events.find((e) => e.eventName.endsWith("ContainerLoadStats"));
assert(event !== undefined, "ContainerLoadStats event should exist");
const featureGates = event.featureGates as string;
assert(featureGates.includes('"disableOpReentryCheck":true'));
assert(featureGates.length > 0);
});
});

Просмотреть файл

@ -5,7 +5,7 @@
import { strict as assert } from "assert";
import { describeCompat, itExpects } from "@fluid-private/test-version-utils";
import { describeCompat } from "@fluid-private/test-version-utils";
import { IContainer } from "@fluidframework/container-definitions/internal";
import { ConfigTypes, IConfigProviderBase } from "@fluidframework/core-interfaces";
import type { SharedDirectory, ISharedMap } from "@fluidframework/map/internal";
@ -69,6 +69,7 @@ describeCompat(
...containerConfig,
loaderProps: { configProvider: configProvider(featureGates) },
};
provider.reset();
container1 = await provider.makeTestContainer(configWithFeatureGates);
container2 = await provider.loadTestContainer(configWithFeatureGates);
@ -89,26 +90,13 @@ describeCompat(
await provider.ensureSynchronized();
};
itExpects(
"Should close the container when submitting an op while processing a batch",
[
{
eventName: "fluid:telemetry:Container:ContainerClose",
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
},
],
async function () {
it("Test reentrant op sending", async function () {
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
// This test is flaky on Tinylicious. ADO:5010
this.skip();
}
await setupContainers({
...testContainerConfig,
runtimeOptions: {
enableOpReentryCheck: true,
},
});
await setupContainers(testContainerConfig);
sharedMap1.on("valueChanged", (changed) => {
if (changed.key !== "key2") {
@ -116,22 +104,19 @@ describeCompat(
}
});
assert.throws(() => {
sharedMap1.set("key1", "1");
});
sharedMap2.set("key2", "2");
await provider.ensureSynchronized();
// The offending container is closed
assert.ok(container1.closed);
assert.ok(!container1.closed);
assert.ok(!container2.closed);
// The other container is fine
assert.equal(sharedMap2.get("key1"), undefined);
assert.equal(sharedMap2.get("key1"), "1");
assert.equal(sharedMap2.get("key2"), "2");
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
},
);
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
});
[false, true].forEach((enableGroupedBatching) => {
it(`Eventual consistency with op reentry - ${
@ -253,40 +238,6 @@ describeCompat(
});
describe("Reentry safeguards", () => {
itExpects(
"Flushing is not supported",
[
{
eventName: "fluid:telemetry:Container:ContainerClose",
error: "Flushing is not supported inside DDS event handlers",
},
],
async function () {
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
// This test is flaky on Tinylicious. ADO:5010
this.skip();
}
await setupContainers({
...testContainerConfig,
runtimeOptions: {
flushMode: FlushMode.Immediate,
},
});
sharedString1.on("sequenceDelta", () =>
assert.throws(() =>
dataObject1.context.containerRuntime.orderSequentially(() =>
sharedMap1.set("0", 0),
),
),
);
sharedString1.insertText(0, "ad");
await provider.ensureSynchronized();
},
);
it("Flushing is supported if it happens in the next batch", async function () {
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
// This test is flaky on Tinylicious. ADO:5010
@ -314,7 +265,7 @@ describeCompat(
});
});
it("Should throw when submitting an op while handling an event - offline", async function () {
it("Should not throw when submitting an op while handling an event - offline", async function () {
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
// This test is flaky on Tinylicious. ADO:5010
this.skip();
@ -322,9 +273,7 @@ describeCompat(
await setupContainers({
...testContainerConfig,
runtimeOptions: {
enableOpReentryCheck: true,
},
runtimeOptions: {},
});
await container1.deltaManager.inbound.pause();
@ -336,9 +285,7 @@ describeCompat(
}
});
assert.throws(() => {
sharedMap1.set("key1", "1");
});
container1.deltaManager.inbound.resume();
container1.deltaManager.outbound.resume();
@ -347,7 +294,7 @@ describeCompat(
// The offending container is not closed
assert.ok(!container1.closed);
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
});
describe("Allow reentry", () =>
@ -360,11 +307,8 @@ describeCompat(
{
options: {
...testContainerConfig,
runtimeOptions: {
enableOpReentryCheck: true,
runtimeOptions: {},
},
},
featureGates: { "Fluid.ContainerRuntime.DisableOpReentryCheck": true },
name: "Enabled by options, disabled by feature gate",
},
].forEach((testConfig) => {

Просмотреть файл

@ -102,7 +102,6 @@ export function generateRuntimeOptions(
{ minimumBatchSizeInBytes: 500, compressionAlgorithm: CompressionAlgorithms.lz4 },
],
maxBatchSizeInBytes: [716800],
enableOpReentryCheck: [true],
// Compressed payloads exceeding this size will be chunked into messages of exactly this size
chunkSizeInBytes: [204800],
enableRuntimeIdCompressor: ["on", undefined, "delayed"],