Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clarifications for event push action processing #13485

Merged
merged 5 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13485.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add comments about how event push actions are rotated.
53 changes: 33 additions & 20 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def get_unread_event_push_actions_by_room_for_user(
user_id: str,
) -> NotifCounts:
"""Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt.
for a given user in a given room after their latest read receipt.

Note that this function assumes the user to be a current member of the room,
since it's either called by the sync handler to handle joined room entries, or by
Expand All @@ -238,9 +238,8 @@ async def get_unread_event_push_actions_by_room_for_user(
user_id: The user to retrieve the counts for.

Returns
A dict containing the counts mentioned earlier in this docstring,
respectively under the keys "notify_count", "highlight_count" and
"unread_count".
A NotifCounts object containing the notification count, the highlight count
and the unread message count.
"""
return await self.db_pool.runInteraction(
"get_unread_event_push_actions_by_room",
Expand All @@ -255,6 +254,7 @@ def _get_unread_counts_by_receipt_txn(
room_id: str,
user_id: str,
) -> NotifCounts:
# Get the stream ordering of the user's latest receipt in the room.
result = self.get_last_receipt_for_user_txn(
txn,
user_id,
Expand All @@ -266,13 +266,11 @@ def _get_unread_counts_by_receipt_txn(
),
)

stream_ordering = None
if result:
_, stream_ordering = result

if stream_ordering is None:
# Either last_read_event_id is None, or it's an event we don't have (e.g.
# because it's been purged), in which case retrieve the stream ordering for
else:
# If the user has no receipts in the room, retrieve the stream ordering for
# the latest membership event from this user in this room (which we assume is
# a join).
event_id = self.db_pool.simple_select_one_onecol_txn(
Expand All @@ -289,10 +287,26 @@ def _get_unread_counts_by_receipt_txn(
)

def _get_unread_counts_by_pos_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
self,
txn: LoggingTransaction,
room_id: str,
user_id: str,
receipt_stream_ordering: int,
) -> NotifCounts:
"""Get the number of unread messages for a user/room that have happened
since the given stream ordering.

Args:
txn: The database transaction.
room_id: The room ID to get unread counts for.
user_id: The user ID to get unread counts for.
receipt_stream_ordering: The stream ordering of the user's latest
receipt in the room. If there are no receipts, the stream ordering
of the user's join event.

Returns
A NotifCounts object containing the notification count, the highlight count
and the unread message count.
"""

counts = NotifCounts()
Expand Down Expand Up @@ -320,7 +334,7 @@ def _get_unread_counts_by_pos_txn(
OR last_receipt_stream_ordering = ?
)
""",
(room_id, user_id, stream_ordering, stream_ordering),
(room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
)
row = txn.fetchone()

Expand All @@ -338,17 +352,20 @@ def _get_unread_counts_by_pos_txn(
AND stream_ordering > ?
AND highlight = 1
"""
txn.execute(sql, (user_id, room_id, stream_ordering))
txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
row = txn.fetchone()
if row:
counts.highlight_count += row[0]

# Finally we need to count push actions that aren't included in the
# summary returned above, e.g. recent events that haven't been
# summarised yet, or the summary is empty due to a recent read receipt.
stream_ordering = max(stream_ordering, summary_stream_ordering)
# summary returned above. This might be due to recent events that haven't
# been summarised yet or the summary is out of date due to a recent read
# receipt.
start_unread_stream_ordering = max(
receipt_stream_ordering, summary_stream_ordering
)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering
txn, room_id, user_id, start_unread_stream_ordering
)

counts.notify_count += notify_count
Expand Down Expand Up @@ -1151,8 +1168,6 @@ def _rotate_notifs_before_txn(
txn: The database transaction.
old_rotate_stream_ordering: The previous maximum event stream ordering.
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.

Returns whether the archiving process has caught up or not.
"""

# Calculate the new counts that should be upserted into event_push_summary
Expand Down Expand Up @@ -1238,9 +1253,7 @@ def _rotate_notifs_before_txn(
(rotate_to_stream_ordering,),
)

async def _remove_old_push_actions_that_have_rotated(
self,
) -> None:
async def _remove_old_push_actions_that_have_rotated(self) -> None:
"""Clear out old push actions that have been summarised."""

# We want to clear out anything that is older than a day that *has* already
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def get_last_receipt_for_user_txn(
receipt_type: The receipt types to fetch.

Returns:
The latest receipt, if one exists.
The event ID and stream ordering of the latest receipt, if one exists.
"""

clause, args = make_in_list_sql_clause(
Expand Down