Skip to content

Commit

Permalink
Retry: Expose "self" in audience and switch container to project more…
Browse files Browse the repository at this point in the history
… consistent states related to connections and changes of clientId (#20703)

This change substantially simplifies programming model around managing connection and clientId changes.

Key changes:
1. Add `self()` method in audience.
2. Add `"selfChanged"` event in audience.
3. Changes how connected / clientId are exposed to hosts & container runtime while container runtime is loading, if connection is established before container runtime is loaded.
   - in the past, we would expose connected state and clientId, but also would raise "connected" event once runtime was fully loaded. You see as result code in ContainerRuntime.setConnectionState() that calculates, and pivots based on changeOfState variable.
   - With this change, we delay telling the world that we are connected, until runtime is fully loaded. And only then raise event and change results of getSelf(). That way user code can be substantially simplified, where they do not need to care about missing connected events (if, for example, we were not doing that extra event), because we will promise to them (once this change propagates) that connections can materialize only after load, thus simplifying loading flows.

ADO: https://dev.azure.com/fluidframework/internal/_workitems/edit/6364
  • Loading branch information
vladsud authored Apr 20, 2024
1 parent c9d1562 commit fb1c1da
Show file tree
Hide file tree
Showing 22 changed files with 437 additions and 133 deletions.
16 changes: 16 additions & 0 deletions .changeset/cruel-trees-dance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@fluidframework/container-definitions": major
"@fluidframework/container-loader": major
"@fluidframework/container-runtime": major
---

Audience & connection sequencing improvements

Here are breaking changes in Audience behavior:
1. IAudience no longer implements EventEmmiter. If you used addListener() or removeListener(), please replace with on() & off() respectively.
2. IAudience interface implements getSelf() method and "selfChanged" event.
3. IContainerContext.audience is no longer optional
4. "connected" events are now raised (various API surfaces - IContainer, IContainerRuntime, IFluidDataStoreRuntime, etc.) a bit later in reconnection sequence for "read" connections - only after client receives its own "join" signal and caught up on ops, which makes it symmetrical with "write" connections.
- If this change in behavior breaks some scenario, please let us know immediately, but you can revert that behavior using the following feature gates:
- "Fluid.Container.DisableCatchUpBeforeDeclaringConnected"
- "Fluid.Container.DisableJoinSignalWait"
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
```ts

import type { EventEmitter } from 'events_pkg';
import type { FluidObject } from '@fluidframework/core-interfaces';
import type { IAnyDriverError } from '@fluidframework/driver-definitions';
import type { IClient } from '@fluidframework/protocol-definitions';
Expand Down Expand Up @@ -73,16 +72,23 @@ export interface ContainerWarning extends IErrorBase_2 {
}

// @public
export interface IAudience extends EventEmitter {
export interface IAudience extends IEventProvider<IAudienceEvents> {
getMember(clientId: string): IClient | undefined;
getMembers(): Map<string, IClient>;
on(event: "addMember" | "removeMember", listener: (clientId: string, client: IClient) => void): this;
getSelf: () => ISelf | undefined;
}

// @public
export interface IAudienceEvents extends IEvent {
(event: "addMember" | "removeMember", listener: (clientId: string, client: IClient) => void): void;
(event: "selfChanged", listener: (oldValue: ISelf | undefined, newValue: ISelf) => void): void;
}

// @alpha
export interface IAudienceOwner extends IAudience {
addMember(clientId: string, details: IClient): void;
removeMember(clientId: string): boolean;
setCurrentClientId(clientId: string): void;
}

// @alpha
Expand Down Expand Up @@ -146,7 +152,7 @@ export interface IContainer extends IEventProvider<IContainerEvents> {
export interface IContainerContext {
readonly attachState: AttachState;
// (undocumented)
readonly audience: IAudience | undefined;
readonly audience: IAudience;
// (undocumented)
readonly baseSnapshot: ISnapshotTree | undefined;
// (undocumented)
Expand Down Expand Up @@ -454,6 +460,12 @@ export interface IRuntimeFactory extends IProvideRuntimeFactory {
instantiateRuntime(context: IContainerContext, existing: boolean): Promise<IRuntime>;
}

// @public
export interface ISelf {
client?: IClient;
clientId: string;
}

// @alpha
export const isFluidBrowserPackage: (maybePkg: unknown) => maybePkg is Readonly<IFluidBrowserPackage>;

Expand Down
10 changes: 10 additions & 0 deletions packages/common/container-definitions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,19 @@
"backCompat": false
},
"InterfaceDeclaration_IContainer": {
"backCompat": false,
"forwardCompat": false
},
"InterfaceDeclaration_IContainerContext": {
"backCompat": false,
"forwardCompat": false
},
"InterfaceDeclaration_IAudienceOwner": {
"backCompat": false,
"forwardCompat": false
},
"InterfaceDeclaration_IAudience": {
"forwardCompat": false,
"backCompat": false
},
"InterfaceDeclaration_IBatchMessage": {
Expand Down
101 changes: 88 additions & 13 deletions packages/common/container-definitions/src/audience.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
* Licensed under the MIT License.
*/

import type { IEvent, IEventProvider } from "@fluidframework/core-interfaces";
import type { IClient } from "@fluidframework/protocol-definitions";
import type { EventEmitter } from "events_pkg";

/**
* Manages the state and the members for {@link IAudience}
* @alpha
Expand All @@ -21,27 +20,70 @@ export interface IAudienceOwner extends IAudience {
* @returns if a client was removed from the audience
*/
removeMember(clientId: string): boolean;

/**
* Notifies Audience that current clientId has changed.
* See {@link IAudience.getSelf} and {@link IAudienceEvents}'s "selfChanged" event for more details.
*/
setCurrentClientId(clientId: string): void;
}

/**
* Represents all clients connected to the op stream, both read-only and read/write.
*
* @remarks Access to the Audience when a container is disconnected is a tricky subject.
* See the remarks on specific methods for more details.
*
* See {@link https://nodejs.org/api/events.html#class-eventemitter | here} for an overview of the `EventEmitter`
* class.
* Interface describing Audience events
* @public
*/
export interface IAudience extends EventEmitter {
export interface IAudienceEvents extends IEvent {
/**
* See {@link https://nodejs.dev/learn/the-nodejs-event-emitter | here} for an overview of `EventEmitter.on`.
* "addMember" event is raised when a new user joins collaborative session.
* "removeMember" event is raised when a user leaves collaborative session.
*/
on(
(
event: "addMember" | "removeMember",
listener: (clientId: string, client: IClient) => void,
): this;
): void;
/**
* Notifies that client established new connection and caught-up on ops.
* @param oldValue - represents old connection. Please note that oldValue.client in almost all cases will be undefined,
* due to specifics how Audience refreshes on reconnect. In the future we could improve it and always provide client information.
* @param newValue - represents newly established connection. While {@link IAudience.getSelf} is experimental, it's not guaranteed that
* newValue.client is present. Same is true if you are consuming audience from container runtime layer and running against old version of loader.
*/
(event: "selfChanged", listener: (oldValue: ISelf | undefined, newValue: ISelf) => void): void;
}

/**
* Return type of {@link IAudience.getSelf}. Please see remarks for {@link IAudience.getSelf} to learn more details on promises.
* @public
*/
export interface ISelf {
/**
* clientId of current or previous connection (if client is in disconnected or reconnecting / catching up state)
* It changes only when client has reconnected, caught up with latest ops.
*/
clientId: string;

/**
* Information about current client (including user identity, connection properties), supplied by ordering service when
* client connected to it and received {@link ISelf.clientId}.
* If present (not undefined), it's same value as calling IAudience.getMember(clientId).
* This property could be undefined even if there is non-undefined clientId.
* This could happen in the following cases:
* 1) Container was loaded from stash, by providing IPendingContainerState state to Container.load().
* 2) Container is in the process of establishing new connection. Information about old connection is already reset
* (old clientId is no longer in list of members), but clientId has not yet changed to a new value.
*/
client?: IClient;
}

/**
* Represents all clients connected to the op stream, both read-only and read/write.
*
* @remarks Access to the Audience when a container is disconnected is a tricky subject.
* See the remarks on specific methods for more details.
*
* @public
*/
export interface IAudience extends IEventProvider<IAudienceEvents> {
/**
* List all clients connected to the op stream, keyed off their clientId.
*
Expand All @@ -67,4 +109,37 @@ export interface IAudience extends EventEmitter {
* does not technically have a clientId tied to an active connection to the service.
*/
getMember(clientId: string): IClient | undefined;

/**
* Returns information about client's connection. Please see {@link ISelf} member descriptions for more details.
* undefined if this client has never connected to the ordering service.
* Please see {@link ISelf.clientId} for more details on when values returned by this function change over time.
*
* @experimental
*
* @remarks
* This API is experimental.
*
* Reconnection process will have these phases:
* 1. Establishing connection phase:
* - new connection clientId is added to member's list. That said, self.clientId still reflects old information.
* - The old client's information is removed from members' list. getMember(self.clientId) will return undefined.
* 2. Catch-up phase. Client catches up on latest ops and becomes current.
* 3. "connect" phase - the following happens synchronously:
* - getSelf() information changes to reflect new connection
* - "selfChanged" event on this object fires
* - Various API surfaces may expose "connected" event. This event fires at the same time as self changes. That said, "connected" event will not fire at ContainerRuntime layer if container is read-only.
*
* That said, at the moment this is an experimental API. It depends on some experimental settings that might change in the future.
* Events described in phase #3 may not happen at the same time if kill-bit feature gates are engaged due to a bug discovered in new logic
* that delivers this functionality. Once it's proven (at scale) that everything works well, experimental tag will be removed.
* Also application that deploy loader & container runtime bundles independently will see new (synchronized) behavior only when loader changes are deployed.
* Newer runtimes will continue to observe old (non-synchronized) behavior when paired with older loader code.
*
* When promises in phase #3 are broken (due to conditions described above), consumers could experience current clientId being changed
* (and "selfChanged" event fired) while
* 1. Such clientId is not present in Audience
* 2. Client is not fully caught up
*/
getSelf: () => ISelf | undefined;
}
2 changes: 1 addition & 1 deletion packages/common/container-definitions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* @packageDocumentation
*/

export type { IAudience, IAudienceOwner } from "./audience.js";
export type { IAudience, IAudienceOwner, IAudienceEvents, ISelf } from "./audience.js";
export type { IFluidBrowserPackage, IFluidBrowserPackageEnvironment } from "./browserPackage.js";
export { isFluidBrowserPackage } from "./browserPackage.js";
export type {
Expand Down
2 changes: 1 addition & 1 deletion packages/common/container-definitions/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export interface IContainerContext {
readonly closeFn: (error?: ICriticalContainerError) => void;
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
readonly quorum: IQuorumClients;
readonly audience: IAudience | undefined;
readonly audience: IAudience;
readonly loader: ILoader;
// The logger implementation, which would support tagged events, should be provided by the loader.
readonly taggedLogger: ITelemetryBaseLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ declare function get_old_InterfaceDeclaration_IAudience():
declare function use_current_InterfaceDeclaration_IAudience(
use: TypeOnly<current.IAudience>): void;
use_current_InterfaceDeclaration_IAudience(
// @ts-expect-error compatibility expected to be broken
get_old_InterfaceDeclaration_IAudience());

/*
Expand All @@ -273,6 +274,7 @@ declare function get_current_InterfaceDeclaration_IAudience():
declare function use_old_InterfaceDeclaration_IAudience(
use: TypeOnly<old.IAudience>): void;
use_old_InterfaceDeclaration_IAudience(
// @ts-expect-error compatibility expected to be broken
get_current_InterfaceDeclaration_IAudience());

/*
Expand All @@ -285,6 +287,7 @@ declare function get_old_InterfaceDeclaration_IAudienceOwner():
declare function use_current_InterfaceDeclaration_IAudienceOwner(
use: TypeOnly<current.IAudienceOwner>): void;
use_current_InterfaceDeclaration_IAudienceOwner(
// @ts-expect-error compatibility expected to be broken
get_old_InterfaceDeclaration_IAudienceOwner());

/*
Expand All @@ -297,6 +300,7 @@ declare function get_current_InterfaceDeclaration_IAudienceOwner():
declare function use_old_InterfaceDeclaration_IAudienceOwner(
use: TypeOnly<old.IAudienceOwner>): void;
use_old_InterfaceDeclaration_IAudienceOwner(
// @ts-expect-error compatibility expected to be broken
get_current_InterfaceDeclaration_IAudienceOwner());

/*
Expand Down Expand Up @@ -395,6 +399,7 @@ declare function get_current_InterfaceDeclaration_IContainer():
declare function use_old_InterfaceDeclaration_IContainer(
use: TypeOnly<old.IContainer>): void;
use_old_InterfaceDeclaration_IContainer(
// @ts-expect-error compatibility expected to be broken
get_current_InterfaceDeclaration_IContainer());

/*
Expand All @@ -407,6 +412,7 @@ declare function get_old_InterfaceDeclaration_IContainerContext():
declare function use_current_InterfaceDeclaration_IContainerContext(
use: TypeOnly<current.IContainerContext>): void;
use_current_InterfaceDeclaration_IContainerContext(
// @ts-expect-error compatibility expected to be broken
get_old_InterfaceDeclaration_IContainerContext());

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ describe("mixinAttributor", () => {
const sampleAttributor = new Attributor([
[
op.sequenceNumber!,
{ timestamp: op.timestamp!, user: context.audience!.getMember(op.clientId!)!.user },
{ timestamp: op.timestamp!, user: context.audience.getMember(op.clientId!)!.user },
],
]);

Expand Down
5 changes: 5 additions & 0 deletions packages/loader/container-loader/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,15 @@
"typeValidation": {
"broken": {
"InterfaceDeclaration_IContainerExperimental": {
"backCompat": false,
"forwardCompat": false
},
"InterfaceDeclaration_IParsedUrl": {
"forwardCompat": false
},
"InterfaceDeclaration_IProtocolHandler": {
"backCompat": false,
"forwardCompat": false
}
}
}
Expand Down
39 changes: 30 additions & 9 deletions packages/loader/container-loader/src/audience.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,50 @@
* Licensed under the MIT License.
*/

import { EventEmitter } from "@fluid-internal/client-utils";
import { IAudienceOwner } from "@fluidframework/container-definitions/internal";
import { TypedEventEmitter } from "@fluid-internal/client-utils";
import {
IAudienceEvents,
IAudienceOwner,
ISelf,
} from "@fluidframework/container-definitions/internal";
import { assert } from "@fluidframework/core-utils/internal";
import { IClient } from "@fluidframework/protocol-definitions";

/**
* Audience represents all clients connected to the op stream.
*/
export class Audience extends EventEmitter implements IAudienceOwner {
export class Audience extends TypedEventEmitter<IAudienceEvents> implements IAudienceOwner {
private readonly members = new Map<string, IClient>();
private _currentClientId: string | undefined;

constructor() {
super();
// We are expecting this class to have many listeners, so we suppress noisy "MaxListenersExceededWarning" logging.
super.setMaxListeners(0);
}

public on(
event: "addMember" | "removeMember",
listener: (clientId: string, client: IClient) => void,
): this;
public on(event: string, listener: (...args: any[]) => void): this {
return super.on(event, listener);
public getSelf(): ISelf | undefined {
return this._currentClientId === undefined
? undefined
: {
clientId: this._currentClientId,
client: this.getMember(this._currentClientId),
};
}

public setCurrentClientId(clientId: string): void {
if (this._currentClientId !== clientId) {
const oldId = this._currentClientId;
this._currentClientId = clientId;
// this.getMember(clientId) could resolve to undefined in these two cases:
// 1) Feature gates controlling ConnectionStateHandler() behavior are off
// 2) we are loading from stashed state and audience is empty, but we remember and set prior clientId
this.emit(
"selfChanged",
oldId === undefined ? undefined : ({ clientId: oldId } satisfies ISelf),
{ clientId, client: this.getMember(clientId) } satisfies ISelf,
);
}
}

/**
Expand Down
Loading

0 comments on commit fb1c1da

Please sign in to comment.