Simplify/Refactor BranchCommitEnricher (#23151)

## Description

This tightens the state machine space for `BranchCommitEnricher` and
makes various simplifications to the API and code.

* The API surface of `BranchCommitEnricher` is reduced to functions
that:
  1. Process branch changes
  2. Get enriched commits
  3. Acknowledge transaction state transitions
  
Item (i) used to be made of a few different methods whose invocations
needed to be coordinated by `SharedTreeCore`. Now,
`BranchCommitEnricher` manages this internally. Previously, it needed to
know whether or not the commit being submitted was the commit of the
outermost transaction. This could be problematic for future refactors of
SharedTreeCore, so the enricher now derives that information from its
internals (since it already keeps track of ongoing transactions via the
`TransactionEnricher`.
* The map of processed commits stored in the enricher is now a WeakMap,
meaning that the enricher does not need to worry about accidentally
holding on to commits forever. The logic that "purges" map entries
between changes has been removed since the entries will now be dropped
automatically as the commits become unreferenced.
* TransactionEnricher no longer allows getting the enriched change in
the middle of a transaction.
* We now properly throw an error if a the SharedTree is attached in the
middle of a transaction - we do not and cannot currently support this
scenario.
* Various renames, conditional refactors, inlining, etc. to shorten
lines, improve code flow, simplify conditional logic, etc. and
additional documentation.
This commit is contained in:
Noah Encke 2024-11-19 19:35:47 -08:00 коммит произвёл GitHub
Родитель 421ba40375
Коммит 948fc52e8d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
5 изменённых файлов: 161 добавлений и 171 удалений

Просмотреть файл

@ -4,106 +4,111 @@
*/
import { assert } from "@fluidframework/core-utils/internal";
import { type ChangeRebaser, type GraphCommit, replaceChange } from "../core/index.js";
import {
type ChangeRebaser,
type GraphCommit,
replaceChange,
type RevisionTag,
} from "../core/index.js";
import type { ChangeEnricherReadonlyCheckout } from "./changeEnricher.js";
import { TransactionEnricher } from "./transactionEnricher.js";
import { getChangeReplaceType, type SharedTreeBranchChange } from "./branch.js";
/**
* Utility for enriching commits from a {@link Branch} before these commits are applied and submitted.
*/
export class BranchCommitEnricher<TChange> {
private readonly transactionEnricher: TransactionEnricher<TChange>;
private readonly enricher: ChangeEnricherReadonlyCheckout<TChange>;
readonly #transactionEnricher: TransactionEnricher<TChange>;
readonly #enricher: ChangeEnricherReadonlyCheckout<TChange>;
/**
* Maps each local commit to the corresponding enriched commit.
* Entries are added when the commits are prepared (before being applied and submitted).
* Entries are removed when the commits are retrieved for submission (after being applied).
* It's possible an entry will linger in the map indefinitely if it is never retrieved for submission.
* This would happen if applying a commit were to fail and the commit were not retrieved/purged after the failure.
* @remarks
* Entries are added when the commits are {@link BranchCommitEnricher.processChange | processed during a change}.
* Each entry is removed when it is {@link BranchCommitEnricher.enrich | retrieved}.
* In the event that an entry is not explicitly removed, it will eventually be {@link WeakMap | dropped from memory} along with the associated commit.
*/
private readonly preparedCommits: Map<GraphCommit<TChange>, GraphCommit<TChange>> =
new Map();
readonly #preparedCommits: WeakMap<GraphCommit<TChange>, GraphCommit<TChange>> = new Map();
/**
* If defined, a top-level transaction has been {@link BranchCommitEnricher.commitTransaction | committed} since the last {@link BranchCommitEnricher.processChange | change has been processed}.
* Calling this function will compute the composition of that transaction's commits.
* @remarks This function will be reset to undefined after each {@link BranchCommitEnricher.processChange | change is processed}.
*/
#getOuterTransactionChange?: (revision: RevisionTag) => TChange;
public constructor(
rebaser: ChangeRebaser<TChange>,
enricher: ChangeEnricherReadonlyCheckout<TChange>,
) {
this.enricher = enricher;
this.transactionEnricher = new TransactionEnricher(rebaser, this.enricher);
this.#enricher = enricher;
this.#transactionEnricher = new TransactionEnricher(rebaser, this.#enricher);
}
/**
* @returns The number of commits that have been prepared but not yet retrieved.
* Process the given change, preparing new commits for {@link BranchCommitEnricher.enrich | enrichment}.
* @param change - The change to process.
* @param isAttached - Whether or not the SharedTree is attached to the service.
*/
public get preparedCommitsCount(): number {
return this.preparedCommits.size;
}
public startNewTransaction(): void {
this.transactionEnricher.startNewTransaction();
}
public commitCurrentTransaction(): void {
this.transactionEnricher.commitCurrentTransaction();
}
public abortCurrentTransaction(): void {
this.transactionEnricher.abortCurrentTransaction();
}
/**
* Adds a commit to the enricher.
* @param commit - A commit that is part of a transaction.
*/
public ingestTransactionCommit(commit: GraphCommit<TChange>): void {
// We do not submit ops for changes that are part of a transaction.
// But we need to enrich the commits that will be sent if the transaction is committed.
this.transactionEnricher.addTransactionStep(commit);
}
/**
* Prepares an enriched commit for later submission (see {@link BranchCommitEnricher.getPreparedCommit}).
* @param commit - The commit to prepare an enriched version of.
* @param concludesOuterTransaction - Whether the commit concludes an outer transaction.
*
* Each call to this method must be followed by a call to {@link BranchCommitEnricher.getPreparedCommit} or
* {@link BranchCommitEnricher.purgePreparedCommits}. Failing to do so will result in a memory leak.
*/
public prepareCommit(
commit: GraphCommit<TChange>,
concludesOuterTransaction: boolean,
): void {
let enrichedChange: TChange;
if (concludesOuterTransaction) {
assert(
this.transactionEnricher !== undefined,
0x97f /* Unexpected transaction commit without transaction steps */,
);
enrichedChange = this.transactionEnricher.getComposedChange(commit.revision);
public processChange(change: SharedTreeBranchChange<TChange>): void {
if (this.#transactionEnricher.isTransacting()) {
if (change.type === "append") {
for (const commit of change.newCommits) {
// We do not submit ops for changes that are part of a transaction.
// But we need to enrich the commits that will be sent if the transaction is committed.
this.#transactionEnricher.addTransactionStep(commit);
}
}
} else {
enrichedChange = this.enricher.updateChangeEnrichments(commit.change, commit.revision);
if (
change.type === "append" ||
(change.type === "replace" && getChangeReplaceType(change) === "transactionCommit")
) {
for (const newCommit of change.newCommits) {
const newChange =
this.#getOuterTransactionChange?.(newCommit.revision) ??
this.#enricher.updateChangeEnrichments(newCommit.change, newCommit.revision);
this.#preparedCommits.set(newCommit, replaceChange(newCommit, newChange));
}
}
}
this.preparedCommits.set(commit, replaceChange(commit, enrichedChange));
this.#getOuterTransactionChange = undefined;
}
/**
* @param commit - A commit previously passed to {@link BranchCommitEnricher.prepareCommit}.
* @returns The enriched commit corresponds to the given commit.
* Retrieves the enriched version of the given commit.
* @param commit - A commit {@link BranchCommitEnricher.processChange | processed during the most recent change}.
* @remarks A commit can only be enriched once - subsequent calls to this method with the same commit will throw an error.
*/
public getPreparedCommit(commit: GraphCommit<TChange>): GraphCommit<TChange> {
const prepared = this.preparedCommits.get(commit);
public enrich(commit: GraphCommit<TChange>): GraphCommit<TChange> {
const prepared = this.#preparedCommits.get(commit);
assert(prepared !== undefined, 0x980 /* Unknown commit */);
this.preparedCommits.delete(commit);
this.#preparedCommits.delete(commit);
return prepared;
}
/**
* Purges all commits that have been prepared but not been retrieved.
* This should be called to avoid memory leaks if the prepared commits are no longer needed.
*
* Does not affect ingested transaction commits.
* Notify the enricher that a new transaction has started.
* @remarks This may be called multiple times without calling {@link BranchCommitEnricher.commitTransaction | commitTransaction}, producing "nested transactions".
*/
public purgePreparedCommits(): void {
this.preparedCommits.clear();
public startTransaction(): void {
this.#transactionEnricher.startTransaction();
}
/**
* Commit the current transaction.
* @remarks This should be called _before_ the corresponding transaction commit change is {@link BranchCommitEnricher.processChange | processed}.
*/
public commitTransaction(): void {
this.#getOuterTransactionChange = this.#transactionEnricher.commitTransaction();
}
/**
* Notify the enricher that the current transaction has been aborted.
* @remarks This will throw an error if there is no ongoing transaction.
*/
public abortTransaction(): void {
this.#transactionEnricher.abortTransaction();
}
}

Просмотреть файл

@ -3,7 +3,7 @@
* Licensed under the MIT License.
*/
import { assert, oob } from "@fluidframework/core-utils/internal";
import { assert } from "@fluidframework/core-utils/internal";
import type {
IChannelAttributes,
IFluidDataStoreRuntime,
@ -56,7 +56,7 @@ import { type ChangeEnricherReadonlyCheckout, NoOpChangeEnricher } from "./chang
import type { ResubmitMachine } from "./resubmitMachine.js";
import { DefaultResubmitMachine } from "./defaultResubmitMachine.js";
import { BranchCommitEnricher } from "./branchCommitEnricher.js";
import { createChildLogger } from "@fluidframework/telemetry-utils/internal";
import { createChildLogger, UsageError } from "@fluidframework/telemetry-utils/internal";
// TODO: Organize this to be adjacent to persisted types.
const summarizablesTreeKey = "indexes";
@ -184,58 +184,43 @@ export class SharedTreeCore<TEditor extends ChangeFamilyEditor, TChange>
rebaseLogger,
);
this.editManager.localBranch.events.on("transactionStarted", () => {
this.commitEnricher.startNewTransaction();
if (this.detachedRevision === undefined) {
// It is currently forbidden to attach during a transaction, so transaction state changes can be ignored until after attaching.
this.commitEnricher.startTransaction();
}
});
this.editManager.localBranch.events.on("transactionAborted", () => {
this.commitEnricher.abortCurrentTransaction();
if (this.detachedRevision === undefined) {
// It is currently forbidden to attach during a transaction, so transaction state changes can be ignored until after attaching.
this.commitEnricher.abortTransaction();
}
});
this.editManager.localBranch.events.on("transactionCommitted", () => {
this.commitEnricher.commitCurrentTransaction();
if (this.detachedRevision === undefined) {
// It is currently forbidden to attach during a transaction, so transaction state changes can be ignored until after attaching.
this.commitEnricher.commitTransaction();
}
});
this.editManager.localBranch.events.on("beforeChange", (change) => {
// Ensure that any previously prepared commits that have not been sent are purged.
this.commitEnricher.purgePreparedCommits();
if (this.detachedRevision !== undefined) {
// Edits submitted before the first attach do not need enrichment because they will not be applied by peers.
} else if (change.type === "append") {
if (this.getLocalBranch().isTransacting()) {
for (const newCommit of change.newCommits) {
this.commitEnricher.ingestTransactionCommit(newCommit);
}
} else {
for (const newCommit of change.newCommits) {
this.commitEnricher.prepareCommit(newCommit, false);
}
}
} else if (
change.type === "replace" &&
getChangeReplaceType(change) === "transactionCommit" &&
!this.getLocalBranch().isTransacting()
) {
assert(
change.newCommits.length === 1,
0x983 /* Unexpected number of commits when committing transaction */,
);
this.commitEnricher.prepareCommit(change.newCommits[0] ?? oob(), true);
if (this.detachedRevision === undefined) {
// Commit enrichment is only necessary for changes that will be submitted as ops, and changes issued while detached are not submitted.
this.commitEnricher.processChange(change);
}
});
this.editManager.localBranch.events.on("afterChange", (change) => {
if (this.getLocalBranch().isTransacting()) {
// We do not submit ops for changes that are part of a transaction.
return;
}
if (
change.type === "append" ||
(change.type === "replace" && getChangeReplaceType(change) === "transactionCommit")
) {
if (this.detachedRevision !== undefined) {
for (const newCommit of change.newCommits) {
this.submitCommit(newCommit, this.schemaAndPolicy);
}
} else {
for (const newCommit of change.newCommits) {
const prepared = this.commitEnricher.getPreparedCommit(newCommit);
this.submitCommit(prepared, this.schemaAndPolicy);
// We do not submit ops for changes that are part of a transaction.
if (!this.getLocalBranch().isTransacting()) {
if (
change.type === "append" ||
(change.type === "replace" && getChangeReplaceType(change) === "transactionCommit")
) {
for (const commit of change.newCommits) {
this.submitCommit(
this.detachedRevision !== undefined
? commit
: this.commitEnricher.enrich(commit),
this.schemaAndPolicy,
);
}
}
}
@ -431,6 +416,13 @@ export class SharedTreeCore<TEditor extends ChangeFamilyEditor, TChange>
protected onDisconnect(): void {}
protected override didAttach(): void {
if (this.getLocalBranch().isTransacting()) {
// Attaching during a transaction is not currently supported.
// At least part of of the system is known to not handle this case correctly - commit enrichment - and there may be others.
throw new UsageError(
"Cannot attach while a transaction is in progress. Commit or abort the transaction before attaching.",
);
}
if (this.detachedRevision !== undefined) {
this.detachedRevision = undefined;
}

Просмотреть файл

@ -11,54 +11,60 @@ import type { ChangeEnricherReadonlyCheckout } from "./changeEnricher.js";
* Utility for producing an enriched commit out of multiple transaction steps
*/
export class TransactionEnricher<TChange> {
private readonly rebaser: ChangeRebaser<TChange>;
private readonly enricher: ChangeEnricherReadonlyCheckout<TChange>;
private readonly transactionCommits: GraphCommit<TChange>[] = [];
readonly #rebaser: ChangeRebaser<TChange>;
readonly #enricher: ChangeEnricherReadonlyCheckout<TChange>;
#transactionCommits: GraphCommit<TChange>[] = [];
/**
* The number of commits before the start of each active transaction scope.
* For a stack of `n` transaction scopes, the array will contain `n` integers,
* where the integer at index `i` is the number of commits made before the start of the `i`th transaction scope
* (therefore, the first element in the array, if present, is always 0)
*/
private readonly transactionScopesStart: number[] = [];
readonly #transactionScopesStart: number[] = [];
public constructor(
rebaser: ChangeRebaser<TChange>,
enricher: ChangeEnricherReadonlyCheckout<TChange>,
) {
this.rebaser = rebaser;
this.enricher = enricher;
this.#rebaser = rebaser;
this.#enricher = enricher;
}
public startNewTransaction(): void {
this.transactionScopesStart.push(this.transactionCommits.length);
public isTransacting(): boolean {
return this.#transactionScopesStart.length !== 0;
}
public commitCurrentTransaction(): void {
const commitsCommitted = this.transactionScopesStart.pop();
public startTransaction(): void {
this.#transactionScopesStart.push(this.#transactionCommits.length);
}
/**
* Commits the current transaction.
* @returns If and only if the closed transaction was the outermost transaction, returns a function which can be used to compute the composed change for that transaction's commits.
*/
public commitTransaction(): ((revision: RevisionTag) => TChange) | undefined {
const commitsCommitted = this.#transactionScopesStart.pop();
assert(commitsCommitted !== undefined, 0x985 /* No transaction to commit */);
if (this.#transactionScopesStart.length === 0) {
const transactionCommits = this.#transactionCommits;
this.#transactionCommits = [];
return (revision: RevisionTag) =>
this.#rebaser.changeRevision(this.#rebaser.compose(transactionCommits), revision);
}
}
public abortCurrentTransaction(): void {
const scopeStart = this.transactionScopesStart.pop();
public abortTransaction(): void {
const scopeStart = this.#transactionScopesStart.pop();
assert(scopeStart !== undefined, 0x986 /* No transaction to abort */);
this.transactionCommits.length = scopeStart;
this.#transactionCommits.length = scopeStart;
}
public addTransactionStep(commit: GraphCommit<TChange>): void {
assert(
this.transactionScopesStart.length !== 0,
this.#transactionScopesStart.length !== 0,
0x987 /* No transaction to add a step to */,
);
const change = this.enricher.updateChangeEnrichments(commit.change, commit.revision);
this.transactionCommits.push({ ...commit, change });
}
public getComposedChange(revision: RevisionTag): TChange {
assert(this.transactionScopesStart.length === 0, 0x988 /* Transaction not committed */);
const squashed = this.rebaser.compose(this.transactionCommits);
const tagged = this.rebaser.changeRevision(squashed, revision);
this.transactionCommits.length = 0;
return tagged;
const change = this.#enricher.updateChangeEnrichments(commit.change, commit.revision);
this.#transactionCommits.push({ ...commit, change });
}
}

Просмотреть файл

@ -26,6 +26,7 @@ import {
MockFluidDataStoreRuntime,
MockSharedObjectServices,
MockStorage,
validateAssertionError,
} from "@fluidframework/test-runtime-utils/internal";
import {
@ -617,36 +618,26 @@ describe("SharedTreeCore", () => {
containerRuntimeFactory.processAllMessages();
assert.equal(machine.sequencingLog.length, 2);
});
});
it("does not leak enriched commits that are not sent", () => {
const enricher = new MockChangeEnricher<ModularChangeset>();
const machine = new MockResubmitMachine();
const tree = createTree([], machine, enricher);
const containerRuntimeFactory = new MockContainerRuntimeFactory();
const dataStoreRuntime1 = new MockFluidDataStoreRuntime({
idCompressor: createIdCompressor(),
});
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
tree.connect({
deltaConnection: dataStoreRuntime1.createDeltaConnection(),
objectStorage: new MockStorage(),
});
assert.equal(tree.preparedCommitsCount, 0);
// Temporarily make commit application fail
const disableFailure = tree.getLocalBranch().events.on("beforeChange", () => {
throw new Error("Invalid commit");
});
assert.throws(() => changeTree(tree));
disableFailure();
// The invalid commit has been prepared but not sent
assert.equal(tree.preparedCommitsCount, 1);
// Making a valid change should purge the invalid commit
changeTree(tree);
assert.equal(tree.preparedCommitsCount, 0);
it("throws an error if attaching during a transaction", () => {
const tree = createTree([]);
const containerRuntimeFactory = new MockContainerRuntimeFactoryForReconnection();
const dataStoreRuntime1 = new MockFluidDataStoreRuntime({
idCompressor: createIdCompressor(),
});
containerRuntimeFactory.createContainerRuntime(dataStoreRuntime1);
tree.getLocalBranch().startTransaction();
assert.throws(
() => {
tree.connect({
deltaConnection: dataStoreRuntime1.createDeltaConnection(),
objectStorage: new MockStorage(),
});
},
(e: Error) =>
validateAssertionError(e, /Cannot attach while a transaction is in progress/),
);
});
function isSummaryTree(summaryObject: SummaryObject): summaryObject is ISummaryTree {

Просмотреть файл

@ -86,8 +86,4 @@ export class TestSharedTreeCore extends SharedTreeCore<DefaultEditBuilder, Defau
public override getLocalBranch(): SharedTreeBranch<DefaultEditBuilder, DefaultChangeset> {
return super.getLocalBranch();
}
public get preparedCommitsCount(): number {
return this.commitEnricher.preparedCommitsCount;
}
}