diff --git a/src/client.ts b/src/client.ts index 320f70c150b..b2d52d085e0 100644 --- a/src/client.ts +++ b/src/client.ts @@ -380,6 +380,11 @@ export interface IStartClientOpts { * This should be in the order of hours. Default: undefined. */ clientWellKnownPollPeriod?: number; + + /** + * @experimental + */ + experimentalThreadSupport?: boolean; } export interface IStoredClientOpts extends IStartClientOpts { diff --git a/src/models/event.ts b/src/models/event.ts index 2159c48ee66..e61c411c7c1 100644 --- a/src/models/event.ts +++ b/src/models/event.ts @@ -28,7 +28,9 @@ import { EventType, MsgType, RelationType } from "../@types/event"; import { Crypto } from "../crypto"; import { deepSortedObjectEntries } from "../utils"; import { RoomMember } from "./room-member"; +import { Thread } from "./thread"; import { IActionsObject } from '../pushprocessor'; +import { ReEmitter } from '../ReEmitter'; /** * Enum for event statuses. @@ -191,6 +193,12 @@ export class MatrixEvent extends EventEmitter { */ private txnId: string = null; + /** + * @experimental + * A reference to the thread this event belongs to + */ + private thread: Thread = null; + /* Set an approximate timestamp for the event relative the local clock. * This will inherently be approximate because it doesn't take into account * the time between the server putting the 'age' field on the event as it sent @@ -212,6 +220,8 @@ export class MatrixEvent extends EventEmitter { */ public verificationRequest = null; + private readonly reEmitter: ReEmitter; + /** * Construct a Matrix Event object * @constructor @@ -261,6 +271,7 @@ export class MatrixEvent extends EventEmitter { this.txnId = event.txn_id || null; this.localTimestamp = Date.now() - this.getAge(); + this.reEmitter = new ReEmitter(this); } /** @@ -381,6 +392,15 @@ export class MatrixEvent extends EventEmitter { return this.event.content || {}; } + /** + * @experimental + * Get the event ID of the replied event + */ + public get replyEventId(): string { + const relations = this.getWireContent()["m.relates_to"]; + return relations?.["m.in_reply_to"]?.["event_id"]; + } + /** * Get the previous event content JSON. This will only return something for * state events which exist in the timeline. @@ -1271,6 +1291,21 @@ export class MatrixEvent extends EventEmitter { public getTxnId(): string | undefined { return this.txnId; } + + /** + * @experimental + */ + public setThread(thread: Thread): void { + this.thread = thread; + this.reEmitter.reEmit(thread, ["Thread.ready", "Thread.update"]); + } + + /** + * @experimental + */ + public getThread(): Thread { + return this.thread; + } } /* REDACT_KEEP_KEYS gives the keys we keep when an event is redacted diff --git a/src/models/room.ts b/src/models/room.ts index 1f36eaf9d59..59ef279e606 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -35,6 +35,7 @@ import { IRoomVersionsCapability, MatrixClient, PendingEventOrdering, RoomVersio import { ResizeMethod } from "../@types/partials"; import { Filter } from "../filter"; import { RoomState } from "./room-state"; +import { Thread } from "./thread"; // These constants are used as sane defaults when the homeserver doesn't support // the m.room_versions capability. In practice, KNOWN_SAFE_ROOM_VERSION should be @@ -145,6 +146,11 @@ export class Room extends EventEmitter { public oldState: RoomState; public currentState: RoomState; + /** + * @experimental + */ + public threads = new Set(); + /** * Construct a new Room. * @@ -857,13 +863,26 @@ export class Room extends EventEmitter { } /** - * Get an event which is stored in our unfiltered timeline set + * Get an event which is stored in our unfiltered timeline set or in a thread * * @param {string} eventId event ID to look for * @return {?module:models/event.MatrixEvent} the given event, or undefined if unknown */ public findEventById(eventId: string): MatrixEvent | undefined { - return this.getUnfilteredTimelineSet().findEventById(eventId); + let event = this.getUnfilteredTimelineSet().findEventById(eventId); + + if (event) { + return event; + } else { + const threads = this.getThreads(); + for (let i = 0; i < threads.length; i++) { + const thread = threads[i]; + event = thread.findEventById(eventId); + if (event) { + return event; + } + } + } } /** @@ -1049,6 +1068,54 @@ export class Room extends EventEmitter { ); } + /** + * @experimental + */ + public addThread(thread: Thread): Set { + this.threads.add(thread); + if (!thread.ready) { + thread.once("Thread.ready", this.dedupeThreads); + this.emit("Thread.update", thread); + this.reEmitter.reEmit(thread, ["Thread.update", "Thread.ready"]); + } + return this.threads; + } + + /** + * @experimental + */ + public getThread(eventId: string): Thread { + return this.getThreads().find(thread => { + return thread.id === eventId; + }); + } + + /** + * @experimental + */ + public getThreads(): Thread[] { + return Array.from(this.threads.values()); + } + + /** + * Two threads starting from a different child event can end up + * with the same event root. This method ensures that the duplicates + * are removed + * @experimental + */ + private dedupeThreads = (readyThread): void => { + const threads = Array.from(this.threads); + if (threads.includes(readyThread)) { + this.threads = new Set(threads.filter(thread => { + if (readyThread.id === thread.id && readyThread !== thread) { + return false; + } else { + return true; + } + })); + } + }; + /** * Get a member from the current room state. * @param {string} userId The user ID of the member. @@ -1225,6 +1292,28 @@ export class Room extends EventEmitter { } } + /** + * Add an event to a thread's timeline. Will fire "Thread.update" + * @experimental + */ + public addThreadedEvent(event: MatrixEvent): void { + if (event.getUnsigned().transaction_id) { + const existingEvent = this.txnToEvent[event.getUnsigned().transaction_id]; + if (existingEvent) { + // remote echo of an event we sent earlier + this.handleRemoteEcho(event, existingEvent); + } + } + + let thread = this.findEventById(event.replyEventId)?.getThread(); + if (thread) { + thread.addEvent(event); + } else { + thread = new Thread([event], this, this.client); + this.addThread(thread); + } + } + /** * Add an event to the end of this room's live timelines. Will fire * "Room.timeline". diff --git a/src/models/thread.ts b/src/models/thread.ts new file mode 100644 index 00000000000..043ff7dc776 --- /dev/null +++ b/src/models/thread.ts @@ -0,0 +1,186 @@ +/* +Copyright 2021 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { EventEmitter } from "events"; +import { MatrixClient } from "../matrix"; +import { MatrixEvent } from "./event"; +import { EventTimelineSet } from './event-timeline-set'; +import { Room } from './room'; + +/** + * @experimental + */ +export class Thread extends EventEmitter { + /** + * A reference to the event ID at the top of the thread + */ + private root: string; + /** + * A reference to all the events ID at the bottom of the threads + */ + public tail = new Set(); + private _timelineSet: EventTimelineSet; + + constructor( + events: MatrixEvent[] = [], + public readonly room: Room, + public readonly client: MatrixClient, + ) { + super(); + this._timelineSet = new EventTimelineSet(room, { + unstableClientRelationAggregation: true, + timelineSupport: true, + }); + events.forEach(event => this.addEvent(event)); + } + + /** + * Add an event to the thread and updates + * the tail/root references if needed + * Will fire "Thread.update" + * @param event The event to add + */ + public async addEvent(event: MatrixEvent): Promise { + if (this._timelineSet.findEventById(event.getId()) || event.status !== null) { + return; + } + + if (this.tail.has(event.replyEventId)) { + this.tail.delete(event.replyEventId); + } + this.tail.add(event.getId()); + + if (!event.replyEventId || !this._timelineSet.findEventById(event.replyEventId)) { + this.root = event.getId(); + } + + event.setThread(this); + this._timelineSet.addLiveEvent(event); + + if (this.ready) { + this.client.decryptEventIfNeeded(event, {}); + this.emit("Thread.update", this); + } else { + this.emit("Thread.update", this); + } + } + + /** + * Completes the reply chain with all events + * missing from the current sync data + * Will fire "Thread.ready" + */ + public async fetchReplyChain(): Promise { + if (!this.ready) { + let mxEvent = this.room.findEventById(this.rootEvent.replyEventId); + if (!mxEvent) { + mxEvent = await this.fetchEventById( + this.rootEvent.getRoomId(), + this.rootEvent.replyEventId, + ); + } + + this.addEvent(mxEvent); + if (mxEvent.replyEventId) { + await this.fetchReplyChain(); + } else { + await this.decryptEvents(); + this.emit("Thread.ready", this); + } + } + } + + private async decryptEvents(): Promise { + await Promise.allSettled( + Array.from(this._timelineSet.getLiveTimeline().getEvents()).map(event => { + return this.client.decryptEventIfNeeded(event, {}); + }), + ); + } + + /** + * Fetches an event over the network + */ + private async fetchEventById(roomId: string, eventId: string): Promise { + const response = await this.client.http.authedRequest( + undefined, + "GET", + `/rooms/${roomId}/event/${eventId}`, + ); + return new MatrixEvent(response); + } + + /** + * Finds an event by ID in the current thread + */ + public findEventById(eventId: string) { + return this._timelineSet.findEventById(eventId); + } + + /** + * Determines thread's ready status + */ + public get ready(): boolean { + return this.rootEvent.replyEventId === undefined; + } + + /** + * The thread ID, which is the same as the root event ID + */ + public get id(): string { + return this.root; + } + + /** + * The thread root event + */ + public get rootEvent(): MatrixEvent { + return this.findEventById(this.root); + } + + /** + * The number of messages in the thread + */ + public get length(): number { + return this._timelineSet.getLiveTimeline().getEvents().length; + } + + /** + * A set of mxid participating to the thread + */ + public get participants(): Set { + const participants = new Set(); + this._timelineSet.getLiveTimeline().getEvents().forEach(event => { + participants.add(event.getSender()); + }); + return participants; + } + + /** + * A read-only getter to access the timeline set + */ + public get timelineSet(): EventTimelineSet { + return this._timelineSet; + } + + /** + * A getter for the last event added to the thread + */ + public get replyToEvent(): MatrixEvent { + const events = this._timelineSet.getLiveTimeline().getEvents(); + return events[events.length -1]; + } +} diff --git a/src/sync.ts b/src/sync.ts index 70cefef7b67..005b6e7eed7 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -148,6 +148,7 @@ export class SyncApi { this.opts.resolveInvitesToProfiles = this.opts.resolveInvitesToProfiles || false; this.opts.pollTimeout = this.opts.pollTimeout || (30 * 1000); this.opts.pendingEventOrdering = this.opts.pendingEventOrdering || PendingEventOrdering.Chronological; + this.opts.experimentalThreadSupport = this.opts.experimentalThreadSupport === true; if (!opts.canResetEntireTimeline) { opts.canResetEntireTimeline = (roomId: string) => { @@ -283,8 +284,9 @@ export class SyncApi { return; } leaveObj.timeline = leaveObj.timeline || {}; - const timelineEvents = - this.mapSyncEventsFormat(leaveObj.timeline, room); + const events = this.mapSyncEventsFormat(leaveObj.timeline, room); + const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(events); + const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room); // set the back-pagination token. Do this *before* adding any @@ -293,17 +295,39 @@ export class SyncApi { EventTimeline.BACKWARDS); this.processRoomEvents(room, stateEvents, timelineEvents); + this.processThreadEvents(room, threadedEvents); room.recalculate(); client.store.storeRoom(room); client.emit("Room", room); - this.processEventsForNotifs(room, timelineEvents); + this.processEventsForNotifs(room, events); }); return rooms; }); } + /** + * Split events between the ones that will end up in the main + * room timeline versus the one that need to be processed in a thread + * @experimental + */ + public partitionThreadedEvents(events: MatrixEvent[]): [MatrixEvent[], MatrixEvent[]] { + if (this.opts.experimentalThreadSupport) { + return events.reduce((memo, event: MatrixEvent) => { + memo[event.replyEventId ? 1 : 0].push(event); + return memo; + }, [[], []]); + } else { + // When `experimentalThreadSupport` is disabled + // treat all events as timelineEvents + return [ + events, + [], + ]; + } + } + /** * Peek into a room. This will result in the room in question being synced so it * is accessible via getRooms(). Live updates for the room will be provided. @@ -1193,7 +1217,7 @@ export class SyncApi { // this helps large account to speed up faster // room::decryptCriticalEvent is in charge of decrypting all the events // required for a client to function properly - const timelineEvents = this.mapSyncEventsFormat(joinObj.timeline, room, false); + const events = this.mapSyncEventsFormat(joinObj.timeline, room, false); const ephemeralEvents = this.mapSyncEventsFormat(joinObj.ephemeral); const accountDataEvents = this.mapSyncEventsFormat(joinObj.account_data); @@ -1240,8 +1264,8 @@ export class SyncApi { // which we'll try to paginate but not get any new events (which // will stop us linking the empty timeline into the chain). // - for (let i = timelineEvents.length - 1; i >= 0; i--) { - const eventId = timelineEvents[i].getId(); + for (let i = events.length - 1; i >= 0; i--) { + const eventId = events[i].getId(); if (room.getTimelineForEvent(eventId)) { debuglog("Already have event " + eventId + " in limited " + "sync - not resetting"); @@ -1250,7 +1274,7 @@ export class SyncApi { // we might still be missing some of the events before i; // we don't want to be adding them to the end of the // timeline because that would put them out of order. - timelineEvents.splice(0, i); + events.splice(0, i); // XXX: there's a problem here if the skipped part of the // timeline modifies the state set in stateEvents, because @@ -1280,7 +1304,10 @@ export class SyncApi { } } + const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(events); + this.processRoomEvents(room, stateEvents, timelineEvents, syncEventData.fromCache); + this.processThreadEvents(room, threadedEvents); // set summary after processing events, // because it will trigger a name calculation @@ -1301,7 +1328,7 @@ export class SyncApi { client.emit("Room", room); } - this.processEventsForNotifs(room, timelineEvents); + this.processEventsForNotifs(room, events); const processRoomEvent = async (e) => { client.emit("event", e); @@ -1322,6 +1349,7 @@ export class SyncApi { await utils.promiseMapSeries(stateEvents, processRoomEvent); await utils.promiseMapSeries(timelineEvents, processRoomEvent); + await utils.promiseMapSeries(threadedEvents, processRoomEvent); ephemeralEvents.forEach(function(e) { client.emit("event", e); }); @@ -1341,10 +1369,13 @@ export class SyncApi { leaveRooms.forEach((leaveObj) => { const room = leaveObj.room; const stateEvents = this.mapSyncEventsFormat(leaveObj.state, room); - const timelineEvents = this.mapSyncEventsFormat(leaveObj.timeline, room); + const events = this.mapSyncEventsFormat(leaveObj.timeline, room); const accountDataEvents = this.mapSyncEventsFormat(leaveObj.account_data); + const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(events); + this.processRoomEvents(room, stateEvents, timelineEvents); + this.processThreadEvents(room, threadedEvents); room.addAccountData(accountDataEvents); room.recalculate(); @@ -1353,7 +1384,7 @@ export class SyncApi { client.emit("Room", room); } - this.processEventsForNotifs(room, timelineEvents); + this.processEventsForNotifs(room, events); stateEvents.forEach(function(e) { client.emit("event", e); @@ -1361,6 +1392,9 @@ export class SyncApi { timelineEvents.forEach(function(e) { client.emit("event", e); }); + threadedEvents.forEach(function(e) { + client.emit("event", e); + }); accountDataEvents.forEach(function(e) { client.emit("event", e); }); @@ -1680,6 +1714,15 @@ export class SyncApi { room.addLiveEvents(timelineEventList || [], null, fromCache); } + /** + * @experimental + */ + private processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): void { + threadedEvents.forEach(event => { + room.addThreadedEvent(event); + }); + } + /** * Takes a list of timelineEvents and adds and adds to notifEvents * as appropriate.