Remove ability to reject reentrant ops (#20621)
Remove failed experiment from repo. It has been proven that pursuing this direction is not feasible / too expensive and does not provide good programming model. While it does not close venue for some misuse, we have covered some aspects of it (properly tracking referenceSequence number / rebasing for ops sent via re-entrancy). See https://dev.azure.com/fluidframework/internal/_workitems/edit/2322#12286846 for some details. Key changes: 1. Remove IContainerRuntimeOptions.enableOpReentryCheck and code associated with this option 2. Remove ensureNoDataModelChanges() from API surfaces where it's safe. Some interfaces can be cleaned only in the future (due to N/N-1 compat promises) 3. Instead of relying on DDS to use ensureNoDataModelChanges to catch re-entrnacy, runtime will call it from process(), thus casting wider net and also not requiring each DDS to do so. - the side-effect of it - it will not catch re-entrant local changes, but such changes should not be rebased, so that's Ok. Please note that opReentrancy.spec.ts has plenty of test coverage here, including UTs like "Eventual consistency with op reentry..." that uses SharedString to validate proper rebasing happens when dealing with reentrancy.
This commit is contained in:
Родитель
55b0f70003
Коммит
c9d156264b
|
@ -585,19 +585,7 @@ export abstract class SharedObjectCore<TEvent extends ISharedObjectEvents = ISha
|
||||||
*/
|
*/
|
||||||
public emit(event: EventEmitterEventType, ...args: any[]): boolean {
|
public emit(event: EventEmitterEventType, ...args: any[]): boolean {
|
||||||
return this.callbacksHelper.measure(() => {
|
return this.callbacksHelper.measure(() => {
|
||||||
// Creating ops while handling a DDS event can lead
|
return super.emit(event, ...args);
|
||||||
// to undefined behavior and events observed in the wrong order.
|
|
||||||
// For example, we have two callbacks registered for a DDS, A and B.
|
|
||||||
// Then if on change #1 callback A creates change #2, the invocation flow will be:
|
|
||||||
//
|
|
||||||
// A because of #1
|
|
||||||
// A because of #2
|
|
||||||
// B because of #2
|
|
||||||
// B because of #1
|
|
||||||
//
|
|
||||||
// The runtime must enforce op coherence by not allowing any ops to be created during
|
|
||||||
// the event handler
|
|
||||||
return this.runtime.ensureNoDataModelChanges(() => super.emit(event, ...args));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -723,7 +723,6 @@ export interface IContainerRuntimeOptions {
|
||||||
readonly chunkSizeInBytes?: number;
|
readonly chunkSizeInBytes?: number;
|
||||||
readonly compressionOptions?: ICompressionRuntimeOptions;
|
readonly compressionOptions?: ICompressionRuntimeOptions;
|
||||||
readonly enableGroupedBatching?: boolean;
|
readonly enableGroupedBatching?: boolean;
|
||||||
readonly enableOpReentryCheck?: boolean;
|
|
||||||
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
||||||
readonly explicitSchemaControl?: boolean;
|
readonly explicitSchemaControl?: boolean;
|
||||||
readonly flushMode?: FlushMode;
|
readonly flushMode?: FlushMode;
|
||||||
|
|
|
@ -168,6 +168,7 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext {
|
||||||
getAudience: (...args) => {
|
getAudience: (...args) => {
|
||||||
return context.getAudience(...args);
|
return context.getAudience(...args);
|
||||||
},
|
},
|
||||||
|
// back-compat, to be removed in 2.0
|
||||||
ensureNoDataModelChanges: (...args) => {
|
ensureNoDataModelChanges: (...args) => {
|
||||||
return context.ensureNoDataModelChanges(...args);
|
return context.ensureNoDataModelChanges(...args);
|
||||||
},
|
},
|
||||||
|
|
|
@ -159,7 +159,6 @@ import {
|
||||||
OpSplitter,
|
OpSplitter,
|
||||||
Outbox,
|
Outbox,
|
||||||
RemoteMessageProcessor,
|
RemoteMessageProcessor,
|
||||||
getLongStack,
|
|
||||||
} from "./opLifecycle/index.js";
|
} from "./opLifecycle/index.js";
|
||||||
import { pkgVersion } from "./packageVersion.js";
|
import { pkgVersion } from "./packageVersion.js";
|
||||||
import {
|
import {
|
||||||
|
@ -459,16 +458,6 @@ export interface IContainerRuntimeOptions {
|
||||||
*/
|
*/
|
||||||
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
readonly enableRuntimeIdCompressor?: IdCompressorMode;
|
||||||
|
|
||||||
/**
|
|
||||||
* If enabled, the runtime will block all attempts to send an op inside the
|
|
||||||
* {@link ContainerRuntime#ensureNoDataModelChanges} callback. The callback is used by
|
|
||||||
* {@link @fluidframework/shared-object-base#SharedObjectCore} for event handlers so enabling this
|
|
||||||
* will disallow modifying DDSes while handling DDS events.
|
|
||||||
*
|
|
||||||
* By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
|
||||||
* can be used to disable it at runtime.
|
|
||||||
*/
|
|
||||||
readonly enableOpReentryCheck?: boolean;
|
|
||||||
/**
|
/**
|
||||||
* If enabled, the runtime will group messages within a batch into a single
|
* If enabled, the runtime will group messages within a batch into a single
|
||||||
* message to be sent to the service.
|
* message to be sent to the service.
|
||||||
|
@ -808,7 +797,6 @@ export class ContainerRuntime
|
||||||
maxBatchSizeInBytes = defaultMaxBatchSizeInBytes,
|
maxBatchSizeInBytes = defaultMaxBatchSizeInBytes,
|
||||||
enableRuntimeIdCompressor,
|
enableRuntimeIdCompressor,
|
||||||
chunkSizeInBytes = defaultChunkSizeInBytes,
|
chunkSizeInBytes = defaultChunkSizeInBytes,
|
||||||
enableOpReentryCheck = false,
|
|
||||||
enableGroupedBatching = false,
|
enableGroupedBatching = false,
|
||||||
explicitSchemaControl = false,
|
explicitSchemaControl = false,
|
||||||
} = runtimeOptions;
|
} = runtimeOptions;
|
||||||
|
@ -1021,7 +1009,6 @@ export class ContainerRuntime
|
||||||
chunkSizeInBytes,
|
chunkSizeInBytes,
|
||||||
// Requires<> drops undefined from IdCompressorType
|
// Requires<> drops undefined from IdCompressorType
|
||||||
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
|
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
|
||||||
enableOpReentryCheck,
|
|
||||||
enableGroupedBatching,
|
enableGroupedBatching,
|
||||||
explicitSchemaControl,
|
explicitSchemaControl,
|
||||||
},
|
},
|
||||||
|
@ -1224,14 +1211,6 @@ export class ContainerRuntime
|
||||||
|
|
||||||
private ensureNoDataModelChangesCalls = 0;
|
private ensureNoDataModelChangesCalls = 0;
|
||||||
|
|
||||||
/**
|
|
||||||
* Tracks the number of detected reentrant ops to report,
|
|
||||||
* in order to self-throttle the telemetry events.
|
|
||||||
*
|
|
||||||
* This should be removed as part of ADO:2322
|
|
||||||
*/
|
|
||||||
private opReentryCallsToReport = 5;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invokes the given callback and expects that no ops are submitted
|
* Invokes the given callback and expects that no ops are submitted
|
||||||
* until execution finishes. If an op is submitted, an error will be raised.
|
* until execution finishes. If an op is submitted, an error will be raised.
|
||||||
|
@ -1265,7 +1244,6 @@ export class ContainerRuntime
|
||||||
|
|
||||||
private dirtyContainer: boolean;
|
private dirtyContainer: boolean;
|
||||||
private emitDirtyDocumentEvent = true;
|
private emitDirtyDocumentEvent = true;
|
||||||
private readonly enableOpReentryCheck: boolean;
|
|
||||||
private readonly disableAttachReorder: boolean | undefined;
|
private readonly disableAttachReorder: boolean | undefined;
|
||||||
private readonly closeSummarizerDelayMs: number;
|
private readonly closeSummarizerDelayMs: number;
|
||||||
/**
|
/**
|
||||||
|
@ -1549,14 +1527,6 @@ export class ContainerRuntime
|
||||||
this.validateSummaryHeuristicConfiguration(this.summaryConfiguration);
|
this.validateSummaryHeuristicConfiguration(this.summaryConfiguration);
|
||||||
}
|
}
|
||||||
|
|
||||||
const disableOpReentryCheck = this.mc.config.getBoolean(
|
|
||||||
"Fluid.ContainerRuntime.DisableOpReentryCheck",
|
|
||||||
);
|
|
||||||
this.enableOpReentryCheck =
|
|
||||||
runtimeOptions.enableOpReentryCheck === true &&
|
|
||||||
// Allow for a break-glass config to override the options
|
|
||||||
disableOpReentryCheck !== true;
|
|
||||||
|
|
||||||
this.summariesDisabled = this.isSummariesDisabled();
|
this.summariesDisabled = this.isSummariesDisabled();
|
||||||
this.maxOpsSinceLastSummary = this.getMaxOpsSinceLastSummary();
|
this.maxOpsSinceLastSummary = this.getMaxOpsSinceLastSummary();
|
||||||
this.initialSummarizerDelayMs = this.getInitialSummarizerDelayMs();
|
this.initialSummarizerDelayMs = this.getInitialSummarizerDelayMs();
|
||||||
|
@ -1868,7 +1838,6 @@ export class ContainerRuntime
|
||||||
idCompressorMode: this.idCompressorMode,
|
idCompressorMode: this.idCompressorMode,
|
||||||
featureGates: JSON.stringify({
|
featureGates: JSON.stringify({
|
||||||
...featureGatesForTelemetry,
|
...featureGatesForTelemetry,
|
||||||
disableOpReentryCheck,
|
|
||||||
disableChunking,
|
disableChunking,
|
||||||
disableAttachReorder: this.disableAttachReorder,
|
disableAttachReorder: this.disableAttachReorder,
|
||||||
disablePartialFlush,
|
disablePartialFlush,
|
||||||
|
@ -2550,20 +2519,25 @@ export class ContainerRuntime
|
||||||
// but will not modify the contents object (likely it will replace it on the message).
|
// but will not modify the contents object (likely it will replace it on the message).
|
||||||
const messageCopy = { ...messageArg };
|
const messageCopy = { ...messageArg };
|
||||||
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
|
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
|
||||||
if (modernRuntimeMessage) {
|
const msg: MessageWithContext = modernRuntimeMessage
|
||||||
this.processCore({
|
? {
|
||||||
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
|
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
|
||||||
// There is nothing really ensuring that anytime original message.type is Operation that
|
// There is nothing really ensuring that anytime original message.type is Operation that
|
||||||
// the result messages will be so. In the end modern bool being true only directs to
|
// the result messages will be so. In the end modern bool being true only directs to
|
||||||
// throw error if ultimately unrecognized without compat details saying otherwise.
|
// throw error if ultimately unrecognized without compat details saying otherwise.
|
||||||
message: message as InboundSequencedContainerRuntimeMessage,
|
message: message as InboundSequencedContainerRuntimeMessage,
|
||||||
local,
|
local,
|
||||||
modernRuntimeMessage,
|
modernRuntimeMessage,
|
||||||
});
|
}
|
||||||
} else {
|
: // Unrecognized message will be ignored.
|
||||||
// Unrecognized message will be ignored.
|
{
|
||||||
this.processCore({ message, local, modernRuntimeMessage });
|
message,
|
||||||
}
|
local,
|
||||||
|
modernRuntimeMessage,
|
||||||
|
};
|
||||||
|
|
||||||
|
// ensure that we observe any re-entrancy, and if needed, rebase ops
|
||||||
|
this.ensureNoDataModelChanges(() => this.processCore(msg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3884,7 +3858,6 @@ export class ContainerRuntime
|
||||||
metadata?: { localId: string; blobId?: string },
|
metadata?: { localId: string; blobId?: string },
|
||||||
): void {
|
): void {
|
||||||
this.verifyNotClosed();
|
this.verifyNotClosed();
|
||||||
this.verifyCanSubmitOps();
|
|
||||||
|
|
||||||
// There should be no ops in detached container state!
|
// There should be no ops in detached container state!
|
||||||
assert(
|
assert(
|
||||||
|
@ -4039,37 +4012,6 @@ export class ContainerRuntime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private verifyCanSubmitOps() {
|
|
||||||
if (this.ensureNoDataModelChangesCalls > 0) {
|
|
||||||
const errorMessage =
|
|
||||||
"Op was submitted from within a `ensureNoDataModelChanges` callback";
|
|
||||||
if (this.opReentryCallsToReport > 0) {
|
|
||||||
this.mc.logger.sendTelemetryEvent(
|
|
||||||
{ eventName: "OpReentry" },
|
|
||||||
// We need to capture the call stack in order to inspect the source of this usage pattern
|
|
||||||
getLongStack(() => new UsageError(errorMessage)),
|
|
||||||
);
|
|
||||||
this.opReentryCallsToReport--;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Creating ops while processing ops can lead
|
|
||||||
// to undefined behavior and events observed in the wrong order.
|
|
||||||
// For example, we have two callbacks registered for a DDS, A and B.
|
|
||||||
// Then if on change #1 callback A creates change #2, the invocation flow will be:
|
|
||||||
//
|
|
||||||
// A because of #1
|
|
||||||
// A because of #2
|
|
||||||
// B because of #2
|
|
||||||
// B because of #1
|
|
||||||
//
|
|
||||||
// The runtime must enforce op coherence by not allowing ops to be submitted
|
|
||||||
// while ops are being processed.
|
|
||||||
if (this.enableOpReentryCheck) {
|
|
||||||
throw new UsageError(errorMessage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private reSubmitBatch(batch: IPendingBatchMessage[]) {
|
private reSubmitBatch(batch: IPendingBatchMessage[]) {
|
||||||
this.orderSequentially(() => {
|
this.orderSequentially(() => {
|
||||||
for (const message of batch) {
|
for (const message of batch) {
|
||||||
|
|
|
@ -219,6 +219,7 @@ export abstract class FluidDataStoreContext
|
||||||
return this._containerRuntime;
|
return this._containerRuntime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// back-compat, to be removed in 2.0
|
||||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||||
return this.parentContext.ensureNoDataModelChanges(callback);
|
return this.parentContext.ensureNoDataModelChanges(callback);
|
||||||
}
|
}
|
||||||
|
|
|
@ -542,151 +542,6 @@ describe("Runtime", () => {
|
||||||
});
|
});
|
||||||
}));
|
}));
|
||||||
|
|
||||||
describe("Op reentry enforcement", () => {
|
|
||||||
let containerRuntime: ContainerRuntime;
|
|
||||||
|
|
||||||
it("By default, don't enforce the op reentry check", async () => {
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext() as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
assert.ok(
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() => {
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test");
|
|
||||||
return true;
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
assert.ok(
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() => {
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test");
|
|
||||||
return true;
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("If option enabled, enforce the op reentry check", async () => {
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext() as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
runtimeOptions: {
|
|
||||||
enableOpReentryCheck: true,
|
|
||||||
},
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
assert.throws(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
assert.throws(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("If option enabled but disabled via feature gate, don't enforce the op reentry check", async () => {
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext({
|
|
||||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
|
|
||||||
}) as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
runtimeOptions: {
|
|
||||||
enableOpReentryCheck: true,
|
|
||||||
},
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
|
||||||
);
|
|
||||||
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() =>
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test"),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Report at most 5 reentrant ops", async () => {
|
|
||||||
const mockLogger = new MockLogger();
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext({}, mockLogger) as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
mockLogger.clear();
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() => {
|
|
||||||
for (let i = 0; i < 10; i++) {
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// We expect only 5 events
|
|
||||||
mockLogger.assertMatchStrict(
|
|
||||||
Array.from(Array(5).keys()).map(() => ({
|
|
||||||
eventName: "ContainerRuntime:OpReentry",
|
|
||||||
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
|
|
||||||
})),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Can't call flush() inside ensureNoDataModelChanges's callback", async () => {
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext() as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
runtimeOptions: {
|
|
||||||
flushMode: FlushMode.Immediate,
|
|
||||||
},
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
assert.throws(() =>
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() => {
|
|
||||||
containerRuntime.orderSequentially(() => {});
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Can't create an infinite ensureNoDataModelChanges recursive call ", async () => {
|
|
||||||
containerRuntime = await ContainerRuntime.loadRuntime({
|
|
||||||
context: getMockContext() as IContainerContext,
|
|
||||||
registryEntries: [],
|
|
||||||
provideEntryPoint: mockProvideEntryPoint,
|
|
||||||
existing: false,
|
|
||||||
});
|
|
||||||
|
|
||||||
const callback = () => {
|
|
||||||
containerRuntime.ensureNoDataModelChanges(() => {
|
|
||||||
submitDataStoreOp(containerRuntime, "id", "test");
|
|
||||||
callback();
|
|
||||||
});
|
|
||||||
};
|
|
||||||
assert.throws(() => callback());
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("orderSequentially with rollback", () =>
|
describe("orderSequentially with rollback", () =>
|
||||||
[
|
[
|
||||||
FlushMode.TurnBased,
|
FlushMode.TurnBased,
|
||||||
|
@ -1569,7 +1424,6 @@ describe("Runtime", () => {
|
||||||
maxBatchSizeInBytes: 700 * 1024,
|
maxBatchSizeInBytes: 700 * 1024,
|
||||||
chunkSizeInBytes: 204800,
|
chunkSizeInBytes: 204800,
|
||||||
enableRuntimeIdCompressor: undefined,
|
enableRuntimeIdCompressor: undefined,
|
||||||
enableOpReentryCheck: false,
|
|
||||||
enableGroupedBatching: false,
|
enableGroupedBatching: false,
|
||||||
explicitSchemaControl: false,
|
explicitSchemaControl: false,
|
||||||
} satisfies IContainerRuntimeOptions;
|
} satisfies IContainerRuntimeOptions;
|
||||||
|
@ -1598,7 +1452,6 @@ describe("Runtime", () => {
|
||||||
const featureGates = {
|
const featureGates = {
|
||||||
"Fluid.ContainerRuntime.CompressionDisabled": true,
|
"Fluid.ContainerRuntime.CompressionDisabled": true,
|
||||||
"Fluid.ContainerRuntime.CompressionChunkingDisabled": true,
|
"Fluid.ContainerRuntime.CompressionChunkingDisabled": true,
|
||||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": false,
|
|
||||||
"Fluid.ContainerRuntime.IdCompressorEnabled": true,
|
"Fluid.ContainerRuntime.IdCompressorEnabled": true,
|
||||||
"Fluid.ContainerRuntime.DisableGroupedBatching": true,
|
"Fluid.ContainerRuntime.DisableGroupedBatching": true,
|
||||||
};
|
};
|
||||||
|
@ -1619,7 +1472,6 @@ describe("Runtime", () => {
|
||||||
featureGates: JSON.stringify({
|
featureGates: JSON.stringify({
|
||||||
disableGroupedBatching: true,
|
disableGroupedBatching: true,
|
||||||
disableCompression: true,
|
disableCompression: true,
|
||||||
disableOpReentryCheck: false,
|
|
||||||
disableChunking: true,
|
disableChunking: true,
|
||||||
}),
|
}),
|
||||||
groupedBatchingEnabled: false,
|
groupedBatchingEnabled: false,
|
||||||
|
|
|
@ -101,7 +101,6 @@ export interface IFluidDataStoreRuntime extends IEventProvider<IFluidDataStoreRu
|
||||||
createChannel(id: string | undefined, type: string): IChannel;
|
createChannel(id: string | undefined, type: string): IChannel;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
|
||||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||||
getAudience(): IAudience;
|
getAudience(): IAudience;
|
||||||
getChannel(id: string): Promise<IChannel>;
|
getChannel(id: string): Promise<IChannel>;
|
||||||
|
|
|
@ -72,16 +72,6 @@ export interface IFluidDataStoreRuntime
|
||||||
*/
|
*/
|
||||||
getChannel(id: string): Promise<IChannel>;
|
getChannel(id: string): Promise<IChannel>;
|
||||||
|
|
||||||
/**
|
|
||||||
* Invokes the given callback and expects that no ops are submitted
|
|
||||||
* until execution finishes. If an op is submitted, an error will be raised.
|
|
||||||
*
|
|
||||||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
|
||||||
*
|
|
||||||
* @param callback - the callback to be invoked
|
|
||||||
*/
|
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new channel of the given type.
|
* Creates a new channel of the given type.
|
||||||
* @param id - ID of the channel to be created. A unique ID will be generated if left undefined.
|
* @param id - ID of the channel to be created. A unique ID will be generated if left undefined.
|
||||||
|
|
|
@ -69,7 +69,6 @@ export class FluidDataStoreRuntime extends TypedEventEmitter<IFluidDataStoreRunt
|
||||||
dispose(): void;
|
dispose(): void;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
get disposed(): boolean;
|
get disposed(): boolean;
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
|
||||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||||
getAttachGCData(telemetryContext?: ITelemetryContext): IGarbageCollectionData;
|
getAttachGCData(telemetryContext?: ITelemetryContext): IGarbageCollectionData;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
|
|
|
@ -146,7 +146,8 @@
|
||||||
"typeValidation": {
|
"typeValidation": {
|
||||||
"broken": {
|
"broken": {
|
||||||
"ClassDeclaration_FluidDataStoreRuntime": {
|
"ClassDeclaration_FluidDataStoreRuntime": {
|
||||||
"forwardCompat": false
|
"forwardCompat": false,
|
||||||
|
"backCompat": false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,21 +202,6 @@ export class FluidDataStoreRuntime
|
||||||
*/
|
*/
|
||||||
private localChangesTelemetryCount: number;
|
private localChangesTelemetryCount: number;
|
||||||
|
|
||||||
/**
|
|
||||||
* Invokes the given callback and expects that no ops are submitted
|
|
||||||
* until execution finishes. If an op is submitted, an error will be raised.
|
|
||||||
*
|
|
||||||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
|
||||||
*
|
|
||||||
* @param callback - the callback to be invoked
|
|
||||||
*/
|
|
||||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
|
||||||
// back-compat ADO:2309
|
|
||||||
return this.dataStoreContext.ensureNoDataModelChanges === undefined
|
|
||||||
? callback()
|
|
||||||
: this.dataStoreContext.ensureNoDataModelChanges(callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an instance of a DataStore runtime.
|
* Create an instance of a DataStore runtime.
|
||||||
*
|
*
|
||||||
|
|
|
@ -70,6 +70,7 @@ declare function get_current_ClassDeclaration_FluidDataStoreRuntime():
|
||||||
declare function use_old_ClassDeclaration_FluidDataStoreRuntime(
|
declare function use_old_ClassDeclaration_FluidDataStoreRuntime(
|
||||||
use: TypeOnly<old.FluidDataStoreRuntime>): void;
|
use: TypeOnly<old.FluidDataStoreRuntime>): void;
|
||||||
use_old_ClassDeclaration_FluidDataStoreRuntime(
|
use_old_ClassDeclaration_FluidDataStoreRuntime(
|
||||||
|
// @ts-expect-error compatibility expected to be broken
|
||||||
get_current_ClassDeclaration_FluidDataStoreRuntime());
|
get_current_ClassDeclaration_FluidDataStoreRuntime());
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -257,6 +257,7 @@ export interface IFluidParentContext extends IProvideFluidHandleContext, Partial
|
||||||
deleteChildSummarizerNode(id: string): void;
|
deleteChildSummarizerNode(id: string): void;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
|
||||||
|
// @deprecated
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
readonly gcThrowOnTombstoneUsage: boolean;
|
readonly gcThrowOnTombstoneUsage: boolean;
|
||||||
|
|
|
@ -437,6 +437,9 @@ export interface IFluidParentContext
|
||||||
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
* Can be disabled by feature gate `Fluid.ContainerRuntime.DisableOpReentryCheck`
|
||||||
*
|
*
|
||||||
* @param callback - the callback to be invoked
|
* @param callback - the callback to be invoked
|
||||||
|
*
|
||||||
|
* @deprecated
|
||||||
|
* // back-compat: to be removed in 2.0
|
||||||
*/
|
*/
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
ensureNoDataModelChanges<T>(callback: () => T): T;
|
||||||
|
|
||||||
|
|
|
@ -466,8 +466,6 @@ export class MockFluidDataStoreRuntime extends EventEmitter implements IFluidDat
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
readonly documentId: string;
|
readonly documentId: string;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
ensureNoDataModelChanges<T>(callback: () => T): T;
|
|
||||||
// (undocumented)
|
|
||||||
readonly entryPoint: IFluidHandle<FluidObject>;
|
readonly entryPoint: IFluidHandle<FluidObject>;
|
||||||
// (undocumented)
|
// (undocumented)
|
||||||
readonly existing: boolean;
|
readonly existing: boolean;
|
||||||
|
|
|
@ -766,10 +766,6 @@ export class MockFluidDataStoreRuntime
|
||||||
return deltaConnection;
|
return deltaConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
|
||||||
return callback();
|
|
||||||
}
|
|
||||||
|
|
||||||
public get absolutePath() {
|
public get absolutePath() {
|
||||||
return `/${this.id}`;
|
return `/${this.id}`;
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,6 +86,7 @@ export class MockFluidDataStoreContext implements IFluidDataStoreContext {
|
||||||
throw new Error("Method not implemented.");
|
throw new Error("Method not implemented.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// back-compat: to be removed in 2.0
|
||||||
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
public ensureNoDataModelChanges<T>(callback: () => T): T {
|
||||||
return callback();
|
return callback();
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,14 +168,7 @@ describe("Container create with feature flags", () => {
|
||||||
|
|
||||||
beforeEach("createAzureClient", () => {
|
beforeEach("createAzureClient", () => {
|
||||||
mockLogger = new MockLogger();
|
mockLogger = new MockLogger();
|
||||||
client = createAzureClient(
|
client = createAzureClient(undefined, undefined, mockLogger, configProvider({}));
|
||||||
undefined,
|
|
||||||
undefined,
|
|
||||||
mockLogger,
|
|
||||||
configProvider({
|
|
||||||
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
schema = {
|
schema = {
|
||||||
initialObjects: {
|
initialObjects: {
|
||||||
map1: SharedMap,
|
map1: SharedMap,
|
||||||
|
@ -193,6 +186,6 @@ describe("Container create with feature flags", () => {
|
||||||
const event = mockLogger.events.find((e) => e.eventName.endsWith("ContainerLoadStats"));
|
const event = mockLogger.events.find((e) => e.eventName.endsWith("ContainerLoadStats"));
|
||||||
assert(event !== undefined, "ContainerLoadStats event should exist");
|
assert(event !== undefined, "ContainerLoadStats event should exist");
|
||||||
const featureGates = event.featureGates as string;
|
const featureGates = event.featureGates as string;
|
||||||
assert(featureGates.includes('"disableOpReentryCheck":true'));
|
assert(featureGates.length > 0);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
import { strict as assert } from "assert";
|
import { strict as assert } from "assert";
|
||||||
|
|
||||||
import { describeCompat, itExpects } from "@fluid-private/test-version-utils";
|
import { describeCompat } from "@fluid-private/test-version-utils";
|
||||||
import { IContainer } from "@fluidframework/container-definitions/internal";
|
import { IContainer } from "@fluidframework/container-definitions/internal";
|
||||||
import { ConfigTypes, IConfigProviderBase } from "@fluidframework/core-interfaces";
|
import { ConfigTypes, IConfigProviderBase } from "@fluidframework/core-interfaces";
|
||||||
import type { SharedDirectory, ISharedMap } from "@fluidframework/map/internal";
|
import type { SharedDirectory, ISharedMap } from "@fluidframework/map/internal";
|
||||||
|
@ -69,6 +69,7 @@ describeCompat(
|
||||||
...containerConfig,
|
...containerConfig,
|
||||||
loaderProps: { configProvider: configProvider(featureGates) },
|
loaderProps: { configProvider: configProvider(featureGates) },
|
||||||
};
|
};
|
||||||
|
provider.reset();
|
||||||
container1 = await provider.makeTestContainer(configWithFeatureGates);
|
container1 = await provider.makeTestContainer(configWithFeatureGates);
|
||||||
container2 = await provider.loadTestContainer(configWithFeatureGates);
|
container2 = await provider.loadTestContainer(configWithFeatureGates);
|
||||||
|
|
||||||
|
@ -89,49 +90,33 @@ describeCompat(
|
||||||
await provider.ensureSynchronized();
|
await provider.ensureSynchronized();
|
||||||
};
|
};
|
||||||
|
|
||||||
itExpects(
|
it("Test reentrant op sending", async function () {
|
||||||
"Should close the container when submitting an op while processing a batch",
|
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||||
[
|
// This test is flaky on Tinylicious. ADO:5010
|
||||||
{
|
this.skip();
|
||||||
eventName: "fluid:telemetry:Container:ContainerClose",
|
}
|
||||||
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
|
|
||||||
},
|
await setupContainers(testContainerConfig);
|
||||||
],
|
|
||||||
async function () {
|
sharedMap1.on("valueChanged", (changed) => {
|
||||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
if (changed.key !== "key2") {
|
||||||
// This test is flaky on Tinylicious. ADO:5010
|
sharedMap1.set("key2", `${sharedMap1.get("key1")} updated`);
|
||||||
this.skip();
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
|
|
||||||
await setupContainers({
|
sharedMap1.set("key1", "1");
|
||||||
...testContainerConfig,
|
sharedMap2.set("key2", "2");
|
||||||
runtimeOptions: {
|
|
||||||
enableOpReentryCheck: true,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
sharedMap1.on("valueChanged", (changed) => {
|
await provider.ensureSynchronized();
|
||||||
if (changed.key !== "key2") {
|
|
||||||
sharedMap1.set("key2", `${sharedMap1.get("key1")} updated`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
assert.throws(() => {
|
assert.ok(!container1.closed);
|
||||||
sharedMap1.set("key1", "1");
|
assert.ok(!container2.closed);
|
||||||
});
|
|
||||||
|
|
||||||
sharedMap2.set("key2", "2");
|
// The other container is fine
|
||||||
await provider.ensureSynchronized();
|
assert.equal(sharedMap2.get("key1"), "1");
|
||||||
|
assert.equal(sharedMap2.get("key2"), "2");
|
||||||
// The offending container is closed
|
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
|
||||||
assert.ok(container1.closed);
|
});
|
||||||
|
|
||||||
// The other container is fine
|
|
||||||
assert.equal(sharedMap2.get("key1"), undefined);
|
|
||||||
assert.equal(sharedMap2.get("key2"), "2");
|
|
||||||
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
[false, true].forEach((enableGroupedBatching) => {
|
[false, true].forEach((enableGroupedBatching) => {
|
||||||
it(`Eventual consistency with op reentry - ${
|
it(`Eventual consistency with op reentry - ${
|
||||||
|
@ -253,40 +238,6 @@ describeCompat(
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("Reentry safeguards", () => {
|
describe("Reentry safeguards", () => {
|
||||||
itExpects(
|
|
||||||
"Flushing is not supported",
|
|
||||||
[
|
|
||||||
{
|
|
||||||
eventName: "fluid:telemetry:Container:ContainerClose",
|
|
||||||
error: "Flushing is not supported inside DDS event handlers",
|
|
||||||
},
|
|
||||||
],
|
|
||||||
async function () {
|
|
||||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
|
||||||
// This test is flaky on Tinylicious. ADO:5010
|
|
||||||
this.skip();
|
|
||||||
}
|
|
||||||
|
|
||||||
await setupContainers({
|
|
||||||
...testContainerConfig,
|
|
||||||
runtimeOptions: {
|
|
||||||
flushMode: FlushMode.Immediate,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
sharedString1.on("sequenceDelta", () =>
|
|
||||||
assert.throws(() =>
|
|
||||||
dataObject1.context.containerRuntime.orderSequentially(() =>
|
|
||||||
sharedMap1.set("0", 0),
|
|
||||||
),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
sharedString1.insertText(0, "ad");
|
|
||||||
await provider.ensureSynchronized();
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
it("Flushing is supported if it happens in the next batch", async function () {
|
it("Flushing is supported if it happens in the next batch", async function () {
|
||||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||||
// This test is flaky on Tinylicious. ADO:5010
|
// This test is flaky on Tinylicious. ADO:5010
|
||||||
|
@ -314,7 +265,7 @@ describeCompat(
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Should throw when submitting an op while handling an event - offline", async function () {
|
it("Should not throw when submitting an op while handling an event - offline", async function () {
|
||||||
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
if (provider.driver.type === "t9s" || provider.driver.type === "tinylicious") {
|
||||||
// This test is flaky on Tinylicious. ADO:5010
|
// This test is flaky on Tinylicious. ADO:5010
|
||||||
this.skip();
|
this.skip();
|
||||||
|
@ -322,9 +273,7 @@ describeCompat(
|
||||||
|
|
||||||
await setupContainers({
|
await setupContainers({
|
||||||
...testContainerConfig,
|
...testContainerConfig,
|
||||||
runtimeOptions: {
|
runtimeOptions: {},
|
||||||
enableOpReentryCheck: true,
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await container1.deltaManager.inbound.pause();
|
await container1.deltaManager.inbound.pause();
|
||||||
|
@ -336,9 +285,7 @@ describeCompat(
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
assert.throws(() => {
|
sharedMap1.set("key1", "1");
|
||||||
sharedMap1.set("key1", "1");
|
|
||||||
});
|
|
||||||
|
|
||||||
container1.deltaManager.inbound.resume();
|
container1.deltaManager.inbound.resume();
|
||||||
container1.deltaManager.outbound.resume();
|
container1.deltaManager.outbound.resume();
|
||||||
|
@ -347,7 +294,7 @@ describeCompat(
|
||||||
|
|
||||||
// The offending container is not closed
|
// The offending container is not closed
|
||||||
assert.ok(!container1.closed);
|
assert.ok(!container1.closed);
|
||||||
assert.ok(!mapsAreEqual(sharedMap1, sharedMap2));
|
assert.ok(mapsAreEqual(sharedMap1, sharedMap2));
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("Allow reentry", () =>
|
describe("Allow reentry", () =>
|
||||||
|
@ -360,11 +307,8 @@ describeCompat(
|
||||||
{
|
{
|
||||||
options: {
|
options: {
|
||||||
...testContainerConfig,
|
...testContainerConfig,
|
||||||
runtimeOptions: {
|
runtimeOptions: {},
|
||||||
enableOpReentryCheck: true,
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
featureGates: { "Fluid.ContainerRuntime.DisableOpReentryCheck": true },
|
|
||||||
name: "Enabled by options, disabled by feature gate",
|
name: "Enabled by options, disabled by feature gate",
|
||||||
},
|
},
|
||||||
].forEach((testConfig) => {
|
].forEach((testConfig) => {
|
||||||
|
|
|
@ -102,7 +102,6 @@ export function generateRuntimeOptions(
|
||||||
{ minimumBatchSizeInBytes: 500, compressionAlgorithm: CompressionAlgorithms.lz4 },
|
{ minimumBatchSizeInBytes: 500, compressionAlgorithm: CompressionAlgorithms.lz4 },
|
||||||
],
|
],
|
||||||
maxBatchSizeInBytes: [716800],
|
maxBatchSizeInBytes: [716800],
|
||||||
enableOpReentryCheck: [true],
|
|
||||||
// Compressed payloads exceeding this size will be chunked into messages of exactly this size
|
// Compressed payloads exceeding this size will be chunked into messages of exactly this size
|
||||||
chunkSizeInBytes: [204800],
|
chunkSizeInBytes: [204800],
|
||||||
enableRuntimeIdCompressor: ["on", undefined, "delayed"],
|
enableRuntimeIdCompressor: ["on", undefined, "delayed"],
|
||||||
|
|
Загрузка…
Ссылка в новой задаче