From 32b30b49f3a3e24b13f2041607be80a82b862198 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 17:26:40 -0500 Subject: [PATCH 01/14] Add `bump_stamp` --- synapse/api/constants.py | 4 + synapse/handlers/sliding_sync.py | 159 ++++++++++++++++++----- synapse/storage/databases/main/stream.py | 6 +- synapse/types/handlers/__init__.py | 8 ++ 4 files changed, 139 insertions(+), 38 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 9265a271d2..cb12133cb1 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -128,9 +128,13 @@ class EventTypes: SpaceParent: Final = "m.space.parent" Reaction: Final = "m.reaction" + Sticker: Final = "m.sticker" + LocationShare: Final = "m.location" CallInvite: Final = "m.call.invite" + PollStart: Final = "m.poll.start" + class ToDeviceEventTypes: RoomKeyRequest: Final = "m.room_key_request" diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0cebeea592..b11a138553 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -53,6 +53,17 @@ logger = logging.getLogger(__name__) +# The event types that we should consider when sorting the rooms in the sync response. +DEFAULT_BUMP_EVENT_TYPES = { + EventTypes.Message, + EventTypes.Encrypted, + EventTypes.Sticker, + EventTypes.LocationShare, + EventTypes.CallInvite, + EventTypes.PollStart, +} + + def filter_membership_for_sync( *, membership: str, user_id: str, sender: Optional[str] ) -> bool: @@ -122,6 +133,62 @@ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": return attr.evolve(self, **kwds) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _SortedRoomMembershipForUser(_RoomMembershipForUser): + """ + Same as `_RoomMembershipForUser` but with an additional `bump_stamp` attribute. + + Attributes: + event_id: The event ID of the membership event + event_pos: The stream position of the membership event + membership: The membership state of the user in the room + sender: The person who sent the membership event + newly_joined: Whether the user newly joined the room during the given token + range + bump_stamp: The `stream_ordering` of the last event according to the + `bump_event_types`. This helps clients sort more readily without them needing to + pull in a bunch of the timeline to determine the last activity. + `bump_event_types` is a thing because for example, we don't want display name + changes to mark the room as unread and bump it to the top. For encrypted rooms, + we just have to consider any activity as a bump because we can't see the content + and the client has to figure it out for themselves. + """ + + bump_stamp: int + + @classmethod + def from_room_membership_for_user( + cls, + rooms_membership_for_user: _RoomMembershipForUser, + *, + bump_stamp: int, + ) -> "_SortedRoomMembershipForUser": + """ + Copy from the given `_RoomMembershipForUser`, sprinkle on the specific + `_SortedRoomMembershipForUser` fields to create a new + `_SortedRoomMembershipForUser`. + """ + + # Based on `attr.evolve(...)` but we can copy from a child class to a parent + child_kwargs: Dict[str, Any] = {} + attrs = attr.fields(rooms_membership_for_user.__class__) + for a in attrs: + attr_name = a.name # To deal with private attributes. + init_name = a.alias + child_kwargs[init_name] = getattr(rooms_membership_for_user, attr_name) + + return cls(bump_stamp=bump_stamp, **child_kwargs) + + def copy_and_replace(self, **kwds: Any) -> "_SortedRoomMembershipForUser": + return attr.evolve(self, **kwds) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _RelevantRoomEntry: + room_sync_config: RoomSyncConfig + room_membership_for_user: _SortedRoomMembershipForUser + + class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -242,7 +309,7 @@ async def current_sync_for_user( # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} - relevant_room_map: Dict[str, RoomSyncConfig] = {} + relevant_room_map: Dict[str, _RelevantRoomEntry] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync # response @@ -267,40 +334,45 @@ async def current_sync_for_user( ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: - sliced_room_ids = [ - room_id - # Both sides of range are inclusive - for room_id, _ in sorted_room_info[range[0] : range[1] + 1] - ] + # Both sides of range are inclusive + sliced_room_info = sorted_room_info[range[0] : range[1] + 1] ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=sliced_room_ids, + room_ids=[room_id for room_id, _ in sliced_room_info], ) ) # Take the superset of the `RoomSyncConfig` for each room - for room_id in sliced_room_ids: - if relevant_room_map.get(room_id) is not None: + for room_id, room_membership_for_user in sliced_room_info: + relevant_room_entry = relevant_room_map.get(room_id) + if relevant_room_entry is not None: + existing_room_sync_config = ( + relevant_room_entry.room_sync_config + ) + # Take the highest timeline limit if ( - relevant_room_map[room_id].timeline_limit + existing_room_sync_config.timeline_limit < list_config.timeline_limit ): - relevant_room_map[room_id].timeline_limit = ( + existing_room_sync_config.timeline_limit = ( list_config.timeline_limit ) # Union the required state - relevant_room_map[room_id].required_state.update( + existing_room_sync_config.required_state.update( list_config.required_state ) else: - relevant_room_map[room_id] = RoomSyncConfig( - timeline_limit=list_config.timeline_limit, - required_state=set(list_config.required_state), + relevant_room_map[room_id] = _RelevantRoomEntry( + room_sync_config=RoomSyncConfig( + timeline_limit=list_config.timeline_limit, + required_state=set(list_config.required_state), + ), + room_membership_for_user=room_membership_for_user, ) lists[list_key] = SlidingSyncResult.SlidingWindowList( @@ -312,12 +384,12 @@ async def current_sync_for_user( # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} - for room_id, room_sync_config in relevant_room_map.items(): + for room_id, relevant_room_entry in relevant_room_map.items(): room_sync_result = await self.get_room_sync_data( user=sync_config.user, room_id=room_id, - room_sync_config=room_sync_config, - rooms_membership_for_user_at_to_token=sync_room_map[room_id], + room_sync_config=relevant_room_entry.room_sync_config, + room_membership_for_user_at_to_token=relevant_room_entry.room_membership_for_user, from_token=from_token, to_token=to_token, ) @@ -768,7 +840,7 @@ async def sort_rooms( self, sync_room_map: Dict[str, _RoomMembershipForUser], to_token: StreamToken, - ) -> List[Tuple[str, _RoomMembershipForUser]]: + ) -> List[Tuple[str, _SortedRoomMembershipForUser]]: """ Sort by `stream_ordering` of the last event that the user should see in the room. `stream_ordering` is unique so we get a stable sort. @@ -782,13 +854,15 @@ async def sort_rooms( A sorted list of room IDs by `stream_ordering` along with membership information. """ + sorted_sync_rooms: List[Tuple[str, _SortedRoomMembershipForUser]] = [] + # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) - last_activity_in_room_map: Dict[str, int] = {} for room_id, room_for_user in sync_room_map.items(): # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: + # TODO: Take `DEFAULT_BUMP_EVENT_TYPES` into account last_event_result = ( await self.store.get_last_event_pos_in_room_before_stream_ordering( room_id, to_token.room_key @@ -802,16 +876,30 @@ async def sort_rooms( _, event_pos = last_event_result - last_activity_in_room_map[room_id] = event_pos.stream + sorted_sync_rooms.append( + ( + room_id, + _SortedRoomMembershipForUser.from_room_membership_for_user( + room_for_user, bump_stamp=event_pos.stream + ), + ) + ) else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. - last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + sorted_sync_rooms.append( + ( + room_id, + _SortedRoomMembershipForUser.from_room_membership_for_user( + room_for_user, bump_stamp=room_for_user.event_pos.stream + ), + ) + ) return sorted( - sync_room_map.items(), + sorted_sync_rooms, # Sort by the last activity (stream_ordering) in the room - key=lambda room_info: last_activity_in_room_map[room_info[0]], + key=lambda room_info: room_info[1].bump_stamp, # We want descending order reverse=True, ) @@ -821,7 +909,7 @@ async def get_room_sync_data( user: UserID, room_id: str, room_sync_config: RoomSyncConfig, - rooms_membership_for_user_at_to_token: _RoomMembershipForUser, + room_membership_for_user_at_to_token: _SortedRoomMembershipForUser, from_token: Optional[StreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: @@ -835,7 +923,7 @@ async def get_room_sync_data( room_id: The room ID to fetch data for room_sync_config: Config for what data we should fetch for a room in the sync response. - rooms_membership_for_user_at_to_token: Membership information for the user + room_membership_for_user_at_to_token: Membership information for the user in the room at the time of `to_token`. from_token: The point in the stream to sync from. to_token: The point in the stream to sync up to. @@ -855,7 +943,7 @@ async def get_room_sync_data( if ( room_sync_config.timeline_limit > 0 # No timeline for invite/knock rooms (just `stripped_state`) - and rooms_membership_for_user_at_to_token.membership + and room_membership_for_user_at_to_token.membership not in (Membership.INVITE, Membership.KNOCK) ): limited = False @@ -868,12 +956,12 @@ async def get_room_sync_data( # We're going to paginate backwards from the `to_token` from_bound = to_token.room_key # People shouldn't see past their leave/ban event - if rooms_membership_for_user_at_to_token.membership in ( + if room_membership_for_user_at_to_token.membership in ( Membership.LEAVE, Membership.BAN, ): from_bound = ( - rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token() + room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) # Determine whether we should limit the timeline to the token range. @@ -888,7 +976,7 @@ async def get_room_sync_data( to_bound = ( from_token.room_key if from_token is not None - and not rooms_membership_for_user_at_to_token.newly_joined + and not room_membership_for_user_at_to_token.newly_joined else None ) @@ -925,7 +1013,7 @@ async def get_room_sync_data( self.storage_controllers, user.to_string(), timeline_events, - is_peeking=rooms_membership_for_user_at_to_token.membership + is_peeking=room_membership_for_user_at_to_token.membership != Membership.JOIN, filter_send_to_client=True, ) @@ -980,16 +1068,16 @@ async def get_room_sync_data( # Figure out any stripped state events for invite/knocks. This allows the # potential joiner to identify the room. stripped_state: List[JsonDict] = [] - if rooms_membership_for_user_at_to_token.membership in ( + if room_membership_for_user_at_to_token.membership in ( Membership.INVITE, Membership.KNOCK, ): # This should never happen. If someone is invited/knocked on room, then # there should be an event for it. - assert rooms_membership_for_user_at_to_token.event_id is not None + assert room_membership_for_user_at_to_token.event_id is not None invite_or_knock_event = await self.store.get_event( - rooms_membership_for_user_at_to_token.event_id + room_membership_for_user_at_to_token.event_id ) stripped_state = [] @@ -1005,7 +1093,7 @@ async def get_room_sync_data( stripped_state.append(strip_event(invite_or_knock_event)) # TODO: Handle state resets. For example, if we see - # `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but + # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but # `required_state` doesn't include it, we should indicate to the client that a # state reset happened. Perhaps we should indicate this by setting `initial: # True` and empty `required_state`. @@ -1031,6 +1119,7 @@ async def get_room_sync_data( stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, + bump_stamp=room_membership_for_user_at_to_token.bump_stamp, # TODO: Dummy values joined_count=0, invited_count=0, diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index d34376b8df..b53b9ff035 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1207,9 +1207,9 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( min_stream = end_token.stream max_stream = end_token.get_max_stream_pos() - # We use `union all` because we don't need any of the deduplication logic - # (`union` is really a union + distinct). `UNION ALL` does preserve the - # ordering of the operand queries but there is no actual gurantee that it + # We use `UNION ALL` because we don't need any of the deduplication logic + # (`UNION` is really a `UNION` + `DISTINCT`). `UNION ALL` does preserve the + # ordering of the operand queries but there is no actual guarantee that it # has this behavior in all scenarios so we need the extra `ORDER BY` at the # bottom. sql = """ diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 3cd3c8fb0f..de5a863408 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -176,6 +176,13 @@ class RoomResult: `/rooms//messages` API to retrieve earlier messages. limited: True if their are more events than fit between the given position and now. Sync again to get more. + bump_stamp: The `stream_ordering` of the last event according to the + `bump_event_types`. This helps clients sort more readily without them + needing to pull in a bunch of the timeline to determine the last activity. + `bump_event_types` is a thing because for example, we don't want display + name changes to mark the room as unread and bump it to the top. For + encrypted rooms, we just have to consider any activity as a bump because we + can't see the content and the client has to figure it out for themselves. joined_count: The number of users with membership of join, including the client's own user ID. (same as sync `v2 m.joined_member_count`) invited_count: The number of users with membership of invite. (same as sync v2 @@ -209,6 +216,7 @@ class RoomResult: prev_batch: Optional[StreamToken] # Only optional because it won't be included for invite/knock rooms with `stripped_state` limited: Optional[bool] + bump_stamp: int joined_count: int invited_count: int notification_count: int From 5a89eba7d0b35cbdb30033571cacf05d60e4a6c9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 17:39:54 -0500 Subject: [PATCH 02/14] Get rid of extra room_id in tuple once sorted --- synapse/handlers/sliding_sync.py | 57 ++++++++++++++++------------- tests/handlers/test_sliding_sync.py | 8 ++-- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b11a138553..af58357198 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -123,6 +123,7 @@ class _RoomMembershipForUser: range """ + room_id: str event_id: Optional[str] event_pos: PersistedEventPosition membership: str @@ -327,7 +328,7 @@ async def current_sync_for_user( sync_config.user, sync_room_map, list_config.filters, to_token ) - sorted_room_info = await self.sort_rooms( + sorted_sync_rooms = await self.sort_rooms( filtered_sync_room_map, to_token ) @@ -335,19 +336,24 @@ async def current_sync_for_user( if list_config.ranges: for range in list_config.ranges: # Both sides of range are inclusive - sliced_room_info = sorted_room_info[range[0] : range[1] + 1] + sliced_sync_rooms = sorted_sync_rooms[range[0] : range[1] + 1] ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=[room_id for room_id, _ in sliced_room_info], + room_ids=[ + room_membership.room_id + for room_membership in sliced_sync_rooms + ], ) ) # Take the superset of the `RoomSyncConfig` for each room - for room_id, room_membership_for_user in sliced_room_info: - relevant_room_entry = relevant_room_map.get(room_id) + for room_membership_for_user in sliced_sync_rooms: + relevant_room_entry = relevant_room_map.get( + room_membership_for_user.room_id + ) if relevant_room_entry is not None: existing_room_sync_config = ( relevant_room_entry.room_sync_config @@ -367,16 +373,20 @@ async def current_sync_for_user( list_config.required_state ) else: - relevant_room_map[room_id] = _RelevantRoomEntry( - room_sync_config=RoomSyncConfig( - timeline_limit=list_config.timeline_limit, - required_state=set(list_config.required_state), - ), - room_membership_for_user=room_membership_for_user, + relevant_room_map[room_membership_for_user.room_id] = ( + _RelevantRoomEntry( + room_sync_config=RoomSyncConfig( + timeline_limit=list_config.timeline_limit, + required_state=set( + list_config.required_state + ), + ), + room_membership_for_user=room_membership_for_user, + ) ) lists[list_key] = SlidingSyncResult.SlidingWindowList( - count=len(sorted_room_info), + count=len(sorted_sync_rooms), ops=ops, ) @@ -461,6 +471,7 @@ async def get_sync_room_ids_for_user( # (below) because they are potentially from the current snapshot time # instead from the time of the `to_token`. room_for_user.room_id: _RoomMembershipForUser( + room_id=room_for_user.room_id, event_id=room_for_user.event_id, event_pos=room_for_user.event_pos, membership=room_for_user.membership, @@ -561,6 +572,7 @@ async def get_sync_room_ids_for_user( is not None ): sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, event_id=first_membership_change_after_to_token.prev_event_id, event_pos=first_membership_change_after_to_token.prev_event_pos, membership=first_membership_change_after_to_token.prev_membership, @@ -655,6 +667,7 @@ async def get_sync_room_ids_for_user( # is their own leave event if last_membership_change_in_from_to_range.membership == Membership.LEAVE: filtered_sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, event_id=last_membership_change_in_from_to_range.event_id, event_pos=last_membership_change_in_from_to_range.event_pos, membership=last_membership_change_in_from_to_range.membership, @@ -840,7 +853,7 @@ async def sort_rooms( self, sync_room_map: Dict[str, _RoomMembershipForUser], to_token: StreamToken, - ) -> List[Tuple[str, _SortedRoomMembershipForUser]]: + ) -> List[_SortedRoomMembershipForUser]: """ Sort by `stream_ordering` of the last event that the user should see in the room. `stream_ordering` is unique so we get a stable sort. @@ -854,7 +867,7 @@ async def sort_rooms( A sorted list of room IDs by `stream_ordering` along with membership information. """ - sorted_sync_rooms: List[Tuple[str, _SortedRoomMembershipForUser]] = [] + sorted_sync_rooms: List[_SortedRoomMembershipForUser] = [] # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) @@ -877,29 +890,23 @@ async def sort_rooms( _, event_pos = last_event_result sorted_sync_rooms.append( - ( - room_id, - _SortedRoomMembershipForUser.from_room_membership_for_user( - room_for_user, bump_stamp=event_pos.stream - ), + _SortedRoomMembershipForUser.from_room_membership_for_user( + room_for_user, bump_stamp=event_pos.stream ) ) else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. sorted_sync_rooms.append( - ( - room_id, - _SortedRoomMembershipForUser.from_room_membership_for_user( - room_for_user, bump_stamp=room_for_user.event_pos.stream - ), + _SortedRoomMembershipForUser.from_room_membership_for_user( + room_for_user, bump_stamp=room_for_user.event_pos.stream ) ) return sorted( sorted_sync_rooms, # Sort by the last activity (stream_ordering) in the room - key=lambda room_info: room_info[1].bump_stamp, + key=lambda room_info: room_info.bump_stamp, # We want descending order reverse=True, ) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 713a798703..73b95df4fe 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -2312,7 +2312,7 @@ def test_sort_activity_basic(self) -> None: ) # Sort the rooms (what we're testing) - sorted_room_info = self.get_success( + sorted_sync_rooms = self.get_success( self.sliding_sync_handler.sort_rooms( sync_room_map=sync_room_map, to_token=after_rooms_token, @@ -2320,7 +2320,7 @@ def test_sort_activity_basic(self) -> None: ) self.assertEqual( - [room_id for room_id, _ in sorted_room_info], + [room_membership.room_id for room_membership in sorted_sync_rooms], [room_id2, room_id1], ) @@ -2395,7 +2395,7 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: ) # Sort the rooms (what we're testing) - sorted_room_info = self.get_success( + sorted_sync_rooms = self.get_success( self.sliding_sync_handler.sort_rooms( sync_room_map=sync_room_map, to_token=after_rooms_token, @@ -2403,7 +2403,7 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: ) self.assertEqual( - [room_id for room_id, _ in sorted_room_info], + [room_membership.room_id for room_membership in sorted_sync_rooms], [room_id2, room_id1, room_id3], "Corresponding map to disambiguate the opaque room IDs: " + str( From 7c3207bc7e051046cdb002a1f80fcd84388a9430 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 18:13:47 -0500 Subject: [PATCH 03/14] Consider `DEFAULT_BUMP_EVENT_TYPES` --- synapse/handlers/sliding_sync.py | 19 ++++---- synapse/storage/databases/main/stream.py | 29 ++++++++---- tests/handlers/test_sliding_sync.py | 58 ++++++++++++++++++++++++ tests/storage/test_stream.py | 41 +++++++++++++++++ 4 files changed, 131 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index af58357198..b00bc527ec 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -875,19 +875,17 @@ async def sort_rooms( # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: - # TODO: Take `DEFAULT_BUMP_EVENT_TYPES` into account last_event_result = ( await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES ) ) - # If the room has no events at/before the `to_token`, this is probably a - # mistake in the code that generates the `sync_room_map` since that should - # only give us rooms that the user had membership in during the token range. - assert last_event_result is not None - - _, event_pos = last_event_result + # By default, just choose the membership event position + event_pos = room_for_user.event_pos + # But if we found a bump event, use that instead + if last_event_result is not None: + _, event_pos = last_event_result sorted_sync_rooms.append( _SortedRoomMembershipForUser.from_room_membership_for_user( @@ -897,6 +895,11 @@ async def sort_rooms( else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point in + # invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 sorted_sync_rooms.append( _SortedRoomMembershipForUser.from_room_membership_for_user( room_for_user, bump_stamp=room_for_user.event_pos.stream diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b53b9ff035..55e23b1e5c 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1178,6 +1178,7 @@ async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, + event_types: Optional[Iterable[str]] = None, ) -> Optional[Tuple[str, PersistedEventPosition]]: """ Returns the ID and event position of the last event in a room at or before a @@ -1186,6 +1187,7 @@ async def get_last_event_pos_in_room_before_stream_ordering( Args: room_id end_token: The token used to stream from + event_types: Optional allowlist of event types to filter by Returns: The ID of the most recent event and it's position, or None if there are no @@ -1207,6 +1209,14 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( min_stream = end_token.stream max_stream = end_token.get_max_stream_pos() + event_type_clause = "" + event_type_args = [] + if event_types is not None and len(event_types) > 0: + event_type_clause, event_type_args = make_in_list_sql_clause( + txn.database_engine, "type", event_types + ) + event_type_clause = f"AND {event_type_clause}" + # We use `UNION ALL` because we don't need any of the deduplication logic # (`UNION` is really a `UNION` + `DISTINCT`). `UNION ALL` does preserve the # ordering of the operand queries but there is no actual guarantee that it @@ -1218,6 +1228,7 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? + %s AND ? < stream_ordering AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL @@ -1229,6 +1240,7 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? + %s AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL @@ -1236,16 +1248,17 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( LIMIT 1 ) AS b ORDER BY stream_ordering DESC - """ + """ % ( + event_type_clause, + event_type_clause, + ) txn.execute( sql, - ( - room_id, - min_stream, - max_stream, - room_id, - min_stream, - ), + [room_id] + + event_type_args + + [min_stream, max_stream, room_id] + + event_type_args + + [min_stream], ) for instance_name, stream_ordering, topological_ordering, event_id in txn: diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 73b95df4fe..636bd32369 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -2414,3 +2414,61 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: } ), ) + + def test_default_bump_event_types(self) -> None: + """ + Test that we only consider `bump_event_types` when sorting rooms. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + message_response = self.helper.send(room_id1, "message in room1", tok=user1_tok) + room_id2 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + self.helper.send(room_id2, "message in room2", tok=user1_tok) + + # Send a reaction in room1 but it shouldn't affect the sort order + # because reactions are not part of the `DEFAULT_BUMP_EVENT_TYPES` + self.helper.send_event( + room_id1, + type=EventTypes.Reaction, + content={ + "m.relates_to": { + "event_id": message_response["event_id"], + "key": "👍", + "rel_type": "m.annotation", + } + }, + tok=user1_tok, + ) + + after_rooms_token = self.event_sources.get_current_token() + + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + + # Sort the rooms (what we're testing) + sorted_sync_rooms = self.get_success( + self.sliding_sync_handler.sort_rooms( + sync_room_map=sync_room_map, + to_token=after_rooms_token, + ) + ) + + self.assertEqual( + [room_membership.room_id for room_membership in sorted_sync_rooms], + # room2 sorts before room1 because reactions don't bump the room + [room_id2, room_id1], + ) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index aad46b1b44..9dea1af8ea 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -556,6 +556,47 @@ def test_last_event_before_sharded_token(self) -> None: ), ) + def test_restrict_event_types(self) -> None: + """ + Test that we only consider given `event_types` when finding the last event + before a token. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send_event( + room_id1, + type="org.matrix.special_message", + content={"body": "before1, target!"}, + tok=user1_tok, + ) + self.helper.send(room_id1, "before2", tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Send some events after the token + self.helper.send_event( + room_id1, + type="org.matrix.special_message", + content={"body": "after1"}, + tok=user1_tok, + ) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event_result = self.get_success( + self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + event_types=["org.matrix.special_message"], + ) + ) + assert last_event_result is not None + last_event_id, _ = last_event_result + + # Make sure it's the last event before the token + self.assertEqual(last_event_id, event_response["event_id"]) + class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): """ From 176d023e72b88a2d46246673f621303dddab1af4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 18:28:37 -0500 Subject: [PATCH 04/14] Add changelog --- changelog.d/17395.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17395.feature diff --git a/changelog.d/17395.feature b/changelog.d/17395.feature new file mode 100644 index 0000000000..1e702b7010 --- /dev/null +++ b/changelog.d/17395.feature @@ -0,0 +1 @@ +Sort by and add `rooms.bump_stamp` for easier client-side sorting in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From aee961992b4240293c05b02a07196a003208f29a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 18:31:54 -0500 Subject: [PATCH 05/14] Fix lints --- synapse/storage/databases/main/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 55e23b1e5c..be81025355 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1178,7 +1178,7 @@ async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, - event_types: Optional[Iterable[str]] = None, + event_types: Optional[Collection[str]] = None, ) -> Optional[Tuple[str, PersistedEventPosition]]: """ Returns the ID and event position of the last event in a room at or before a @@ -1210,7 +1210,7 @@ def get_last_event_pos_in_room_before_stream_ordering_txn( max_stream = end_token.get_max_stream_pos() event_type_clause = "" - event_type_args = [] + event_type_args: List[str] = [] if event_types is not None and len(event_types) > 0: event_type_clause, event_type_args = make_in_list_sql_clause( txn.database_engine, "type", event_types From ef7a4d77625af07376242d81ff7adc2121da901f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 3 Jul 2024 18:51:46 -0500 Subject: [PATCH 06/14] Get rid of complexity Also fixes problems when running with old Python (CI failure): https://github.com/element-hq/synapse/actions/runs/9786096129/job/27020394279?pr=17395 ``` builtins.AttributeError: 'Attribute' object has no attribute 'alias' ``` --- synapse/handlers/sliding_sync.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index b00bc527ec..ec77d43e7f 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -170,15 +170,15 @@ def from_room_membership_for_user( `_SortedRoomMembershipForUser`. """ - # Based on `attr.evolve(...)` but we can copy from a child class to a parent - child_kwargs: Dict[str, Any] = {} - attrs = attr.fields(rooms_membership_for_user.__class__) - for a in attrs: - attr_name = a.name # To deal with private attributes. - init_name = a.alias - child_kwargs[init_name] = getattr(rooms_membership_for_user, attr_name) - - return cls(bump_stamp=bump_stamp, **child_kwargs) + return cls( + room_id=rooms_membership_for_user.room_id, + event_id=rooms_membership_for_user.event_id, + event_pos=rooms_membership_for_user.event_pos, + membership=rooms_membership_for_user.membership, + sender=rooms_membership_for_user.sender, + newly_joined=rooms_membership_for_user.newly_joined, + bump_stamp=bump_stamp, + ) def copy_and_replace(self, **kwds: Any) -> "_SortedRoomMembershipForUser": return attr.evolve(self, **kwds) From b559bcecfa9fe2d124e34a0ffa1fcb4795504529 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 10:42:46 -0500 Subject: [PATCH 07/14] Don't double document attributes See https://github.com/element-hq/synapse/pull/17395#discussion_r1665692157 --- synapse/handlers/sliding_sync.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index ec77d43e7f..0c5792dbe1 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -140,12 +140,6 @@ class _SortedRoomMembershipForUser(_RoomMembershipForUser): Same as `_RoomMembershipForUser` but with an additional `bump_stamp` attribute. Attributes: - event_id: The event ID of the membership event - event_pos: The stream position of the membership event - membership: The membership state of the user in the room - sender: The person who sent the membership event - newly_joined: Whether the user newly joined the room during the given token - range bump_stamp: The `stream_ordering` of the last event according to the `bump_event_types`. This helps clients sort more readily without them needing to pull in a bunch of the timeline to determine the last activity. From 812a6e07720c50faf56309d28f98a39dcfb2d8f1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 10:57:23 -0500 Subject: [PATCH 08/14] Add `bump_stamp` to API response --- synapse/rest/client/sync.py | 1 + tests/rest/client/test_sync.py | 94 ++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 1d955a2e89..8b0b9e6c8f 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -976,6 +976,7 @@ async def encode_rooms( serialized_rooms: Dict[str, JsonDict] = {} for room_id, room_result in rooms.items(): serialized_rooms[room_id] = { + "bump_stamp": room_result.bump_stamp, "joined_count": room_result.joined_count, "invited_count": room_result.invited_count, "notification_count": room_result.notification_count, diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 966c622e14..f9e4c0c718 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1937,6 +1937,100 @@ def test_rooms_incremental_sync(self) -> None: channel.json_body["rooms"][room_id1], ) + def test_rooms_bump_stamp(self) -> None: + """ + Test that `bump_stamp` is present and pointing to relevant events. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + event_response1 = message_response = self.helper.send( + room_id1, "message in room1", tok=user1_tok + ) + event_pos1 = self.get_success( + self.store.get_position_for_event(event_response1["event_id"]) + ) + room_id2 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + send_response2 = self.helper.send(room_id2, "message in room2", tok=user1_tok) + event_pos2 = self.get_success( + self.store.get_position_for_event(send_response2["event_id"]) + ) + + # Send a reaction in room1 but it shouldn't affect the `bump_stamp` + # because reactions are not part of the `DEFAULT_BUMP_EVENT_TYPES` + self.helper.send_event( + room_id1, + type=EventTypes.Reaction, + content={ + "m.relates_to": { + "event_id": message_response["event_id"], + "key": "👍", + "rel_type": "m.annotation", + } + }, + tok=user1_tok, + ) + + # Make the Sliding Sync request + timeline_limit = 100 + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure the list includes the rooms in the right order + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 1], + # room2 sorts before room1 because reactions don't bump the room + "room_ids": [room_id2, room_id1], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + # Make sure the `bump_stamp` for room2 is correct + self.assertEqual( + channel.json_body["rooms"][room_id2]["bump_stamp"], + event_pos2.stream, + channel.json_body["rooms"][room_id2], + ) + + # Make sure the `bump_stamp` for room2 is correct + self.assertEqual( + channel.json_body["rooms"][room_id1]["bump_stamp"], + event_pos1.stream, + channel.json_body["rooms"][room_id1], + ) + def test_rooms_newly_joined_incremental_sync(self) -> None: """ Test that when we make an incremental sync with a `newly_joined` `rooms`, we are From b59ccf74801ff80007f5379be04663257b5641e7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 11:32:07 -0500 Subject: [PATCH 09/14] Revert back to sorting by the latest event See https://github.com/element-hq/synapse/pull/17395#discussion_r1665783386 This better ensures that clients receive any new events regardless of what they are. --- synapse/handlers/sliding_sync.py | 98 +++++++++-------------------- tests/handlers/test_sliding_sync.py | 12 ++-- tests/rest/client/test_sync.py | 22 ++++--- 3 files changed, 49 insertions(+), 83 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0c5792dbe1..8fc884ad1c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -134,54 +134,10 @@ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": return attr.evolve(self, **kwds) -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _SortedRoomMembershipForUser(_RoomMembershipForUser): - """ - Same as `_RoomMembershipForUser` but with an additional `bump_stamp` attribute. - - Attributes: - bump_stamp: The `stream_ordering` of the last event according to the - `bump_event_types`. This helps clients sort more readily without them needing to - pull in a bunch of the timeline to determine the last activity. - `bump_event_types` is a thing because for example, we don't want display name - changes to mark the room as unread and bump it to the top. For encrypted rooms, - we just have to consider any activity as a bump because we can't see the content - and the client has to figure it out for themselves. - """ - - bump_stamp: int - - @classmethod - def from_room_membership_for_user( - cls, - rooms_membership_for_user: _RoomMembershipForUser, - *, - bump_stamp: int, - ) -> "_SortedRoomMembershipForUser": - """ - Copy from the given `_RoomMembershipForUser`, sprinkle on the specific - `_SortedRoomMembershipForUser` fields to create a new - `_SortedRoomMembershipForUser`. - """ - - return cls( - room_id=rooms_membership_for_user.room_id, - event_id=rooms_membership_for_user.event_id, - event_pos=rooms_membership_for_user.event_pos, - membership=rooms_membership_for_user.membership, - sender=rooms_membership_for_user.sender, - newly_joined=rooms_membership_for_user.newly_joined, - bump_stamp=bump_stamp, - ) - - def copy_and_replace(self, **kwds: Any) -> "_SortedRoomMembershipForUser": - return attr.evolve(self, **kwds) - - @attr.s(slots=True, frozen=True, auto_attribs=True) class _RelevantRoomEntry: room_sync_config: RoomSyncConfig - room_membership_for_user: _SortedRoomMembershipForUser + room_membership_for_user: _RoomMembershipForUser class SlidingSyncHandler: @@ -847,7 +803,7 @@ async def sort_rooms( self, sync_room_map: Dict[str, _RoomMembershipForUser], to_token: StreamToken, - ) -> List[_SortedRoomMembershipForUser]: + ) -> List[_RoomMembershipForUser]: """ Sort by `stream_ordering` of the last event that the user should see in the room. `stream_ordering` is unique so we get a stable sort. @@ -861,31 +817,27 @@ async def sort_rooms( A sorted list of room IDs by `stream_ordering` along with membership information. """ - sorted_sync_rooms: List[_SortedRoomMembershipForUser] = [] - # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} for room_id, room_for_user in sync_room_map.items(): # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: last_event_result = ( await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + room_id, to_token.room_key ) ) - # By default, just choose the membership event position - event_pos = room_for_user.event_pos - # But if we found a bump event, use that instead - if last_event_result is not None: - _, event_pos = last_event_result + # If the room has no events at/before the `to_token`, this is probably a + # mistake in the code that generates the `sync_room_map` since that should + # only give us rooms that the user had membership in during the token range. + assert last_event_result is not None - sorted_sync_rooms.append( - _SortedRoomMembershipForUser.from_room_membership_for_user( - room_for_user, bump_stamp=event_pos.stream - ) - ) + _, event_pos = last_event_result + + last_activity_in_room_map[room_id] = event_pos.stream else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. @@ -894,16 +846,12 @@ async def sort_rooms( # invited/knocked cases if for example the room has # `invite`/`world_readable` history visibility, see # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - sorted_sync_rooms.append( - _SortedRoomMembershipForUser.from_room_membership_for_user( - room_for_user, bump_stamp=room_for_user.event_pos.stream - ) - ) + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream return sorted( - sorted_sync_rooms, + sync_room_map.values(), # Sort by the last activity (stream_ordering) in the room - key=lambda room_info: room_info.bump_stamp, + key=lambda room_info: last_activity_in_room_map[room_info.room_id], # We want descending order reverse=True, ) @@ -913,7 +861,7 @@ async def get_room_sync_data( user: UserID, room_id: str, room_sync_config: RoomSyncConfig, - room_membership_for_user_at_to_token: _SortedRoomMembershipForUser, + room_membership_for_user_at_to_token: _RoomMembershipForUser, from_token: Optional[StreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: @@ -1102,6 +1050,20 @@ async def get_room_sync_data( # state reset happened. Perhaps we should indicate this by setting `initial: # True` and empty `required_state`. + # Figure out the last bump event in the room + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES + ) + ) + + # By default, just choose the membership event position + bump_stamp = room_membership_for_user_at_to_token.event_pos.stream + # But if we found a bump event, use that instead + if last_bump_event_result: + _, bump_event_pos = last_bump_event_result + bump_stamp = bump_event_pos.stream + return SlidingSyncResult.RoomResult( # TODO: Dummy value name=None, @@ -1123,7 +1085,7 @@ async def get_room_sync_data( stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, - bump_stamp=room_membership_for_user_at_to_token.bump_stamp, + bump_stamp=bump_stamp, # TODO: Dummy values joined_count=0, invited_count=0, diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 636bd32369..de59d614ed 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -2417,7 +2417,8 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: def test_default_bump_event_types(self) -> None: """ - Test that we only consider `bump_event_types` when sorting rooms. + Test that we only consider the *latest* event in the room when sorting (not + `bump_event_types`). """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -2433,8 +2434,8 @@ def test_default_bump_event_types(self) -> None: ) self.helper.send(room_id2, "message in room2", tok=user1_tok) - # Send a reaction in room1 but it shouldn't affect the sort order - # because reactions are not part of the `DEFAULT_BUMP_EVENT_TYPES` + # Send a reaction in room1 which isn't in `DEFAULT_BUMP_EVENT_TYPES` but we only + # care about sorting by the *latest* event in the room. self.helper.send_event( room_id1, type=EventTypes.Reaction, @@ -2469,6 +2470,7 @@ def test_default_bump_event_types(self) -> None: self.assertEqual( [room_membership.room_id for room_membership in sorted_sync_rooms], - # room2 sorts before room1 because reactions don't bump the room - [room_id2, room_id1], + # room1 sorts before room2 because it has the latest event (the reaction). + # We only care about the *latest* event in the room. + [room_id1, room_id2], ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index f9e4c0c718..07c6ed1570 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -2010,27 +2010,29 @@ def test_rooms_bump_stamp(self) -> None: { "op": "SYNC", "range": [0, 1], - # room2 sorts before room1 because reactions don't bump the room - "room_ids": [room_id2, room_id1], + # room1 sorts before room2 because it has the latest event (the + # reaction) + "room_ids": [room_id1, room_id2], } ], channel.json_body["lists"]["foo-list"], ) - # Make sure the `bump_stamp` for room2 is correct - self.assertEqual( - channel.json_body["rooms"][room_id2]["bump_stamp"], - event_pos2.stream, - channel.json_body["rooms"][room_id2], - ) - - # Make sure the `bump_stamp` for room2 is correct + # The `bump_stamp` for room1 should point at the latest message (not the + # reaction since it's not one of the `DEFAULT_BUMP_EVENT_TYPES`) self.assertEqual( channel.json_body["rooms"][room_id1]["bump_stamp"], event_pos1.stream, channel.json_body["rooms"][room_id1], ) + # The `bump_stamp` for room2 should point at the latest message + self.assertEqual( + channel.json_body["rooms"][room_id2]["bump_stamp"], + event_pos2.stream, + channel.json_body["rooms"][room_id2], + ) + def test_rooms_newly_joined_incremental_sync(self) -> None: """ Test that when we make an incremental sync with a `newly_joined` `rooms`, we are From 341283af5493d87e97c1c4e60ed254de4e121642 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 11:39:46 -0500 Subject: [PATCH 10/14] Use correct live location share event type See https://github.com/element-hq/synapse/pull/17395#discussion_r1665835371 --- synapse/api/constants.py | 2 +- synapse/handlers/sliding_sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index cb12133cb1..12d18137e0 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -129,7 +129,7 @@ class EventTypes: Reaction: Final = "m.reaction" Sticker: Final = "m.sticker" - LocationShare: Final = "m.location" + LiveLocationShareStart: Final = "m.beacon_info" CallInvite: Final = "m.call.invite" diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 8fc884ad1c..7239f764b8 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -58,9 +58,9 @@ EventTypes.Message, EventTypes.Encrypted, EventTypes.Sticker, - EventTypes.LocationShare, EventTypes.CallInvite, EventTypes.PollStart, + EventTypes.LiveLocationShareStart, } From cee609181ef7eb375f0e49127cab685c651abcb2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 11:42:48 -0500 Subject: [PATCH 11/14] Update changelog --- changelog.d/17395.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/17395.feature b/changelog.d/17395.feature index 1e702b7010..0c95b9f4a9 100644 --- a/changelog.d/17395.feature +++ b/changelog.d/17395.feature @@ -1 +1 @@ -Sort by and add `rooms.bump_stamp` for easier client-side sorting in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. +Add `rooms.bump_stamp` for easier client-side sorting in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From fc9f8a5104faead55d42acec882a86080b3bd100 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 11:50:51 -0500 Subject: [PATCH 12/14] Remove `_RelevantRoomEntry` complexity --- synapse/handlers/sliding_sync.py | 52 +++++++++++--------------------- 1 file changed, 18 insertions(+), 34 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 7239f764b8..4d05ae5a68 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -134,12 +134,6 @@ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": return attr.evolve(self, **kwds) -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _RelevantRoomEntry: - room_sync_config: RoomSyncConfig - room_membership_for_user: _RoomMembershipForUser - - class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -260,7 +254,7 @@ async def current_sync_for_user( # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} - relevant_room_map: Dict[str, _RelevantRoomEntry] = {} + relevant_room_map: Dict[str, RoomSyncConfig] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync # response @@ -286,29 +280,26 @@ async def current_sync_for_user( if list_config.ranges: for range in list_config.ranges: # Both sides of range are inclusive - sliced_sync_rooms = sorted_sync_rooms[range[0] : range[1] + 1] + sliced_sync_room_ids = [ + room_membership.room_id + # Both sides of range are inclusive + for room_membership in sorted_sync_rooms[ + range[0] : range[1] + 1 + ] + ] ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=[ - room_membership.room_id - for room_membership in sliced_sync_rooms - ], + room_ids=sliced_sync_room_ids, ) ) # Take the superset of the `RoomSyncConfig` for each room - for room_membership_for_user in sliced_sync_rooms: - relevant_room_entry = relevant_room_map.get( - room_membership_for_user.room_id - ) - if relevant_room_entry is not None: - existing_room_sync_config = ( - relevant_room_entry.room_sync_config - ) - + for room_id in sliced_sync_room_ids: + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: # Take the highest timeline limit if ( existing_room_sync_config.timeline_limit @@ -323,16 +314,9 @@ async def current_sync_for_user( list_config.required_state ) else: - relevant_room_map[room_membership_for_user.room_id] = ( - _RelevantRoomEntry( - room_sync_config=RoomSyncConfig( - timeline_limit=list_config.timeline_limit, - required_state=set( - list_config.required_state - ), - ), - room_membership_for_user=room_membership_for_user, - ) + relevant_room_map[room_id] = RoomSyncConfig( + timeline_limit=list_config.timeline_limit, + required_state=set(list_config.required_state), ) lists[list_key] = SlidingSyncResult.SlidingWindowList( @@ -344,12 +328,12 @@ async def current_sync_for_user( # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} - for room_id, relevant_room_entry in relevant_room_map.items(): + for room_id, room_sync_config in relevant_room_map.items(): room_sync_result = await self.get_room_sync_data( user=sync_config.user, room_id=room_id, - room_sync_config=relevant_room_entry.room_sync_config, - room_membership_for_user_at_to_token=relevant_room_entry.room_membership_for_user, + room_sync_config=room_sync_config, + room_membership_for_user_at_to_token=sync_room_map[room_id], from_token=from_token, to_token=to_token, ) From 76dce98a5274091c285877d8581aa9e8f2d15996 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Jul 2024 11:53:34 -0500 Subject: [PATCH 13/14] Prefer `is not None` --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 4d05ae5a68..88b1ff7a6a 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1044,7 +1044,7 @@ async def get_room_sync_data( # By default, just choose the membership event position bump_stamp = room_membership_for_user_at_to_token.event_pos.stream # But if we found a bump event, use that instead - if last_bump_event_result: + if last_bump_event_result is not None: _, bump_event_pos = last_bump_event_result bump_stamp = bump_event_pos.stream From fac17cb2c664721d085325820fad4955d120348d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Jul 2024 12:27:34 -0500 Subject: [PATCH 14/14] Slight correction to comment since we no longer sort on the serve by bump types --- synapse/handlers/sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index a60ee0b016..8e2f751c02 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -54,7 +54,7 @@ logger = logging.getLogger(__name__) -# The event types that we should consider when sorting the rooms in the sync response. +# The event types that clients should consider as new activity. DEFAULT_BUMP_EVENT_TYPES = { EventTypes.Message, EventTypes.Encrypted,