feat(client-presence): System Workspace (#22670)
1. Add infrastructure for custom System Workspace to handle internal states including ClientConnectionId to ClientSessionId. 2. Move audience support to System Workspace 3. Add `attendeeJoined` implementation 4. Add test coverage for new attendee's joining and consistency of lookup. New test cases: ``` PresenceManager ✔ throws when unknown attendee is requested via `getAttendee` when connected attendee ✔ is announced via `attendeeJoined` when new already known ✔ is available from `getAttendee` by connection id ✔ is available from `getAttendee` by session id ✔ is available from `getAttendees` ✔ is NOT announced when rejoined with same connection (duplicate signal) ✔ is NOT announced when rejoined with different connection and current information is updated ``` 5. Update general update protocol to always include client connection id -> current session id entry to ensure all updates are always working with known (already registered) session id even if a prior join or broadcast were missed.
This commit is contained in:
Родитель
efa8231107
Коммит
f57799c060
|
@ -5,10 +5,9 @@
|
|||
|
||||
import type { IContainerRuntime } from "@fluidframework/container-runtime-definitions/internal";
|
||||
import type { IFluidDataStoreRuntime } from "@fluidframework/datastore-definitions/internal";
|
||||
import type { MonitoringContext } from "@fluidframework/telemetry-utils/internal";
|
||||
|
||||
import type { InternalTypes } from "./exposedInternalTypes.js";
|
||||
import type { ClientSessionId, IPresence, ISessionClient } from "./presence.js";
|
||||
import type { ClientSessionId, ISessionClient } from "./presence.js";
|
||||
|
||||
import type { IRuntimeInternal } from "@fluid-experimental/presence/internal/container-definitions/internal";
|
||||
|
||||
|
@ -48,15 +47,6 @@ export type IEphemeralRuntime = Pick<
|
|||
> &
|
||||
Partial<Pick<IFluidDataStoreRuntime, "logger">>;
|
||||
|
||||
/**
|
||||
* Collection of utilities provided by PresenceManager that are used by presence sub-components.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export type PresenceManagerInternal = Pick<IPresence, "getAttendee"> & {
|
||||
readonly mc: MonitoringContext | undefined;
|
||||
};
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
|
|
|
@ -5,18 +5,23 @@
|
|||
|
||||
import { assert } from "@fluidframework/core-utils/internal";
|
||||
import type { IInboundSignalMessage } from "@fluidframework/runtime-definitions/internal";
|
||||
import type { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils/internal";
|
||||
|
||||
import type { ClientConnectionId } from "./baseTypes.js";
|
||||
import type { InternalTypes } from "./exposedInternalTypes.js";
|
||||
import type { IEphemeralRuntime, PresenceManagerInternal } from "./internalTypes.js";
|
||||
import type { ClientSessionId } from "./presence.js";
|
||||
import type { IEphemeralRuntime } from "./internalTypes.js";
|
||||
import type { ClientSessionId, ISessionClient } from "./presence.js";
|
||||
import type {
|
||||
ClientUpdateEntry,
|
||||
PresenceStatesInternal,
|
||||
ValueElementMap,
|
||||
} from "./presenceStates.js";
|
||||
import { createPresenceStates, mergeUntrackedDatastore } from "./presenceStates.js";
|
||||
import type { PresenceStates, PresenceStatesSchema } from "./types.js";
|
||||
import type { SystemWorkspaceDatastore } from "./systemWorkspace.js";
|
||||
import type {
|
||||
PresenceStates,
|
||||
PresenceStatesSchema,
|
||||
PresenceWorkspaceAddress,
|
||||
} from "./types.js";
|
||||
|
||||
import type { IExtensionMessage } from "@fluid-experimental/presence/internal/container-definitions/internal";
|
||||
|
||||
|
@ -26,18 +31,14 @@ interface PresenceStatesEntry<TSchema extends PresenceStatesSchema> {
|
|||
}
|
||||
|
||||
interface SystemDatastore {
|
||||
"system:presence": {
|
||||
clientToSessionId: {
|
||||
[
|
||||
ClientConnectionId: ClientConnectionId
|
||||
]: InternalTypes.ValueRequiredState<ClientSessionId>;
|
||||
};
|
||||
};
|
||||
"system:presence": SystemWorkspaceDatastore;
|
||||
}
|
||||
|
||||
type PresenceDatastore = {
|
||||
type InternalWorkspaceAddress = `${"s" | "n"}:${PresenceWorkspaceAddress}`;
|
||||
|
||||
type PresenceDatastore = SystemDatastore & {
|
||||
[WorkspaceAddress: string]: ValueElementMap<PresenceStatesSchema>;
|
||||
} & SystemDatastore;
|
||||
};
|
||||
|
||||
interface GeneralDatastoreMessageContent {
|
||||
[WorkspaceAddress: string]: {
|
||||
|
@ -47,7 +48,7 @@ interface GeneralDatastoreMessageContent {
|
|||
};
|
||||
}
|
||||
|
||||
type DatastoreMessageContent = GeneralDatastoreMessageContent & SystemDatastore;
|
||||
type DatastoreMessageContent = SystemDatastore & GeneralDatastoreMessageContent;
|
||||
|
||||
const datastoreUpdateMessageType = "Pres:DatastoreUpdate";
|
||||
interface DatastoreUpdateMessage extends IInboundSignalMessage {
|
||||
|
@ -56,7 +57,7 @@ interface DatastoreUpdateMessage extends IInboundSignalMessage {
|
|||
sendTimestamp: number;
|
||||
avgLatency: number;
|
||||
isComplete?: true;
|
||||
data: GeneralDatastoreMessageContent & Partial<SystemDatastore>;
|
||||
data: DatastoreMessageContent;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -81,8 +82,9 @@ function isPresenceMessage(
|
|||
* @internal
|
||||
*/
|
||||
export interface PresenceDatastoreManager {
|
||||
joinSession(clientId: ClientConnectionId): void;
|
||||
getWorkspace<TSchema extends PresenceStatesSchema>(
|
||||
internalWorkspaceAddress: string,
|
||||
internalWorkspaceAddress: InternalWorkspaceAddress,
|
||||
requestedContent: TSchema,
|
||||
): PresenceStates<TSchema>;
|
||||
processSignal(message: IExtensionMessage, local: boolean): void;
|
||||
|
@ -92,9 +94,7 @@ export interface PresenceDatastoreManager {
|
|||
* Manages singleton datastore for all Presence.
|
||||
*/
|
||||
export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
||||
private readonly datastore: PresenceDatastore = {
|
||||
"system:presence": { clientToSessionId: {} },
|
||||
};
|
||||
private readonly datastore: PresenceDatastore;
|
||||
private averageLatency = 0;
|
||||
private returnedMessages = 0;
|
||||
private refreshBroadcastRequested = false;
|
||||
|
@ -104,29 +104,17 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
public constructor(
|
||||
private readonly clientSessionId: ClientSessionId,
|
||||
private readonly runtime: IEphemeralRuntime,
|
||||
private readonly presence: PresenceManagerInternal,
|
||||
private readonly lookupClient: (clientId: ClientSessionId) => ISessionClient,
|
||||
private readonly logger: ITelemetryLoggerExt | undefined,
|
||||
systemWorkspaceDatastore: SystemWorkspaceDatastore,
|
||||
systemWorkspace: PresenceStatesEntry<PresenceStatesSchema>,
|
||||
) {
|
||||
runtime.on("connected", this.onConnect.bind(this));
|
||||
|
||||
// Check if already connected at the time of construction.
|
||||
// If constructed during data store load, the runtime may already be connected
|
||||
// and the "connected" event will be raised during completion. With construction
|
||||
// delayed we expect that "connected" event has passed.
|
||||
// Note: In some manual testing, this does not appear to be enough to
|
||||
// always trigger an initial connect.
|
||||
const clientId = runtime.clientId;
|
||||
if (clientId !== undefined && runtime.connected) {
|
||||
this.onConnect(clientId);
|
||||
}
|
||||
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
|
||||
this.datastore = { "system:presence": systemWorkspaceDatastore } as PresenceDatastore;
|
||||
this.workspaces.set("system:presence", systemWorkspace);
|
||||
}
|
||||
|
||||
private onConnect(clientId: ClientConnectionId): void {
|
||||
this.datastore["system:presence"].clientToSessionId[clientId] = {
|
||||
rev: 0,
|
||||
timestamp: Date.now(),
|
||||
value: this.clientSessionId,
|
||||
};
|
||||
|
||||
public joinSession(clientId: ClientConnectionId): void {
|
||||
// Broadcast join message to all clients
|
||||
const updateProviders = [...this.runtime.getQuorum().getMembers().keys()].filter(
|
||||
(quorumClientId) => quorumClientId !== clientId,
|
||||
|
@ -145,7 +133,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
}
|
||||
|
||||
public getWorkspace<TSchema extends PresenceStatesSchema>(
|
||||
internalWorkspaceAddress: string,
|
||||
internalWorkspaceAddress: InternalWorkspaceAddress,
|
||||
requestedContent: TSchema,
|
||||
): PresenceStates<TSchema> {
|
||||
const existing = this.workspaces.get(internalWorkspaceAddress);
|
||||
|
@ -167,12 +155,26 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
return;
|
||||
}
|
||||
|
||||
const updates: GeneralDatastoreMessageContent[string] = {};
|
||||
const clientConnectionId = this.runtime.clientId;
|
||||
assert(clientConnectionId !== undefined, "Client connected without clientId");
|
||||
const currentClientToSessionValueState =
|
||||
this.datastore["system:presence"].clientToSessionId[clientConnectionId];
|
||||
|
||||
const updates: GeneralDatastoreMessageContent[InternalWorkspaceAddress] = {};
|
||||
for (const [key, value] of Object.entries(states)) {
|
||||
updates[key] = { [this.clientSessionId]: value };
|
||||
}
|
||||
this.localUpdate(
|
||||
{
|
||||
// Always send current connection mapping for some resiliency against
|
||||
// lost signals. This ensures that client session id found in `updates`
|
||||
// (which is this client's client session id) is always represented in
|
||||
// system workspace of recipient clients.
|
||||
"system:presence": {
|
||||
clientToSessionId: {
|
||||
[clientConnectionId]: { ...currentClientToSessionValueState },
|
||||
},
|
||||
},
|
||||
[internalWorkspaceAddress]: updates,
|
||||
},
|
||||
forceBroadcast,
|
||||
|
@ -182,7 +184,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
const entry = createPresenceStates(
|
||||
{
|
||||
clientSessionId: this.clientSessionId,
|
||||
lookupClient: this.presence.getAttendee.bind(this.presence),
|
||||
lookupClient: this.lookupClient,
|
||||
localUpdate,
|
||||
},
|
||||
workspaceDatastore,
|
||||
|
@ -193,10 +195,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
return entry.public;
|
||||
}
|
||||
|
||||
private localUpdate(
|
||||
data: GeneralDatastoreMessageContent & Partial<SystemDatastore>,
|
||||
_forceBroadcast: boolean,
|
||||
): void {
|
||||
private localUpdate(data: DatastoreMessageContent, _forceBroadcast: boolean): void {
|
||||
const content = {
|
||||
sendTimestamp: Date.now(),
|
||||
avgLatency: this.averageLatency,
|
||||
|
@ -304,7 +303,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
if (updateProviders.includes(clientId)) {
|
||||
// Send all current state to the new client
|
||||
this.broadcastAllKnownState();
|
||||
this.presence.mc?.logger.sendTelemetryEvent({
|
||||
this.logger?.sendTelemetryEvent({
|
||||
eventName: "JoinResponse",
|
||||
details: {
|
||||
type: "broadcastAll",
|
||||
|
@ -342,7 +341,7 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
|
|||
// If not connected, nothing we can do.
|
||||
if (this.refreshBroadcastRequested && this.runtime.connected) {
|
||||
this.broadcastAllKnownState();
|
||||
this.presence.mc?.logger.sendTelemetryEvent({
|
||||
this.logger?.sendTelemetryEvent({
|
||||
eventName: "JoinResponse",
|
||||
details: {
|
||||
type: "broadcastAll",
|
||||
|
|
|
@ -4,11 +4,14 @@
|
|||
*/
|
||||
|
||||
import { createSessionId } from "@fluidframework/id-compressor/internal";
|
||||
import type { MonitoringContext } from "@fluidframework/telemetry-utils/internal";
|
||||
import type {
|
||||
ITelemetryLoggerExt,
|
||||
MonitoringContext,
|
||||
} from "@fluidframework/telemetry-utils/internal";
|
||||
import { createChildMonitoringContext } from "@fluidframework/telemetry-utils/internal";
|
||||
|
||||
import type { ClientConnectionId } from "./baseTypes.js";
|
||||
import type { IEphemeralRuntime, PresenceManagerInternal } from "./internalTypes.js";
|
||||
import type { IEphemeralRuntime } from "./internalTypes.js";
|
||||
import type {
|
||||
ClientSessionId,
|
||||
IPresence,
|
||||
|
@ -17,6 +20,8 @@ import type {
|
|||
} from "./presence.js";
|
||||
import type { PresenceDatastoreManager } from "./presenceDatastoreManager.js";
|
||||
import { PresenceDatastoreManagerImpl } from "./presenceDatastoreManager.js";
|
||||
import type { SystemWorkspace, SystemWorkspaceDatastore } from "./systemWorkspace.js";
|
||||
import { createSystemWorkspace } from "./systemWorkspace.js";
|
||||
import type {
|
||||
PresenceStates,
|
||||
PresenceWorkspaceAddress,
|
||||
|
@ -27,6 +32,7 @@ import type {
|
|||
IContainerExtension,
|
||||
IExtensionMessage,
|
||||
} from "@fluid-experimental/presence/internal/container-definitions/internal";
|
||||
import type { IEmitter } from "@fluid-experimental/presence/internal/events";
|
||||
import { createEmitter } from "@fluid-experimental/presence/internal/events";
|
||||
|
||||
/**
|
||||
|
@ -41,78 +47,57 @@ export type PresenceExtensionInterface = Required<
|
|||
/**
|
||||
* The Presence manager
|
||||
*/
|
||||
class PresenceManager
|
||||
implements IPresence, PresenceExtensionInterface, PresenceManagerInternal
|
||||
{
|
||||
class PresenceManager implements IPresence, PresenceExtensionInterface {
|
||||
private readonly datastoreManager: PresenceDatastoreManager;
|
||||
private readonly selfAttendee: ISessionClient;
|
||||
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, ISessionClient>();
|
||||
private readonly systemWorkspace: SystemWorkspace;
|
||||
|
||||
public readonly mc: MonitoringContext | undefined = undefined;
|
||||
public readonly events = createEmitter<PresenceEvents>();
|
||||
|
||||
private readonly mc: MonitoringContext | undefined = undefined;
|
||||
|
||||
public constructor(runtime: IEphemeralRuntime, clientSessionId: ClientSessionId) {
|
||||
this.selfAttendee = {
|
||||
sessionId: clientSessionId,
|
||||
currentConnectionId: () => {
|
||||
throw new Error("Client has never been connected");
|
||||
},
|
||||
};
|
||||
this.attendees.set(clientSessionId, this.selfAttendee);
|
||||
|
||||
const logger = runtime.logger;
|
||||
if (logger) {
|
||||
this.mc = createChildMonitoringContext({ logger, namespace: "Presence" });
|
||||
this.mc.logger.sendTelemetryEvent({ eventName: "PresenceInstantiated" });
|
||||
}
|
||||
|
||||
// If already connected (now or in the past), populate self and attendees.
|
||||
const originalClientId = runtime.clientId;
|
||||
if (originalClientId !== undefined) {
|
||||
this.selfAttendee.currentConnectionId = () => originalClientId;
|
||||
this.attendees.set(originalClientId, this.selfAttendee);
|
||||
}
|
||||
|
||||
// Watch for connected event that will produce new (or first) clientId.
|
||||
// This event is added before instantiating the datastore manager so
|
||||
// that self can be given a proper clientId before datastore manager
|
||||
// might possibly try to use it. (Datastore manager is expected to
|
||||
// use connected clientId more directly and no order dependence should
|
||||
// be relied upon, but helps with debugging consistency.)
|
||||
runtime.on("connected", (clientId: ClientConnectionId) => {
|
||||
this.selfAttendee.currentConnectionId = () => clientId;
|
||||
this.attendees.set(clientId, this.selfAttendee);
|
||||
});
|
||||
|
||||
this.datastoreManager = new PresenceDatastoreManagerImpl(
|
||||
this.selfAttendee.sessionId,
|
||||
[this.datastoreManager, this.systemWorkspace] = setupSubComponents(
|
||||
clientSessionId,
|
||||
runtime,
|
||||
this,
|
||||
this.events,
|
||||
this.mc?.logger,
|
||||
);
|
||||
|
||||
runtime.on("connected", this.onConnect.bind(this));
|
||||
|
||||
// Check if already connected at the time of construction.
|
||||
// If constructed during data store load, the runtime may already be connected
|
||||
// and the "connected" event will be raised during completion. With construction
|
||||
// delayed we expect that "connected" event has passed.
|
||||
// Note: In some manual testing, this does not appear to be enough to
|
||||
// always trigger an initial connect.
|
||||
const clientId = runtime.clientId;
|
||||
if (clientId !== undefined && runtime.connected) {
|
||||
this.onConnect(clientId);
|
||||
}
|
||||
}
|
||||
|
||||
public readonly events = createEmitter<PresenceEvents>();
|
||||
private onConnect(clientConnectionId: ClientConnectionId): void {
|
||||
this.systemWorkspace.onConnectionAdded(clientConnectionId);
|
||||
this.datastoreManager.joinSession(clientConnectionId);
|
||||
}
|
||||
|
||||
public getAttendees(): ReadonlySet<ISessionClient> {
|
||||
return new Set(this.attendees.values());
|
||||
return this.systemWorkspace.getAttendees();
|
||||
}
|
||||
|
||||
public getAttendee(clientId: ClientConnectionId | ClientSessionId): ISessionClient {
|
||||
const attendee = this.attendees.get(clientId);
|
||||
if (attendee) {
|
||||
return attendee;
|
||||
}
|
||||
// This is a major hack to enable basic operation.
|
||||
// Missing attendees should be rejected.
|
||||
const newAttendee = {
|
||||
sessionId: clientId as ClientSessionId,
|
||||
currentConnectionId: () => clientId,
|
||||
} satisfies ISessionClient;
|
||||
this.attendees.set(clientId, newAttendee);
|
||||
return newAttendee;
|
||||
return this.systemWorkspace.getAttendee(clientId);
|
||||
}
|
||||
|
||||
public getMyself(): ISessionClient {
|
||||
return this.selfAttendee;
|
||||
return this.systemWorkspace.getMyself();
|
||||
}
|
||||
|
||||
public getStates<TSchema extends PresenceStatesSchema>(
|
||||
|
@ -141,6 +126,41 @@ class PresenceManager
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper for Presence Manager setup
|
||||
*
|
||||
* Presence Manager is outermost layer of the presence system and has two main
|
||||
* sub-components:
|
||||
* 1. PresenceDatastoreManager: Manages the unified general data for states and
|
||||
* registry for workspaces.
|
||||
* 2. SystemWorkspace: Custom internal workspace for system states including
|
||||
* attendee management. It is registered with the PresenceDatastoreManager.
|
||||
*/
|
||||
function setupSubComponents(
|
||||
clientSessionId: ClientSessionId,
|
||||
runtime: IEphemeralRuntime,
|
||||
events: IEmitter<PresenceEvents>,
|
||||
logger: ITelemetryLoggerExt | undefined,
|
||||
): [PresenceDatastoreManager, SystemWorkspace] {
|
||||
const systemWorkspaceDatastore: SystemWorkspaceDatastore = {
|
||||
clientToSessionId: {},
|
||||
};
|
||||
const systemWorkspaceConfig = createSystemWorkspace(
|
||||
clientSessionId,
|
||||
systemWorkspaceDatastore,
|
||||
events,
|
||||
);
|
||||
const datastoreManager = new PresenceDatastoreManagerImpl(
|
||||
clientSessionId,
|
||||
runtime,
|
||||
systemWorkspaceConfig.workspace.getAttendee.bind(systemWorkspaceConfig.workspace),
|
||||
logger,
|
||||
systemWorkspaceDatastore,
|
||||
systemWorkspaceConfig.statesEntry,
|
||||
);
|
||||
return [datastoreManager, systemWorkspaceConfig.workspace];
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates Presence Manager
|
||||
*
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
/*!
|
||||
* Copyright (c) Microsoft Corporation and contributors. All rights reserved.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
import { assert } from "@fluidframework/core-utils/internal";
|
||||
|
||||
import type { ClientConnectionId } from "./baseTypes.js";
|
||||
import type { InternalTypes } from "./exposedInternalTypes.js";
|
||||
import type {
|
||||
ClientSessionId,
|
||||
IPresence,
|
||||
ISessionClient,
|
||||
PresenceEvents,
|
||||
} from "./presence.js";
|
||||
import type { PresenceStatesInternal } from "./presenceStates.js";
|
||||
import type { PresenceStates, PresenceStatesSchema } from "./types.js";
|
||||
|
||||
import type { IEmitter } from "@fluid-experimental/presence/internal/events";
|
||||
|
||||
/**
|
||||
* The system workspace's datastore structure.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export interface SystemWorkspaceDatastore {
|
||||
clientToSessionId: {
|
||||
[ConnectionId: ClientConnectionId]: InternalTypes.ValueRequiredState<ClientSessionId>;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* There is no implementation class for this interface.
|
||||
* It is a simple structure. Most complicated aspect is that
|
||||
* `currentConnectionId()` member is replaced with a new
|
||||
* function when a more recent connection is added.
|
||||
*
|
||||
* See {@link SystemWorkspaceImpl.ensureAttendee}.
|
||||
*/
|
||||
interface SessionClient extends ISessionClient {
|
||||
/**
|
||||
* Order is used to track the most recent client connection
|
||||
* during a session.
|
||||
*/
|
||||
order: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
export interface SystemWorkspace
|
||||
// Portion of IPresence that is handled by SystemWorkspace along with
|
||||
// responsiblity for emitting "attendeeJoined" events.
|
||||
extends Pick<IPresence, "getAttendees" | "getAttendee" | "getMyself"> {
|
||||
/**
|
||||
* Must be called when the current client acquires a new connection.
|
||||
*
|
||||
* @param clientConnectionId - The new client connection id.
|
||||
*/
|
||||
onConnectionAdded(clientConnectionId: ClientConnectionId): void;
|
||||
}
|
||||
|
||||
class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
|
||||
private readonly selfAttendee: SessionClient;
|
||||
/**
|
||||
* `attendees` is this client's understanding of the attendees in the
|
||||
* session. The map covers entries for both session ids and connection
|
||||
* ids, which are never expected to collide, but if they did for same
|
||||
* client that would be fine.
|
||||
* An entry is for session id if the value's `sessionId` matches the key.
|
||||
*/
|
||||
private readonly attendees = new Map<ClientConnectionId | ClientSessionId, SessionClient>();
|
||||
|
||||
public constructor(
|
||||
clientSessionId: ClientSessionId,
|
||||
private readonly datastore: SystemWorkspaceDatastore,
|
||||
public readonly events: IEmitter<Pick<PresenceEvents, "attendeeJoined">>,
|
||||
) {
|
||||
this.selfAttendee = {
|
||||
sessionId: clientSessionId,
|
||||
order: 0,
|
||||
currentConnectionId: () => {
|
||||
throw new Error("Client has never been connected");
|
||||
},
|
||||
};
|
||||
this.attendees.set(clientSessionId, this.selfAttendee);
|
||||
}
|
||||
|
||||
public ensureContent<TSchemaAdditional extends PresenceStatesSchema>(
|
||||
_content: TSchemaAdditional,
|
||||
): never {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
public processUpdate(
|
||||
_received: number,
|
||||
_timeModifier: number,
|
||||
remoteDatastore: {
|
||||
clientToSessionId: {
|
||||
[
|
||||
ConnectionId: ClientConnectionId
|
||||
]: InternalTypes.ValueRequiredState<ClientSessionId> & {
|
||||
ignoreUnmonitored?: true;
|
||||
};
|
||||
};
|
||||
},
|
||||
): void {
|
||||
const postUpdateActions: (() => void)[] = [];
|
||||
for (const [clientConnectionId, value] of Object.entries(
|
||||
remoteDatastore.clientToSessionId,
|
||||
)) {
|
||||
const clientSessionId = value.value;
|
||||
const { attendee, isNew } = this.ensureAttendee(
|
||||
clientSessionId,
|
||||
clientConnectionId,
|
||||
/* order */ value.rev,
|
||||
);
|
||||
if (isNew) {
|
||||
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
|
||||
}
|
||||
const knownSessionId: InternalTypes.ValueRequiredState<ClientSessionId> | undefined =
|
||||
this.datastore.clientToSessionId[clientConnectionId];
|
||||
if (knownSessionId === undefined) {
|
||||
this.datastore.clientToSessionId[clientConnectionId] = value;
|
||||
} else {
|
||||
assert(knownSessionId.value === value.value, "Mismatched SessionId");
|
||||
}
|
||||
}
|
||||
// TODO: reorganize processUpdate and caller to process actions after all updates are processed.
|
||||
for (const action of postUpdateActions) {
|
||||
action();
|
||||
}
|
||||
}
|
||||
|
||||
public onConnectionAdded(clientConnectionId: ClientConnectionId): void {
|
||||
this.datastore.clientToSessionId[clientConnectionId] = {
|
||||
rev: this.selfAttendee.order++,
|
||||
timestamp: Date.now(),
|
||||
value: this.selfAttendee.sessionId,
|
||||
};
|
||||
|
||||
this.selfAttendee.currentConnectionId = () => clientConnectionId;
|
||||
this.attendees.set(clientConnectionId, this.selfAttendee);
|
||||
}
|
||||
|
||||
public getAttendees(): ReadonlySet<ISessionClient> {
|
||||
return new Set(this.attendees.values());
|
||||
}
|
||||
|
||||
public getAttendee(clientId: ClientConnectionId | ClientSessionId): ISessionClient {
|
||||
const attendee = this.attendees.get(clientId);
|
||||
if (attendee) {
|
||||
return attendee;
|
||||
}
|
||||
|
||||
// TODO: Restore option to add attendee on demand to handle internal
|
||||
// lookup cases that must come from internal data.
|
||||
// There aren't any resiliency mechanisms in place to handle a missed
|
||||
// ClientJoin right now.
|
||||
throw new Error("Attendee not found");
|
||||
}
|
||||
|
||||
public getMyself(): ISessionClient {
|
||||
return this.selfAttendee;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure the given client session and connection id pair are represented
|
||||
* in the attendee map. If not present, SessionClient is created and added
|
||||
* to map. If present, make sure the current connection id is updated.
|
||||
*/
|
||||
private ensureAttendee(
|
||||
clientSessionId: ClientSessionId,
|
||||
clientConnectionId: ClientConnectionId,
|
||||
order: number,
|
||||
): { attendee: SessionClient; isNew: boolean } {
|
||||
const currentConnectionId = (): ClientConnectionId => clientConnectionId;
|
||||
let attendee = this.attendees.get(clientSessionId);
|
||||
let isNew = false;
|
||||
if (attendee === undefined) {
|
||||
// New attendee. Create SessionClient and add session id based
|
||||
// entry to map.
|
||||
attendee = {
|
||||
sessionId: clientSessionId,
|
||||
order,
|
||||
currentConnectionId,
|
||||
};
|
||||
this.attendees.set(clientSessionId, attendee);
|
||||
isNew = true;
|
||||
} else if (order > attendee.order) {
|
||||
// The given association is newer than the one we have.
|
||||
// Update the order and current connection id.
|
||||
attendee.order = order;
|
||||
attendee.currentConnectionId = currentConnectionId;
|
||||
}
|
||||
// Always update entry for the connection id. (Okay if already set.)
|
||||
this.attendees.set(clientConnectionId, attendee);
|
||||
return { attendee, isNew };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates the system workspace.
|
||||
*
|
||||
* @internal
|
||||
*/
|
||||
export function createSystemWorkspace(
|
||||
clientSessionId: ClientSessionId,
|
||||
datastore: SystemWorkspaceDatastore,
|
||||
events: IEmitter<Pick<PresenceEvents, "attendeeJoined">>,
|
||||
): {
|
||||
workspace: SystemWorkspace;
|
||||
statesEntry: {
|
||||
internal: PresenceStatesInternal;
|
||||
public: PresenceStates<PresenceStatesSchema>;
|
||||
};
|
||||
} {
|
||||
const workspace = new SystemWorkspaceImpl(clientSessionId, datastore, events);
|
||||
return {
|
||||
workspace,
|
||||
statesEntry: {
|
||||
internal: workspace,
|
||||
public: undefined as unknown as PresenceStates<PresenceStatesSchema>,
|
||||
},
|
||||
};
|
||||
}
|
|
@ -3,24 +3,42 @@
|
|||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
import { EventAndErrorTrackingLogger } from "@fluidframework/test-utils/internal";
|
||||
import { strict as assert } from "node:assert";
|
||||
|
||||
import { EventAndErrorTrackingLogger } from "@fluidframework/test-utils/internal";
|
||||
import type { SinonFakeTimers } from "sinon";
|
||||
import { useFakeTimers } from "sinon";
|
||||
|
||||
import type { ISessionClient } from "../presence.js";
|
||||
import { createPresenceManager } from "../presenceManager.js";
|
||||
|
||||
import { MockEphemeralRuntime } from "./mockEphemeralRuntime.js";
|
||||
import { assertFinalExpectations } from "./testUtils.js";
|
||||
import {
|
||||
assertFinalExpectations,
|
||||
generateBasicClientJoin,
|
||||
prepareConnectedPresence,
|
||||
} from "./testUtils.js";
|
||||
|
||||
describe("Presence", () => {
|
||||
describe("PresenceManager", () => {
|
||||
let runtime: MockEphemeralRuntime;
|
||||
let logger: EventAndErrorTrackingLogger;
|
||||
const initialTime = 1000;
|
||||
let clock: SinonFakeTimers;
|
||||
|
||||
before(async () => {
|
||||
clock = useFakeTimers();
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
logger = new EventAndErrorTrackingLogger();
|
||||
runtime = new MockEphemeralRuntime(logger);
|
||||
clock.setSystemTime(initialTime);
|
||||
});
|
||||
|
||||
afterEach(function (done: Mocha.Done) {
|
||||
clock.reset();
|
||||
|
||||
// If the test passed so far, check final expectations.
|
||||
if (this.currentTest?.state === "passed") {
|
||||
assertFinalExpectations(runtime, logger);
|
||||
|
@ -28,6 +46,10 @@ describe("Presence", () => {
|
|||
done();
|
||||
});
|
||||
|
||||
after(() => {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
it("can be created", () => {
|
||||
// Act & Verify (does not throw)
|
||||
createPresenceManager(runtime);
|
||||
|
@ -43,5 +65,160 @@ describe("Presence", () => {
|
|||
// Verify
|
||||
assertFinalExpectations(runtime, logger);
|
||||
});
|
||||
|
||||
it("throws when unknown attendee is requested via `getAttendee`", () => {
|
||||
// Setup
|
||||
const presence = createPresenceManager(runtime);
|
||||
|
||||
// Act & Verify
|
||||
assert.throws(() => presence.getAttendee("unknown"), /Attendee not found/);
|
||||
});
|
||||
|
||||
describe("when connected", () => {
|
||||
let presence: ReturnType<typeof createPresenceManager>;
|
||||
const afterCleanUp: (() => void)[] = [];
|
||||
|
||||
beforeEach(() => {
|
||||
presence = prepareConnectedPresence(runtime, "seassionId-2", "client2", clock, logger);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
for (const cleanUp of afterCleanUp) {
|
||||
cleanUp();
|
||||
}
|
||||
afterCleanUp.length = 0;
|
||||
});
|
||||
|
||||
describe("attendee", () => {
|
||||
const newAttendeeSessionId = "sessionId-4";
|
||||
const initialAttendeeConnectionId = "client4";
|
||||
let newAttendee: ISessionClient | undefined;
|
||||
let initialAttendeeSignal: ReturnType<typeof generateBasicClientJoin>;
|
||||
|
||||
beforeEach(() => {
|
||||
runtime.submitSignal = () => {};
|
||||
newAttendee = undefined;
|
||||
afterCleanUp.push(
|
||||
presence.events.on("attendeeJoined", (attendee) => {
|
||||
assert(newAttendee === undefined, "Only one attendee should be announced");
|
||||
newAttendee = attendee;
|
||||
}),
|
||||
);
|
||||
|
||||
initialAttendeeSignal = generateBasicClientJoin(clock.now - 50, {
|
||||
averageLatency: 50,
|
||||
clientSessionId: newAttendeeSessionId,
|
||||
clientConnectionId: initialAttendeeConnectionId,
|
||||
updateProviders: ["client2"],
|
||||
});
|
||||
});
|
||||
|
||||
it("is announced via `attendeeJoined` when new", () => {
|
||||
// Act - simulate join message from client
|
||||
presence.processSignal("", initialAttendeeSignal, false);
|
||||
|
||||
// Verify
|
||||
assert(newAttendee !== undefined, "No attendee was announced");
|
||||
assert.equal(
|
||||
newAttendee.sessionId,
|
||||
newAttendeeSessionId,
|
||||
"Attendee has wrong session id",
|
||||
);
|
||||
assert.equal(
|
||||
newAttendee.currentConnectionId(),
|
||||
initialAttendeeConnectionId,
|
||||
"Attendee has wrong client connection id",
|
||||
);
|
||||
});
|
||||
|
||||
describe("already known", () => {
|
||||
beforeEach(() => {
|
||||
// Setup - simulate join message from client
|
||||
presence.processSignal("", initialAttendeeSignal, false);
|
||||
assert(newAttendee !== undefined, "No attendee was announced in setup");
|
||||
});
|
||||
|
||||
for (const [desc, id] of [
|
||||
["connection id", initialAttendeeConnectionId] as const,
|
||||
["session id", newAttendeeSessionId] as const,
|
||||
]) {
|
||||
it(`is available from \`getAttendee\` by ${desc}`, () => {
|
||||
// Act
|
||||
const attendee = presence.getAttendee(id);
|
||||
// Verify
|
||||
assert.equal(attendee, newAttendee, "getAttendee returned wrong attendee");
|
||||
});
|
||||
}
|
||||
|
||||
it("is available from `getAttendees`", () => {
|
||||
// Setup
|
||||
assert(newAttendee !== undefined, "No attendee was set in beforeEach");
|
||||
|
||||
// Act
|
||||
const attendees = presence.getAttendees();
|
||||
assert(attendees.has(newAttendee), "getAttendees set does not contain attendee");
|
||||
});
|
||||
|
||||
it("is NOT announced when rejoined with same connection (duplicate signal)", () => {
|
||||
clock.tick(10);
|
||||
|
||||
// Act & Verify - simulate duplicate join message from client
|
||||
presence.processSignal("", initialAttendeeSignal, false);
|
||||
});
|
||||
|
||||
it("is NOT announced when rejoined with different connection and current information is updated", () => {
|
||||
// Setup
|
||||
assert(newAttendee !== undefined, "No attendee was set in beforeEach");
|
||||
|
||||
const updatedClientConnectionId = "client5";
|
||||
clock.tick(20);
|
||||
const rejoinedAttendeeSignal = generateBasicClientJoin(clock.now - 20, {
|
||||
averageLatency: 20,
|
||||
clientSessionId: newAttendeeSessionId, // Same session id
|
||||
clientConnectionId: updatedClientConnectionId, // Different connection id
|
||||
connectionOrder: 1,
|
||||
updateProviders: ["client2"],
|
||||
});
|
||||
rejoinedAttendeeSignal.content.data["system:presence"].clientToSessionId[
|
||||
initialAttendeeConnectionId
|
||||
] =
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
|
||||
initialAttendeeSignal.content.data["system:presence"].clientToSessionId[
|
||||
initialAttendeeConnectionId
|
||||
]!;
|
||||
|
||||
// Act - simulate new join message from same client (without disconnect)
|
||||
presence.processSignal("", rejoinedAttendeeSignal, false);
|
||||
|
||||
// Verify
|
||||
// Session id is unchanged
|
||||
assert.equal(
|
||||
newAttendee.sessionId,
|
||||
newAttendeeSessionId,
|
||||
"Attendee has wrong session id",
|
||||
);
|
||||
// Current connection id is updated
|
||||
assert(
|
||||
newAttendee.currentConnectionId() === updatedClientConnectionId,
|
||||
"Attendee does not have updated client connection id",
|
||||
);
|
||||
// Attendee is available via new connection id
|
||||
const attendeeViaUpdatedId = presence.getAttendee(updatedClientConnectionId);
|
||||
assert.equal(
|
||||
attendeeViaUpdatedId,
|
||||
newAttendee,
|
||||
"getAttendee returned wrong attendee for updated connection id",
|
||||
);
|
||||
// Attendee is available via old connection id
|
||||
const attendeeViaOriginalId = presence.getAttendee(initialAttendeeConnectionId);
|
||||
assert.equal(
|
||||
attendeeViaOriginalId,
|
||||
newAttendee,
|
||||
"getAttendee returned wrong attendee for original connection id",
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -12,26 +12,37 @@ import { createPresenceManager } from "../presenceManager.js";
|
|||
import type { MockEphemeralRuntime } from "./mockEphemeralRuntime.js";
|
||||
|
||||
import type { ClientConnectionId, ClientSessionId } from "@fluid-experimental/presence";
|
||||
import type { IExtensionMessage } from "@fluid-experimental/presence/internal/container-definitions/internal";
|
||||
|
||||
/**
|
||||
* Generates expected join signal for a client that was initialized while connected.
|
||||
*/
|
||||
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types, @typescript-eslint/explicit-function-return-type
|
||||
export function craftInitializationClientJoin(
|
||||
export function generateBasicClientJoin(
|
||||
fixedTime: number,
|
||||
clientSessionId: string = "seassionId-2",
|
||||
clientConnectionId: ClientConnectionId = "client2",
|
||||
updateProviders: string[] = ["client0", "client1", "client3"],
|
||||
{
|
||||
clientSessionId = "seassionId-2",
|
||||
clientConnectionId = "client2",
|
||||
updateProviders = ["client0", "client1", "client3"],
|
||||
connectionOrder = 0,
|
||||
averageLatency = 0,
|
||||
}: {
|
||||
clientSessionId?: string;
|
||||
clientConnectionId?: ClientConnectionId;
|
||||
updateProviders?: string[];
|
||||
connectionOrder?: number;
|
||||
averageLatency?: number;
|
||||
},
|
||||
) {
|
||||
return {
|
||||
type: "Pres:ClientJoin",
|
||||
content: {
|
||||
"avgLatency": 0,
|
||||
"avgLatency": averageLatency,
|
||||
"data": {
|
||||
"system:presence": {
|
||||
"clientToSessionId": {
|
||||
[clientConnectionId]: {
|
||||
"rev": 0,
|
||||
"rev": connectionOrder,
|
||||
"timestamp": fixedTime,
|
||||
"value": clientSessionId,
|
||||
},
|
||||
|
@ -41,7 +52,8 @@ export function craftInitializationClientJoin(
|
|||
"sendTimestamp": fixedTime,
|
||||
updateProviders,
|
||||
},
|
||||
};
|
||||
clientId: clientConnectionId,
|
||||
} satisfies IExtensionMessage<"Pres:ClientJoin">;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,12 +88,11 @@ export function prepareConnectedPresence(
|
|||
quorumClientIds.length = 3;
|
||||
}
|
||||
|
||||
const expectedClientJoin = craftInitializationClientJoin(
|
||||
clock.now,
|
||||
const expectedClientJoin = generateBasicClientJoin(clock.now, {
|
||||
clientSessionId,
|
||||
clientConnectionId,
|
||||
quorumClientIds,
|
||||
);
|
||||
updateProviders: quorumClientIds,
|
||||
});
|
||||
runtime.signalsExpected.push([expectedClientJoin.type, expectedClientJoin.content]);
|
||||
|
||||
const presence = createPresenceManager(runtime, clientSessionId as ClientSessionId);
|
||||
|
|
Загрузка…
Ссылка в новой задаче