improvement(client-presence): Use audience for session client connection status (#23006)

## Description
This PR updates SystemWorkspace to use audience for session client
connection status.

We also add audience support to Mock Ephemeral Runtime w/ additional
reworking of unit tests

Presence + Audience test cases to add:
- Unknown attendee joins that is in audience (=> emits attendeeJoined)
- Unknown attendee joins that is not in audience (=> does not emit
attendeeJoined)
- Known attendee rejoins that is in audience w/ same connection
(duplicate signal) (=> does not emit attendeeJoined)
- Known attendee rejoins that is in audience w/ different connection (=>
emits attendeeJoined)
- Known attendee rejoins that is not in audience w/ same connection (=>
does not emit attendeeJoined)
- Known attendee rejoins that is not in audience w/ different connection
(=> does not emit attendeeJoined)
This commit is contained in:
WillieHabi 2024-11-14 12:49:58 -08:00 коммит произвёл GitHub
Родитель aa2718b106
Коммит 7f8f204cda
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
8 изменённых файлов: 314 добавлений и 122 удалений

Просмотреть файл

@ -168,14 +168,25 @@ function makePresenceView(
if (audience !== undefined) {
presenceConfig.presence.events.on("attendeeJoined", (attendee) => {
const name = audience.getMembers().get(attendee.getConnectionId())?.name;
const update = `client ${name === undefined ? "(unnamed)" : `named ${name}`} with id ${attendee.sessionId} joined`;
const update = `client ${name === undefined ? "(unnamed)" : `named ${name}`} 🔗 with id ${attendee.sessionId} joined`;
addLogEntry(logContentDiv, update);
});
presenceConfig.presence.events.on("attendeeDisconnected", (attendee) => {
// Filter for remote attendees
const self = audience.getMyself();
if (self && attendee !== presenceConfig.presence.getAttendee(self.currentConnection)) {
const name = audience.getMembers().get(attendee.getConnectionId())?.name;
const update = `client ${name === undefined ? "(unnamed)" : `named ${name}`} ⛓️‍💥 with id ${attendee.sessionId} left`;
addLogEntry(logContentDiv, update);
}
});
}
logDiv.append(logHeaderDiv, logContentDiv);
presenceConfig.lastRoll.events.on("updated", (update) => {
const updateText = `updated ${update.client.sessionId.slice(0, 8)}'s last rolls to ${JSON.stringify(update.value)}`;
const connected = update.client.getConnectionStatus() === "Connected" ? "🔗" : "⛓️‍💥";
const updateText = `updated ${update.client.sessionId.slice(0, 8)}'s ${connected} last rolls to ${JSON.stringify(update.value)}`;
addLogEntry(logContentDiv, updateText);
makeDiceValuesView(statesContentDiv, presenceConfig.lastRoll);

Просмотреть файл

@ -43,7 +43,7 @@ export const brandedObjectEntries = Object.entries as <K extends string, T>(
*/
export type IEphemeralRuntime = Pick<
(IContainerRuntime & IRuntimeInternal) | IFluidDataStoreRuntime,
"clientId" | "connected" | "getQuorum" | "off" | "on" | "submitSignal"
"clientId" | "connected" | "getAudience" | "getQuorum" | "off" | "on" | "submitSignal"
> &
Partial<Pick<IFluidDataStoreRuntime, "logger">>;

Просмотреть файл

@ -267,7 +267,12 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
// Direct to the appropriate Presence Workspace, if present.
const workspace = this.workspaces.get(workspaceAddress);
if (workspace) {
workspace.internal.processUpdate(received, timeModifier, remoteDatastore);
workspace.internal.processUpdate(
received,
timeModifier,
remoteDatastore,
message.clientId,
);
} else {
// All broadcast state is kept even if not currently registered, unless a value
// notes itself to be ignored.

Просмотреть файл

@ -85,6 +85,12 @@ class PresenceManager
runtime.on("connected", this.onConnect.bind(this));
runtime.on("disconnected", () => {
if (runtime.clientId !== undefined) {
this.removeClientConnectionId(runtime.clientId);
}
});
// Check if already connected at the time of construction.
// If constructed during data store load, the runtime may already be connected
// and the "connected" event will be raised during completion. With construction
@ -167,6 +173,7 @@ function setupSubComponents(
clientSessionId,
systemWorkspaceDatastore,
events,
runtime.getAudience(),
);
const datastoreManager = new PresenceDatastoreManagerImpl(
clientSessionId,

Просмотреть файл

@ -104,6 +104,7 @@ export interface PresenceStatesInternal {
received: number,
timeModifier: number,
remoteDatastore: ValueUpdateRecord,
senderConnectionId: ClientConnectionId,
): void;
}

Просмотреть файл

@ -3,6 +3,7 @@
* Licensed under the MIT License.
*/
import type { IAudience } from "@fluidframework/container-definitions";
import { assert } from "@fluidframework/core-utils/internal";
import type { ClientConnectionId } from "./baseTypes.js";
@ -60,14 +61,9 @@ class SessionClient implements ISessionClient {
return this.connectionStatus;
}
public setConnectionId(
connectionId: ClientConnectionId,
updateStatus: boolean = true,
): void {
public setConnectionId(connectionId: ClientConnectionId): void {
this.connectionId = connectionId;
if (updateStatus) {
this.connectionStatus = SessionClientStatus.Connected;
}
this.connectionStatus = SessionClientStatus.Connected;
}
public setDisconnected(): void {
@ -114,6 +110,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
private readonly events: IEmitter<
Pick<PresenceEvents, "attendeeJoined" | "attendeeDisconnected">
>,
private readonly audience: IAudience,
) {
this.selfAttendee = new SessionClient(clientSessionId);
this.attendees.set(clientSessionId, this.selfAttendee);
@ -137,8 +134,11 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
};
};
},
senderConnectionId: ClientConnectionId,
): void {
const postUpdateActions: (() => void)[] = [];
const audienceMembers = this.audience.getMembers();
const connectedAttendees = new Set<SessionClient>();
for (const [clientConnectionId, value] of Object.entries(
remoteDatastore.clientToSessionId,
)) {
@ -148,9 +148,26 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
clientConnectionId,
/* order */ value.rev,
);
if (isNew) {
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
// Check new attendee against audience to see if they're currently connected
const isAttendeeConnected = audienceMembers.has(clientConnectionId);
if (isAttendeeConnected) {
connectedAttendees.add(attendee);
if (attendee.getConnectionStatus() === SessionClientStatus.Disconnected) {
attendee.setConnectionId(clientConnectionId);
}
if (isNew) {
// If the attendee is both new and in audience (i.e. currently connected), emit an attendeeJoined event.
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
}
}
// If the attendee is not in the audience, they are considered disconnected.
if (!connectedAttendees.has(attendee)) {
attendee.setDisconnected();
}
const knownSessionId: InternalTypes.ValueRequiredState<ClientSessionId> | undefined =
this.datastore.clientToSessionId[clientConnectionId];
if (knownSessionId === undefined) {
@ -159,6 +176,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
assert(knownSessionId.value === value.value, 0xa5a /* Mismatched SessionId */);
}
}
// TODO: reorganize processUpdate and caller to process actions after all updates are processed.
for (const action of postUpdateActions) {
action();
@ -185,7 +203,8 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
// If the last known connectionID is different from the connection ID being removed, the attendee has reconnected,
// therefore we should not change the attendee connection status or emit a disconnect event.
const attendeeReconnected = attendee.getConnectionId() !== clientConnectionId;
if (!attendeeReconnected) {
const connected = attendee.getConnectionStatus() === SessionClientStatus.Connected;
if (!attendeeReconnected && connected) {
attendee.setDisconnected();
this.events.emit("attendeeDisconnected", attendee);
}
@ -224,9 +243,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
): { attendee: SessionClient; isNew: boolean } {
let attendee = this.attendees.get(clientSessionId);
let isNew = false;
// TODO #22616: Check for a current connection to determine best status.
// For now, always leave existing state as was last determined and
// assume new client is connected.
if (attendee === undefined) {
// New attendee. Create SessionClient and add session ID based
// entry to map.
@ -237,10 +254,12 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
// The given association is newer than the one we have.
// Update the order and current connection ID.
attendee.order = order;
attendee.setConnectionId(clientConnectionId, /* updateStatus */ false);
attendee.setConnectionId(clientConnectionId);
isNew = true;
}
// Always update entry for the connection ID. (Okay if already set.)
this.attendees.set(clientConnectionId, attendee);
return { attendee, isNew };
}
}
@ -254,6 +273,7 @@ export function createSystemWorkspace(
clientSessionId: ClientSessionId,
datastore: SystemWorkspaceDatastore,
events: IEmitter<Pick<PresenceEvents, "attendeeJoined">>,
audience: IAudience,
): {
workspace: SystemWorkspace;
statesEntry: {
@ -261,7 +281,7 @@ export function createSystemWorkspace(
public: PresenceStates<PresenceStatesSchema>;
};
} {
const workspace = new SystemWorkspaceImpl(clientSessionId, datastore, events);
const workspace = new SystemWorkspaceImpl(clientSessionId, datastore, events, audience);
return {
workspace,
statesEntry: {

Просмотреть файл

@ -5,13 +5,48 @@
import { strict as assert } from "node:assert";
import type { IAudience } from "@fluidframework/container-definitions";
import type { ITelemetryBaseLogger } from "@fluidframework/core-interfaces";
import type { IQuorumClients, ISequencedClient } from "@fluidframework/driver-definitions";
import { MockQuorumClients } from "@fluidframework/test-runtime-utils/internal";
import type {
IClient,
IQuorumClients,
ISequencedClient,
} from "@fluidframework/driver-definitions";
import { MockAudience, MockQuorumClients } from "@fluidframework/test-runtime-utils/internal";
import type { ClientConnectionId } from "../baseTypes.js";
import type { IEphemeralRuntime } from "../internalTypes.js";
/**
* Creates a mock {@link @fluidframework/container-definitions#IAudience} for testing.
*/
export function makeMockAudience(clientIds: string[]): IAudience {
const clients = new Map<string, IClient>();
for (const [index, clientId] of clientIds.entries()) {
// eslint-disable-next-line unicorn/prefer-code-point
const stringId = String.fromCharCode(index + 65);
const name = stringId.repeat(10);
const userId = `${name}@microsoft.com`;
const user = {
id: userId,
};
clients.set(clientId, {
mode: index % 2 === 0 ? "write" : "read",
details: { capabilities: { interactive: true } },
permission: [],
user,
scopes: [],
});
}
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const audience = new MockAudience();
for (const [clientId, client] of clients.entries()) {
audience.addMember(clientId, client);
}
return audience as IAudience;
}
/**
* Creates a mock {@link @fluidframework/protocol-definitions#IQuorumClients} for testing.
*/
@ -48,11 +83,14 @@ export function makeMockQuorum(clientIds: string[]): IQuorumClients {
export class MockEphemeralRuntime implements IEphemeralRuntime {
public logger?: ITelemetryBaseLogger;
public readonly quorum: IQuorumClients;
public readonly audience: IAudience;
public readonly listeners: {
connected: ((clientId: ClientConnectionId) => void)[];
disconnected: (() => void)[];
} = {
connected: [],
disconnected: [],
};
private isSupportedEvent(event: string): event is keyof typeof this.listeners {
return event in this.listeners;
@ -65,14 +103,12 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
if (logger !== undefined) {
this.logger = logger;
}
const quorum = makeMockQuorum([
"client0",
"client1",
"client2",
"client3",
"client4",
"client5",
]);
const clients = ["client0", "client1", "client2", "client3", "client4", "client5"];
const quorum = makeMockQuorum(clients);
const audience: IAudience = makeMockAudience(clients);
this.audience = audience;
this.getAudience = () => audience;
this.quorum = quorum;
this.getQuorum = () => quorum;
this.on = (
@ -126,6 +162,8 @@ export class MockEphemeralRuntime implements IEphemeralRuntime {
throw new Error("IEphemeralRuntime.off method not implemented.");
};
public getAudience: () => ReturnType<IEphemeralRuntime["getAudience"]>;
public getQuorum: () => ReturnType<IEphemeralRuntime["getQuorum"]>;
public submitSignal: IEphemeralRuntime["submitSignal"] = (

Просмотреть файл

@ -92,19 +92,10 @@ describe("Presence", () => {
describe("attendee", () => {
const newAttendeeSessionId = "sessionId-4";
const initialAttendeeConnectionId = "client4";
let newAttendee: ISessionClient | undefined;
let initialAttendeeSignal: ReturnType<typeof generateBasicClientJoin>;
beforeEach(() => {
runtime.submitSignal = () => {};
newAttendee = undefined;
afterCleanUp.push(
presence.events.on("attendeeJoined", (attendee) => {
assert(newAttendee === undefined, "Only one attendee should be announced");
newAttendee = attendee;
}),
);
initialAttendeeSignal = generateBasicClientJoin(clock.now - 50, {
averageLatency: 50,
clientSessionId: newAttendeeSessionId,
@ -113,114 +104,233 @@ describe("Presence", () => {
});
});
it("is announced via `attendeeJoined` when new", () => {
// Act - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
it("is not announced via `attendeeDisconnected` when unknown connection is removed", () => {
// Setup
presence.events.on("attendeeDisconnected", () => {
assert.fail("ateendeeDisconnected should not be emitted for unknown connection.");
});
// Verify
assert(newAttendee !== undefined, "No attendee was announced");
assert.equal(
newAttendee.sessionId,
newAttendeeSessionId,
"Attendee has wrong session id",
);
assert.equal(
newAttendee.getConnectionId(),
initialAttendeeConnectionId,
"Attendee has wrong client connection id",
);
// Act & Verify - remove unknown connection id
presence.removeClientConnectionId("unknownConnectionId");
});
describe("disconnects", () => {
let disconnectedAttendee: ISessionClient | undefined;
beforeEach(() => {
disconnectedAttendee = undefined;
describe("that is joining", () => {
it('is announced via `attendeeJoined` with status "Connected" when new', () => {
// Setup
let newAttendee: ISessionClient | undefined;
afterCleanUp.push(
presence.events.on("attendeeDisconnected", (attendee) => {
assert(
disconnectedAttendee === undefined,
"Only one attendee should be disconnected",
);
disconnectedAttendee = attendee;
presence.events.on("attendeeJoined", (attendee) => {
assert(newAttendee === undefined, "Only one attendee should be announced");
newAttendee = attendee;
}),
);
// Act - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
// Verify
assert(newAttendee !== undefined, "No attendee was announced");
assert.equal(
newAttendee.sessionId,
newAttendeeSessionId,
"Attendee has wrong session id",
);
assert.equal(
newAttendee.getConnectionId(),
initialAttendeeConnectionId,
"Attendee has wrong client connection id",
);
assert.equal(
newAttendee.getConnectionStatus(),
SessionClientStatus.Connected,
"Attendee connection status is not Connected",
);
});
});
describe("that is already known", () => {
let knownAttendee: ISessionClient | undefined;
beforeEach(() => {
afterCleanUp.push(
presence.events.on("attendeeJoined", (attendee) => {
knownAttendee = attendee;
}),
);
// Setup - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
// Act - remove client connection id
presence.removeClientConnectionId(initialAttendeeConnectionId);
});
it("is announced via `attendeeDisconnected` when audience member leaves", () => {
// Verify
assert(
disconnectedAttendee !== undefined,
"No attendee was disconnected in beforeEach",
);
assert.equal(
disconnectedAttendee.sessionId,
newAttendeeSessionId,
"Disconnected attendee has wrong session id",
);
assert.equal(
disconnectedAttendee.getConnectionId(),
initialAttendeeConnectionId,
"Disconnected attendee has wrong client connection id",
);
});
it("changes the session client status to `Disconnected`", () => {
// Verify
assert(
disconnectedAttendee !== undefined,
"No attendee was disconnected in beforeEach",
);
assert.equal(
disconnectedAttendee.getConnectionStatus(),
SessionClientStatus.Disconnected,
"Disconnected attendee has wrong status",
);
});
});
describe("already known", () => {
beforeEach(() => {
// Setup - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
assert(newAttendee !== undefined, "No attendee was announced in setup");
assert(knownAttendee !== undefined, "No attendee was announced in setup");
});
for (const [desc, id] of [
["connection id", initialAttendeeConnectionId] as const,
["session id", newAttendeeSessionId] as const,
]) {
it(`is available from \`getAttendee\` by ${desc}`, () => {
// Act
const attendee = presence.getAttendee(id);
// Verify
assert.equal(attendee, newAttendee, "getAttendee returned wrong attendee");
describe(`is available from \`getAttendee\` by ${desc}`, () => {
it('with status "Connected"', () => {
// Act
const attendee = presence.getAttendee(id);
// Verify
assert.equal(attendee, knownAttendee, "getAttendee returned wrong attendee");
assert.equal(
attendee.getConnectionStatus(),
SessionClientStatus.Connected,
"getAttendee returned attendee with wrong status",
);
});
it('with status "Disconnected" after disconnect', () => {
// Act - remove client connection id
presence.removeClientConnectionId(initialAttendeeConnectionId);
const attendee = presence.getAttendee(id);
// Verify
assert.equal(attendee, knownAttendee, "getAttendee returned wrong attendee");
assert.equal(
attendee.getConnectionStatus(),
SessionClientStatus.Disconnected,
"getAttendee returned attendee with wrong status",
);
});
});
}
it("is available from `getAttendees`", () => {
// Setup
assert(newAttendee !== undefined, "No attendee was set in beforeEach");
describe("is available from `getAttendees`", () => {
it('with status "Connected"', () => {
// Setup
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");
// Act
const attendees = presence.getAttendees();
assert(attendees.has(newAttendee), "getAttendees set does not contain attendee");
// Act
const attendees = presence.getAttendees();
assert(
attendees.has(knownAttendee),
"getAttendees set does not contain attendee",
);
assert.equal(
knownAttendee.getConnectionStatus(),
SessionClientStatus.Connected,
"getAttendees set contains attendee with wrong status",
);
});
it('with status "Disconnected"', () => {
// Setup
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");
// Act - remove client connection id
presence.removeClientConnectionId(initialAttendeeConnectionId);
// Verify
const attendees = presence.getAttendees();
assert(
attendees.has(knownAttendee),
"getAttendees set does not contain attendee",
);
assert.equal(
knownAttendee.getConnectionStatus(),
SessionClientStatus.Disconnected,
"getAttendees set contains attendee with wrong status",
);
});
});
it('is not announced via `attendeeJoined` when already "Connected"', () => {
// Setup
afterCleanUp.push(
presence.events.on("attendeeJoined", () => {
assert.fail("No attendee should be announced in beforeEach");
}),
);
// Act - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
});
describe("and has their connection removed", () => {
let disconnectedAttendee: ISessionClient | undefined;
beforeEach(() => {
disconnectedAttendee = undefined;
afterCleanUp.push(
presence.events.on("attendeeDisconnected", (attendee) => {
assert(
disconnectedAttendee === undefined,
"Only one attendee should be disconnected",
);
disconnectedAttendee = attendee;
}),
);
// Act - remove client connection id
presence.removeClientConnectionId(initialAttendeeConnectionId);
});
it("is announced via `attendeeDisconnected`", () => {
//
assert(knownAttendee !== undefined, "No attendee was set in beforeEach");
assert(
disconnectedAttendee !== undefined,
"No attendee was disconnected in removeClientConnectionId",
);
assert.equal(
disconnectedAttendee.sessionId,
knownAttendee.sessionId,
"Disconnected attendee has wrong session id",
);
assert.equal(
disconnectedAttendee.getConnectionId(),
initialAttendeeConnectionId,
"Disconnected attendee has wrong client connection id",
);
assert.equal(
disconnectedAttendee.getConnectionStatus(),
SessionClientStatus.Disconnected,
"Disconnected attendee has wrong status",
);
});
it('is not announced via `attendeeDisconnected` when already "Disconnected"', () => {
assert(
disconnectedAttendee !== undefined,
"No attendee was disconnected in removeClientConnectionId",
);
// Act & Verify - remove client connection id again
presence.removeClientConnectionId(initialAttendeeConnectionId);
});
});
});
describe("that is rejoining", () => {
let priorAttendee: ISessionClient | undefined;
beforeEach(() => {
afterCleanUp.push(
presence.events.on("attendeeJoined", (attendee) => {
priorAttendee = attendee;
}),
);
// Setup - simulate join message from client
presence.processSignal("", initialAttendeeSignal, false);
assert(priorAttendee !== undefined, "No attendee was announced in setup");
});
it("is NOT announced when rejoined with same connection (duplicate signal)", () => {
afterCleanUp.push(
presence.events.on("attendeeJoined", (attendee) => {
assert.fail(
"Attendee should not be announced when rejoining with same connection",
);
}),
);
clock.tick(10);
// Act & Verify - simulate duplicate join message from client
presence.processSignal("", initialAttendeeSignal, false);
});
it("is NOT announced when rejoined with different connection and current information is updated", () => {
it("is announced when rejoined with different connection and current information is updated", () => {
// Setup
assert(newAttendee !== undefined, "No attendee was set in beforeEach");
assert(priorAttendee !== undefined, "No attendee was set in beforeEach");
const updatedClientConnectionId = "client5";
clock.tick(20);
@ -245,27 +355,27 @@ describe("Presence", () => {
// Verify
// Session id is unchanged
assert.equal(
newAttendee.sessionId,
priorAttendee.sessionId,
newAttendeeSessionId,
"Attendee has wrong session id",
);
// Current connection id is updated
assert(
newAttendee.getConnectionId() === updatedClientConnectionId,
priorAttendee.getConnectionId() === updatedClientConnectionId,
"Attendee does not have updated client connection id",
);
// Attendee is available via new connection id
const attendeeViaUpdatedId = presence.getAttendee(updatedClientConnectionId);
assert.equal(
attendeeViaUpdatedId,
newAttendee,
priorAttendee,
"getAttendee returned wrong attendee for updated connection id",
);
// Attendee is available via old connection id
const attendeeViaOriginalId = presence.getAttendee(initialAttendeeConnectionId);
assert.equal(
attendeeViaOriginalId,
newAttendee,
priorAttendee,
"getAttendee returned wrong attendee for original connection id",
);
});