Remove ability to reject reentrant ops (#20621)
Remove failed experiment from repo. It has been proven that pursuing this direction is not feasible / too expensive and does not provide good programming model. While it does not close venue for some misuse, we have covered some aspects of it (properly tracking referenceSequence number / rebasing for ops sent via re-entrancy). See https://dev.azure.com/fluidframework/internal/_workitems/edit/2322#12286846 for some details. Key changes: 1. Remove IContainerRuntimeOptions.enableOpReentryCheck and code associated with this option 2. Remove ensureNoDataModelChanges() from API surfaces where it's safe. Some interfaces can be cleaned only in the future (due to N/N-1 compat promises) 3. Instead of relying on DDS to use ensureNoDataModelChanges to catch re-entrnacy, runtime will call it from process(), thus casting wider net and also not requiring each DDS to do so. - the side-effect of it - it will not catch re-entrant local changes, but such changes should not be rebased, so that's Ok. Please note that opReentrancy.spec.ts has plenty of test coverage here, including UTs like "Eventual consistency with op reentry..." that uses SharedString to validate proper rebasing happens when dealing with reentrancy.
This commit is contained in:
Родитель
55b0f70003
Коммит
c9d156264b
|
@ -585,19 +585,7 @@ export abstract class SharedObjectCore<TEvent extends ISharedObjectEvents = ISha
|
|||
*/
|
||||
public emit(event: EventEmitterEventType, ...args: any[]): boolean {
|
||||
return this.callbacksHelper.measure(() => {
|
||||
// Creating ops while handling a DDS event can lead
|
||||
// to undefined behavior and events observed in the wrong order.
|
||||
// For example, we have two callbacks registered for a DDS, A and B.
|
||||
// Then if on change #1 callback A creates change #2, the invocation flow will be:
|
||||
//
|
||||
// A because of #1
|
||||
// A because of #2
|
||||
// B because of #2
|
||||
// B because of #1
|
||||
//
|
||||
// The runtime must enforce op coherence by not allowing any ops to be created during
|
||||
// the event handler
|
||||
return this.runtime.ensureNoDataModelChanges(() => super.emit(event, ...args));
|
||||
return super.emit(event, ...args);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -723,7 +723,6 @@ export interface IContainerRuntimeOptions {
|
|||
readonly chunkSizeInBytes?: number;
|
||||
readonly compressionOptions?: ICompressionRuntimeOptions;
|
||||
readonly enableGroupedBatching?: boolean;
|
||||
readonly enableOpReentryCheck?: boolean;
|
||||
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
||||
readonly explicitSchemaControl?: boolean;
|
||||
readonly flushMode?: FlushMode;
|
||||
|
|
|
@ -168,6 +168,7 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext {
|
|||
getAudience: (...args) => {
|
||||
return context.getAudience(...args);
|
||||
},
|
||||
// back-compat, to be removed in 2.0
|
||||
ensureNoDataModelChanges: (...args) => {
|
||||
return context.ensureNoDataModelChanges(...args);
|
||||
},
|
||||
|
|
|
@ -159,7 +159,6 @@ import {
|
|||
OpSplitter,
|
||||
Outbox,
|
||||
RemoteMessageProcessor,
|
||||
getLongStack,
|
||||
} from "./opLifecycle/index.js";
|
||||
import { pkgVersion } from "./packageVersion.js";
|
||||
import {
|
||||
|
@ -459,16 +458,6 @@ export interface IContainerRuntimeOptions {
|
|||
*/
|
||||
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
||||
|
||||
/**
|
||||
* If enabled, the runtime will block all attempts to send an op inside the
|
||||
* {@link ContainerRuntime#ensureNoDataModelChanges} callback. The callback is used by
|
||||
* {@link @fluidframework/shared-object-base#SharedObjectCore} for event handlers so enabling this
|
||||
* will disallow modifying DDSes while handling DDS events.
|
||||
*
|
||||
* By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
||||
* can be used to disable it at runtime.
|
||||
*/
|
||||
readonly enableOpReentryCheck?: boolean;
|
||||
/**
|
||||
* If enabled, the runtime will group messages within a batch into a single
|
||||
* message to be sent to the service.
|
||||
|
@ -808,7 +797,6 @@ export class ContainerRuntime
|
|||
maxBatchSizeInBytes = defaultMaxBatchSizeInBytes,
|
||||
enableRuntimeIdCompressor,
|
||||
chunkSizeInBytes = defaultChunkSizeInBytes,
|
||||
enableOpReentryCheck = false,
|
||||
enableGroupedBatching = false,
|
||||
explicitSchemaControl = false,
|
||||
} = runtimeOptions;
|
||||
|
@ -1021,7 +1009,6 @@ export class ContainerRuntime
|
|||
chunkSizeInBytes,
|
||||
// Requires<> drops undefined from IdCompressorType
|
||||
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
|
||||
enableOpReentryCheck,
|
||||
enableGroupedBatching,
|
||||
explicitSchemaControl,
|
||||
},
|
||||
|
@ -1224,14 +1211,6 @@ export class ContainerRuntime
|
|||
|
||||
private ensureNoDataModelChangesCalls = 0;
|
||||
|
||||
/**
|
||||
* Tracks the number of detected reentrant ops to report,
|
||||
* in order to self-throttle the telemetry events.
|
||||
*
|
||||
* This should be removed as part of ADO:2322
|
||||
*/
|
||||
private opReentryCallsToReport = 5;
|
||||
|
||||
/**
|
||||
* Invokes the given callback and expects that no ops are submitted
|
||||
* until execution finishes. If an op is submitted, an error will be raised.
|
||||
|
@ -1265,7 +1244,6 @@ export class ContainerRuntime
|
|||
|
||||
private dirtyContainer: boolean;
|
||||
private emitDirtyDocumentEvent = true;
|
||||
private readonly enableOpReentryCheck: boolean;
|
||||
private readonly disableAttachReorder: boolean | undefined;
|
||||
private readonly closeSummarizerDelayMs: number;
|
||||
/**
|
||||
|
@ -1549,14 +1527,6 @@ export class ContainerRuntime
|
|||
this.validateSummaryHeuristicConfiguration(this.summaryConfiguration);
|
||||
}
|
||||
|
||||
const disableOpReentryCheck = this.mc.config.getBoolean(
|
||||
"Fluid.ContainerRuntime.DisableOpReentryCheck",
|
||||
);
|
||||
this.enableOpReentryCheck =
|
||||
runtimeOptions.enableOpReentryCheck === true &&
|
||||
// Allow for a break-glass config to override the options
|
||||
disableOpReentryCheck !== true;
|
||||
|
||||
this.summariesDisabled = this.isSummariesDisabled();
|
||||
this.maxOpsSinceLastSummary = this.getMaxOpsSinceLastSummary();
|
||||
this.initialSummarizerDelayMs = this.getInitialSummarizerDelayMs();
|
||||
|
@ -1868,7 +1838,6 @@ export class ContainerRuntime
|
|||
idCompressorMode: this.idCompressorMode,
|
||||
featureGates: JSON.stringify({
|
||||
...featureGatesForTelemetry,
|
||||
disableOpReentryCheck,
|
||||
disableChunking,
|
||||
disableAttachReorder: this.disableAttachReorder,
|
||||
disablePartialFlush,
|
||||
|
@ -2550,8 +2519,8 @@ export class ContainerRuntime
|
|||
// but will not modify the contents object (likely it will replace it on the message).
|
||||
const messageCopy = { ...messageArg };
|
||||
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
|
||||
if (modernRuntimeMessage) {
|
||||
this.processCore({
|
||||
const msg: MessageWithContext = modernRuntimeMessage
|
||||
? {
|
||||
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
|
||||
// There is nothing really ensuring that anytime original message.type is Operation that
|
||||
// the result messages will be so. In the end modern bool being true only directs to
|
||||
|
@ -2559,11 +2528,16 @@ export class ContainerRuntime
|
|||
message: message as InboundSequencedContainerRuntimeMessage,
|
||||
local,
|
||||
modernRuntimeMessage,
|
||||
});
|
||||
} else {
|
||||
// Unrecognized message will be ignored.
|
||||
this.processCore({ message, local, modernRuntimeMessage });
|
||||
}
|
||||
: // Unrecognized message will be ignored.
|
||||
{
|
||||
message,
|
||||
local,
|
||||
modernRuntimeMessage,
|
||||
};
|
||||
|
||||
// ensure that we observe any re-entrancy, and if needed, rebase ops
|
||||
this.ensureNoDataModelChanges(() => this.processCore(msg));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3884,7 +3858,6 @@ export class ContainerRuntime
|
|||
metadata?: { localId: string; blobId?: string },
|
||||
): void {
|
||||
this.verifyNotClosed();
|
||||
this.verifyCanSubmitOps();
|
||||
|
||||
// There should be no ops in detached container state!
|
||||
assert(
|
||||
|
@ -4039,37 +4012,6 @@ export class ContainerRuntime
|
|||
}
|
||||
}
|
||||
|
||||
private verifyCanSubmitOps() {
|
||||
if (this.ensureNoDataModelChangesCalls > 0) {
|
||||
const errorMessage =
|
||||
"Op was submitted from within a `ensureNoDataModelChanges` callback";
|
||||
if (this.opReentryCallsToReport > 0) {
|
||||
this.mc.logger.sendTelemetryEvent(
|
||||
{ eventName: "OpReentry" },
|
||||
// We need to capture the call stack in order to inspect the source of this usage pattern
|
||||
getLongStack(() => new UsageError(errorMessage)),
|
||||
);
|
||||
this.opReentryCallsToReport--;
|
||||
}
|
||||
|
||||
// Creating ops while processing ops can lead
|
||||
// to undefined behavior and events observed in the wrong order.
|
||||
// For example, we have two callbacks registered for a DDS, A and B.
|
||||
// Then if on change #1 callback A creates change #2, the invocation flow will be:
|
||||
//
|
||||
// A because of #1
|
||||
// A because of #2
|
||||
// B because of #2
|
||||
// B because of #1
|
||||
//
|
||||
// The runtime must enforce op coherence by not allowing ops to be submitted
|
||||
// while ops are being processed.
|
||||
if (this.enableOpReentryCheck) {
|
||||
throw new UsageError(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private reSubmitBatch(batch: IPendingBatchMessage[]) {
|
||||
this.orderSequentially(() => {
|
||||
for (const message of batch) {
|
||||
|
|
|
@ -219,6 +219,7 @@ export abstract class FluidDataStoreContext
|
|||
return this._containerRuntime;
|
||||
}
|
||||
|
||||
// back-compat, to be removed in 2.0
|
||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||
return this.parentContext.ensureNoDataModelChanges(callback);
|
||||
}
|
||||
|
|
|
@ -542,151 +542,6 @@ describe("Runtime", () => {
|
|||
});
|
||||
}));
|
||||
|
||||
describe("Op reentry enforcement", () => {
|
||||
let containerRuntime: ContainerRuntime;
|
||||
|
||||
it("By default, don't enforce the op reentry check", async () => {
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext() as IContainerContext,
|
||||
registryEntries: [],
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
assert.ok(
|
||||
containerRuntime.ensureNoDataModelChanges(() => {
|
||||
submitDataStoreOp(containerRuntime, "id", "test");
|
||||
return true;
|
||||
}),
|
||||
);
|
||||
|
||||
assert.ok(
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() => {
|
||||
submitDataStoreOp(containerRuntime, "id", "test");
|
||||
return true;
|
||||
}),
|
||||
),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
it("If option enabled, enforce the op reentry check", async () => {
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext() as IContainerContext,
|
||||
registryEntries: [],
|
||||
runtimeOptions: {
|
||||
enableOpReentryCheck: true,
|
||||
},
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
assert.throws(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
||||
),
|
||||
);
|
||||
|
||||
assert.throws(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
it("If option enabled but disabled via feature gate, don't enforce the op reentry check", async () => {
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext({
|
||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
|
||||
}) as IContainerContext,
|
||||
registryEntries: [],
|
||||
runtimeOptions: {
|
||||
enableOpReentryCheck: true,
|
||||
},
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
||||
);
|
||||
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() =>
|
||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
||||
),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
it("Report at most 5 reentrant ops", async () => {
|
||||
const mockLogger = new MockLogger();
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext({}, mockLogger) as IContainerContext,
|
||||
registryEntries: [],
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
mockLogger.clear();
|
||||
containerRuntime.ensureNoDataModelChanges(() => {
|
||||
for (let i = 0; i < 10; i++) {
|
||||
submitDataStoreOp(containerRuntime, "id", "test");
|
||||
}
|
||||
});
|
||||
|
||||
// We expect only 5 events
|
||||
mockLogger.assertMatchStrict(
|
||||
Array.from(Array(5).keys()).map(() => ({
|
||||
eventName: "ContainerRuntime:OpReentry",
|
||||
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
|
||||
})),
|
||||
);
|
||||
});
|
||||
|
||||
it("Can't call flush() inside ensureNoDataModelChanges's callback", async () => {
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext() as IContainerContext,
|
||||
registryEntries: [],
|
||||
runtimeOptions: {
|
||||
flushMode: FlushMode.Immediate,
|
||||
},
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
assert.throws(() =>
|
||||
containerRuntime.ensureNoDataModelChanges(() => {
|
||||
containerRuntime.orderSequentially(() => {});
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("Can't create an infinite ensureNoDataModelChanges recursive call ", async () => {
|
||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
||||
context: getMockContext() as IContainerContext,
|
||||
registryEntries: [],
|
||||
provideEntryPoint: mockProvideEntryPoint,
|
||||
existing: false,
|
||||
});
|
||||
|
||||
const callback = () => {
|
||||
containerRuntime.ensureNoDataModelChanges(() => {
|
||||
submitDataStoreOp(containerRuntime, "id", "test");
|
||||
callback();
|
||||
});
|
||||
};
|
||||
assert.throws(() => callback());
|
||||
});
|
||||
});
|
||||
|
||||
describe("orderSequentially with rollback", () =>
|
||||
[
|
||||
FlushMode.TurnBased,
|
||||
|
@ -1569,7 +1424,6 @@ describe("Runtime", () => {
|
|||
maxBatchSizeInBytes: 700 * 1024,
|
||||
chunkSizeInBytes: 204800,
|
||||
enableRuntimeIdCompressor: undefined,
|
||||
enableOpReentryCheck: false,
|
||||
enableGroupedBatching: false,
|
||||
explicitSchemaControl: false,
|
||||
} satisfies IContainerRuntimeOptions;
|
||||
|
@ -1598,7 +1452,6 @@ describe("Runtime", () => {
|
|||
const featureGates = {
|
||||
"Fluid.ContainerRuntime.CompressionDisabled": true,
|
||||
"Fluid.ContainerRuntime.CompressionChunkingDisabled": true,
|
||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": false,
|
||||
"Fluid.ContainerRuntime.IdCompressorEnabled": true,
|
||||
"Fluid.ContainerRuntime.DisableGroupedBatching": true,
|
||||
};
|
||||
|
@ -1619,7 +1472,6 @@ describe("Runtime", () => {
|
|||
featureGates: JSON.stringify({
|
||||
disableGroupedBatching: true,
|
||||
disableCompression: true,
|
||||
disableOpReentryCheck: false,
|
||||
disableChunking: true,
|
||||
}),
|
||||
groupedBatchingEnabled: false,
|
||||
|
|
|
@ -101,7 +101,6 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
|
|||
createChannel(id: string | undefined, type: string): IChannel;
|
||||
// (undocumented)
|
||||
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||
getAudience(): IAudience;
|
||||
getChannel(id: string): Promise<IChannel>;
|
||||
|
|
|
@ -72,16 +72,6 @@ export interface IFluidDataStoreRuntime
|
|||
*/
|
||||
getChannel(id: string): Promise<IChannel>;
|
||||
|
||||
/**
|
||||
* Invokes the given callback and expects that no ops are submitted
|
||||
* until execution finishes. If an op is submitted, an error will be raised.
|
||||
*
|
||||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
||||
*
|
||||
* @param callback - the callback to be invoked
|
||||
*/
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
|
||||
/**
|
||||
* Creates a new channel of the given type.
|
||||
* @param id - ID of the channel to be created. A unique ID will be generated if left undefined.
|
||||
|
|
|
@ -69,7 +69,6 @@ export class FluidDataStoreRuntime extends TypedEventEmitter<IFluidDataStoreRunt
|
|||
dispose(): void;
|
||||
// (undocumented)
|
||||
get disposed(): boolean;
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||
getAttachGCData(telemetryContext?: ITelemetryContext): IGarbageCollectionData;
|
||||
// (undocumented)
|
||||
|
|
|
@ -146,7 +146,8 @@
|
|||
"typeValidation": {
|
||||
"broken": {
|
||||
"ClassDeclaration_FluidDataStoreRuntime": {
|
||||
"forwardCompat": false
|
||||
"forwardCompat": false,
|
||||
"backCompat": false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,21 +202,6 @@ export class FluidDataStoreRuntime
|
|||
*/
|
||||
private localChangesTelemetryCount: number;
|
||||
|
||||
/**
|
||||
* Invokes the given callback and expects that no ops are submitted
|
||||
* until execution finishes. If an op is submitted, an error will be raised.
|
||||
*
|
||||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
||||
*
|
||||
* @param callback - the callback to be invoked
|
||||
*/
|
||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||
// back-compat ADO:2309
|
||||
return this.dataStoreContext.ensureNoDataModelChanges === undefined
|
||||
? callback()
|
||||
: this.dataStoreContext.ensureNoDataModelChanges(callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance of a DataStore runtime.
|
||||
*
|
||||
|
|
|
@ -70,6 +70,7 @@ declare function get_current_ClassDeclaration_FluidDataStoreRuntime():
|
|||
declare function use_old_ClassDeclaration_FluidDataStoreRuntime(
|
||||
use: TypeOnly<old.FluidDataStoreRuntime>): void;
|
||||
use_old_ClassDeclaration_FluidDataStoreRuntime(
|
||||
// @ts-expect-error compatibility expected to be broken
|
||||
get_current_ClassDeclaration_FluidDataStoreRuntime());
|
||||
|
||||
/*
|
||||
|
|
|
@ -257,6 +257,7 @@ export interface IFluidParentContext extends IProvideFluidHandleContext, Partial
|
|||
deleteChildSummarizerNode(id: string): void;
|
||||
// (undocumented)
|
||||
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
||||
// @deprecated
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
// (undocumented)
|
||||
readonly gcThrowOnTombstoneUsage: boolean;
|
||||
|
|
|
@ -437,6 +437,9 @@ export interface IFluidParentContext
|
|||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
||||
*
|
||||
* @param callback - the callback to be invoked
|
||||
*
|
||||
* @deprecated
|
||||
* // back-compat: to be removed in 2.0
|
||||
*/
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
|
||||
|
|
|
@ -466,8 +466,6 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat
|
|||
// (undocumented)
|
||||
readonly documentId: string;
|
||||
// (undocumented)
|
||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||
// (undocumented)
|
||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||
// (undocumented)
|
||||
readonly existing: boolean;
|
||||
|
|
|
@ -766,10 +766,6 @@ export class MockFluidDataStoreRuntime
|
|||
return deltaConnection;
|
||||
}
|
||||
|
||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||
return callback();
|
||||
}
|
||||
|
||||
public get absolutePath() {
|
||||
return `/${this.id}`;
|
||||
}
|
||||
|
|
|
@ -86,6 +86,7 @@ export class MockFluidDataStoreContext implements IFluidDataStoreContext {
|
|||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
// back-compat: to be removed in 2.0
|
||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||
return callback();
|
||||
}
|
||||
|
|
|
@ -168,14 +168,7 @@ describe("Container create with feature flags", () => {
|
|||
|
||||
beforeEach("createAzureClient", () => {
|
||||
mockLogger = new MockLogger();
|
||||
client = createAzureClient(
|
||||
undefined,
|
||||
undefined,
|
||||
mockLogger,
|
||||
configProvider({
|
||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
|
||||
}),
|
||||
);
|
||||
client = createAzureClient(undefined, undefined, mockLogger, configProvider({}));
|
||||
schema = {
|
||||
initialObjects: {
|
||||
map1: SharedMap,
|
||||
|
@ -193,6 +186,6 @@ describe("Container create with feature flags", () => {
|
|||
const event = mockLogger.events.find((e) => e.eventName.endsWith("ContainerLoadStats"));
|
||||
assert(event !== undefined, "ContainerLoadStats event should exist");
|
||||
const featureGates = event.featureGates as string;
|
||||
assert(featureGates.includes('"disableOpReentryCheck":true'));
|
||||
assert(featureGates.length > 0);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
|
||||
import { strict as assert } from "assert";
|
||||
|
||||
import { describeCompat, itExpects } from "@fluid-private/test-version-utils";
|
||||
import { describeCompat } from "@fluid-private/test-version-utils";
|
||||
import { IContainer } from "@fluidframework/container-definitions/internal";
|
||||
import { ConfigTypes, IConfigProviderBase } from "@fluidframework/core-interfaces";
|
||||
import type { SharedDirectory, ISharedMap } from "@fluidframework/map/internal";
|
||||
|
@ -69,6 +69,7 @@ describeCompat(
|
|||
...containerConfig,
|
||||
loaderProps: { configProvider: configProvider(featureGates) },
|
||||
};
|
||||
provider.reset();
|
||||
container1 = await provider.makeTestContainer(configWithFeatureGates);
|
||||
container2 = await provider.loadTestContainer(configWithFeatureGates);
|
||||
|
||||
|
@ -89,26 +90,13 @@ describeCompat(
|
|||
await provider.ensureSynchronized();
|
||||
};
|
||||
|
||||
itExpects(
|
||||
"Should close the container when submitting an op while processing a batch",
|
||||
[
|
||||
{
|
||||
eventName: "fluid:telemetry:Container:ContainerClose",
|
||||
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
|
||||
},
|
||||
],
|
||||
async function () {
|
||||
it("Test reentrant op sending", async function () {
|
||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||
// This test is flaky on Tinylicious. ADO:5010
|
||||
this.skip();
|
||||
}
|
||||
|
||||
await setupContainers({
|
||||
...testContainerConfig,
|
||||
runtimeOptions: {
|
||||
enableOpReentryCheck: true,
|
||||
},
|
||||
});
|
||||
await setupContainers(testContainerConfig);
|
||||
|
||||
sharedMap1.on("valueChanged", (changed) => {
|
||||
if (changed.key !== "key2") {
|
||||
|
@ -116,22 +104,19 @@ describeCompat(
|
|||
}
|
||||
});
|
||||
|
||||
assert.throws(() => {
|
||||
sharedMap1.set("key1", "1");
|
||||
});
|
||||
|
||||
sharedMap2.set("key2", "2");
|
||||
|
||||
await provider.ensureSynchronized();
|
||||
|
||||
// The offending container is closed
|
||||
assert.ok(container1.closed);
|
||||
assert.ok(!container1.closed);
|
||||
assert.ok(!container2.closed);
|
||||
|
||||
// The other container is fine
|
||||
assert.equal(sharedMap2.get("key1"), undefined);
|
||||
assert.equal(sharedMap2.get("key1"), "1");
|
||||
assert.equal(sharedMap2.get("key2"), "2");
|
||||
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
|
||||
},
|
||||
);
|
||||
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
|
||||
});
|
||||
|
||||
[false, true].forEach((enableGroupedBatching) => {
|
||||
it(`Eventual consistency with op reentry - ${
|
||||
|
@ -253,40 +238,6 @@ describeCompat(
|
|||
});
|
||||
|
||||
describe("Reentry safeguards", () => {
|
||||
itExpects(
|
||||
"Flushing is not supported",
|
||||
[
|
||||
{
|
||||
eventName: "fluid:telemetry:Container:ContainerClose",
|
||||
error: "Flushing is not supported inside DDS event handlers",
|
||||
},
|
||||
],
|
||||
async function () {
|
||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||
// This test is flaky on Tinylicious. ADO:5010
|
||||
this.skip();
|
||||
}
|
||||
|
||||
await setupContainers({
|
||||
...testContainerConfig,
|
||||
runtimeOptions: {
|
||||
flushMode: FlushMode.Immediate,
|
||||
},
|
||||
});
|
||||
|
||||
sharedString1.on("sequenceDelta", () =>
|
||||
assert.throws(() =>
|
||||
dataObject1.context.containerRuntime.orderSequentially(() =>
|
||||
sharedMap1.set("0", 0),
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
sharedString1.insertText(0, "ad");
|
||||
await provider.ensureSynchronized();
|
||||
},
|
||||
);
|
||||
|
||||
it("Flushing is supported if it happens in the next batch", async function () {
|
||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||
// This test is flaky on Tinylicious. ADO:5010
|
||||
|
@ -314,7 +265,7 @@ describeCompat(
|
|||
});
|
||||
});
|
||||
|
||||
it("Should throw when submitting an op while handling an event - offline", async function () {
|
||||
it("Should not throw when submitting an op while handling an event - offline", async function () {
|
||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||
// This test is flaky on Tinylicious. ADO:5010
|
||||
this.skip();
|
||||
|
@ -322,9 +273,7 @@ describeCompat(
|
|||
|
||||
await setupContainers({
|
||||
...testContainerConfig,
|
||||
runtimeOptions: {
|
||||
enableOpReentryCheck: true,
|
||||
},
|
||||
runtimeOptions: {},
|
||||
});
|
||||
|
||||
await container1.deltaManager.inbound.pause();
|
||||
|
@ -336,9 +285,7 @@ describeCompat(
|
|||
}
|
||||
});
|
||||
|
||||
assert.throws(() => {
|
||||
sharedMap1.set("key1", "1");
|
||||
});
|
||||
|
||||
container1.deltaManager.inbound.resume();
|
||||
container1.deltaManager.outbound.resume();
|
||||
|
@ -347,7 +294,7 @@ describeCompat(
|
|||
|
||||
// The offending container is not closed
|
||||
assert.ok(!container1.closed);
|
||||
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
|
||||
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
|
||||
});
|
||||
|
||||
describe("Allow reentry", () =>
|
||||
|
@ -360,11 +307,8 @@ describeCompat(
|
|||
{
|
||||
options: {
|
||||
...testContainerConfig,
|
||||
runtimeOptions: {
|
||||
enableOpReentryCheck: true,
|
||||
runtimeOptions: {},
|
||||
},
|
||||
},
|
||||
featureGates: { "Fluid.ContainerRuntime.DisableOpReentryCheck": true },
|
||||
name: "Enabled by options, disabled by feature gate",
|
||||
},
|
||||
].forEach((testConfig) => {
|
||||
|
|
|
@ -102,7 +102,6 @@ export function generateRuntimeOptions(
|
|||
{ minimumBatchSizeInBytes: 500, compressionAlgorithm: CompressionAlgorithms.lz4 },
|
||||
],
|
||||
maxBatchSizeInBytes: [716800],
|
||||
enableOpReentryCheck: [true],
|
||||
// Compressed payloads exceeding this size will be chunked into messages of exactly this size
|
||||
chunkSizeInBytes: [204800],
|
||||
enableRuntimeIdCompressor: ["on", undefined, "delayed"],
|
||||
|
|
Загрузка…
Ссылка в новой задаче