diff --git a/changelog.d/7736.feature b/changelog.d/7736.feature new file mode 100644 index 000000000000..c97864677aac --- /dev/null +++ b/changelog.d/7736.feature @@ -0,0 +1 @@ +Add unread messages count to sync responses. diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index cfd24d2f061d..184038556fad 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -25,6 +25,7 @@ from canonicaljson import json from prometheus_client import Counter +from twisted.enterprise.adbapi import Connection from twisted.internet import defer import synapse.metrics @@ -61,6 +62,12 @@ ["type", "origin_type", "origin_entity"], ) +STATE_EVENT_TYPES_TO_MARK_UNREAD = [ + EventTypes.PowerLevels, + EventTypes.Topic, + EventTypes.Name, +] + def encode_json(json_object): """ @@ -977,7 +984,7 @@ def _update_metadata_tables_txn( txn, events=[event for event, _ in events_and_contexts] ) - for event, _ in events_and_contexts: + for event, context in events_and_contexts: if event.type == EventTypes.Name: # Insert into the event_search table. self._store_room_name_txn(txn, event) @@ -1009,6 +1016,8 @@ def _update_metadata_tables_txn( if isinstance(expiry_ts, int) and not event.is_state(): self._insert_event_expiry_txn(txn, event.event_id, expiry_ts) + self._maybe_insert_unread_event_txn(txn, event, context) + # Insert into the room_memberships table. self._store_room_members_txn( txn, @@ -1614,3 +1623,72 @@ def f(txn, stream_ordering): await self.db.runInteraction("locally_reject_invite", f, stream_ordering) return stream_ordering + + def _maybe_insert_unread_event_txn( + self, txn: Connection, event: EventBase, context: EventContext, + ): + """Mark the event as unread for every current member of the room if it passes the + conditions for that. + + These conditions are: the event must either have a body, be an encrypted message, + or be either a power levels event, a room name event or a room topic event, and + must be neither rejected or soft-failed nor an edit or a notice. + + Args: + txn: The transaction to use to retrieve room members and to mark the event + as unread. + event: The event to evaluate and maybe mark as unread. + context: The context in which the event was sent (used to figure out whether + the event has been rejected). + """ + content = event.content + + is_edit = ( + content.get("m.relates_to", {}).get("rel_type") == RelationTypes.REPLACE + ) + is_notice = not event.is_state() and content.get("msgtype") == "m.notice" + + # We don't want rejected or soft-failed events, edits or notices to be marked + # unread. + if ( + context.rejected + or is_edit + or is_notice + or event.internal_metadata.is_soft_failed() + ): + return + + body_exists = content.get("body") is not None + is_state_event_to_mark_unread = ( + event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD + ) + is_encrypted_message = ( + not event.is_state() and event.type == EventTypes.Encrypted + ) + + # We want to mark unread messages with a body, some state events (power levels, + # room name, room topic) and encrypted messages. + if not (body_exists or is_state_event_to_mark_unread or is_encrypted_message): + return + + # Get the list of users that are currently joined to the room. + users_in_room = self.db.simple_select_onecol_txn( + txn=txn, + table="room_memberships", + keyvalues={"membership": Membership.JOIN, "room_id": event.room_id}, + retcol="user_id", + ) + + # Mark the message as unread for every user currently in the room. + self.db.simple_insert_many_txn( + txn=txn, + table="unread_messages", + values=[ + { + "user_id": user_id, + "stream_ordering": event.internal_metadata.stream_ordering, + "room_id": event.room_id, + } + for user_id in users_in_room + ], + ) diff --git a/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql new file mode 100644 index 000000000000..3c7d83b9bbd4 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/09unread_messages.sql @@ -0,0 +1,21 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS unread_messages( + user_id TEXT NOT NULL, -- The user for which the message is unread. + stream_ordering BIGINT NOT NULL, -- The position of the message in the event stream. + room_id TEXT NOT NULL, + UNIQUE (user_id, stream_ordering) +); \ No newline at end of file