Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-implement unread counts #7736

Merged
merged 47 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
80d03b1
Store unread messages in the database
babolivier Jun 23, 2020
9db0bac
Count the number of unread messages on sync requests
babolivier Jun 24, 2020
e33b891
Handle redactions
babolivier Jun 25, 2020
0ee1963
Only insert rows for local users
babolivier Jun 25, 2020
9cb9378
Handle lack of read receipt in a room
babolivier Jun 25, 2020
9e723dd
Don't mark an event as unread for its own sender
babolivier Jun 25, 2020
5399a7b
Run linter
babolivier Jun 25, 2020
580b499
Fetch joined users from current_state_events instead of room_memberships
babolivier Jun 26, 2020
ff8b39d
Send unread count to the push gateway
babolivier Jun 26, 2020
eafdf9f
Add a redaction helper for tests
babolivier Jun 26, 2020
994f267
Add test case for unread counts
babolivier Jun 26, 2020
00fd951
Run linter on tests
babolivier Jun 26, 2020
a008f6f
Implement the latest changes in the MSC
babolivier Jul 2, 2020
fe5352f
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 2, 2020
d980c86
Incorporate most review comments
babolivier Jul 6, 2020
3d3e048
Merge branch 'babolivier/unread_counts' of github.com:matrix-org/syna…
babolivier Jul 6, 2020
572753b
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 6, 2020
bf1b1ec
Lint
babolivier Jul 6, 2020
e07fef1
Lint
babolivier Jul 6, 2020
8b9d073
Fix column type
babolivier Jul 6, 2020
287c263
Fix default value
babolivier Jul 6, 2020
b5972a9
Update port_db's list of bool columns
babolivier Jul 6, 2020
e9e2d88
Add a cache to get_unread_message_count_for_user
babolivier Jul 6, 2020
bc44ad4
Fix function call and cache invalidation
babolivier Jul 6, 2020
9b9f6e2
Lint
babolivier Jul 6, 2020
881fed3
Invalidate the cache in the main thread
babolivier Jul 6, 2020
52ddc4d
Fix push badge computation
babolivier Jul 6, 2020
ea01eff
Lint
babolivier Jul 6, 2020
aa0a56b
Use invalidate_cache_and_stream to invalidate the cache across workers
babolivier Jul 8, 2020
420573a
Process the cache stream first for incoming replication
babolivier Jul 8, 2020
79464f8
Fix receipts replication test
babolivier Jul 8, 2020
48a2a00
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 20, 2020
0710078
Revert "Fix receipts replication test"
babolivier Jul 20, 2020
ea10fc1
Revert "Process the cache stream first for incoming replication"
babolivier Jul 24, 2020
0ea0792
Revert "Use invalidate_cache_and_stream to invalidate the cache acros…
babolivier Jul 24, 2020
096aca9
Fix read receipt cache invalidation (hopefully)
babolivier Jul 24, 2020
28f3e2b
Lint
babolivier Jul 24, 2020
2302b6c
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 27, 2020
d40557b
Incorporate review
babolivier Jul 27, 2020
5d3aaf7
Fix unread messages test
babolivier Jul 27, 2020
6f50007
Lint
babolivier Jul 27, 2020
b265bef
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 27, 2020
debedb3
Remove default value for count_as_unread
babolivier Jul 29, 2020
57ded3e
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 29, 2020
82d9f39
Match the latest changes on the MSC
babolivier Jul 29, 2020
17e922a
Fix test
babolivier Jul 29, 2020
d7fdc35
Typo
babolivier Jul 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7736.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unread messages count to sync responses.
21 changes: 21 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)

def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand Down Expand Up @@ -951,6 +952,23 @@ async def unread_notifs_for_room_id(
# count is whatever it was last time.
return None

async def unread_messages_for_room_id(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self, room_id: str, sync_config: SyncConfig,
) -> int:
"""Retrieve the count of unread message for the current user in the given room.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""
with Measure(self.clock, "unread_messages_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
receipt_type="m.read",
)

count = await self.store.get_unread_message_count_for_user(
sync_config.user.to_string(), room_id, last_unread_event_id
)
return count

async def generate_sync_result(
self,
sync_config: SyncConfig,
Expand Down Expand Up @@ -1877,6 +1895,8 @@ async def _generate_room_entry(

if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]

unread_count = await self.unread_messages_for_room_id(room_id, sync_config)
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -1885,6 +1905,7 @@ async def _generate_room_entry(
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=unread_count,
)

if room_sync or always_include:
Expand Down
8 changes: 4 additions & 4 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ def get_badge_count(store, user_id):
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]

notifs = yield (
store.get_unread_event_push_actions_by_room_for_user(
room_id, user_id, last_unread_event_id
unread_count = yield defer.ensureDeferred(
store.get_unread_message_count_for_user(
user_id, room_id, last_unread_event_id
)
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
badge += 1 if unread_count else 0
return badge


Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ def serialize(events):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count

return result

Expand Down
98 changes: 97 additions & 1 deletion synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from canonicaljson import json
from prometheus_client import Counter

from twisted.enterprise.adbapi import Connection
from twisted.internet import defer

import synapse.metrics
Expand Down Expand Up @@ -61,6 +62,13 @@
["type", "origin_type", "origin_entity"],
)

STATE_EVENT_TYPES_TO_MARK_UNREAD = [
EventTypes.PowerLevels,
EventTypes.Topic,
EventTypes.Name,
EventTypes.RoomAvatar,
]
babolivier marked this conversation as resolved.
Show resolved Hide resolved


def encode_json(json_object):
"""
Expand Down Expand Up @@ -977,7 +985,7 @@ def _update_metadata_tables_txn(
txn, events=[event for event, _ in events_and_contexts]
)

for event, _ in events_and_contexts:
for event, context in events_and_contexts:
if event.type == EventTypes.Name:
# Insert into the event_search table.
self._store_room_name_txn(txn, event)
Expand All @@ -990,6 +998,8 @@ def _update_metadata_tables_txn(
elif event.type == EventTypes.Redaction and event.redacts is not None:
# Insert into the redactions table.
self._store_redaction(txn, event)
# If the redacted event was unread, revert that.
self._handle_redacted_unread_event_txn(txn, event)
elif event.type == EventTypes.Retention:
# Update the room_retention table.
self._store_retention_policy_for_room_txn(txn, event)
Expand All @@ -1009,6 +1019,8 @@ def _update_metadata_tables_txn(
if isinstance(expiry_ts, int) and not event.is_state():
self._insert_event_expiry_txn(txn, event.event_id, expiry_ts)

self._maybe_insert_unread_event_txn(txn, event, context)

# Insert into the room_memberships table.
self._store_room_members_txn(
txn,
Expand Down Expand Up @@ -1614,3 +1626,87 @@ def f(txn, stream_ordering):
await self.db.runInteraction("locally_reject_invite", f, stream_ordering)

return stream_ordering

def _maybe_insert_unread_event_txn(
self, txn: Connection, event: EventBase, context: EventContext,
):
"""Mark the event as unread for every current member of the room if it passes the
conditions for that.

These conditions are: the event must either have a non-empty string body, be an
encrypted message, or be either a power levels event, a room name event or a room
topic event, and must be neither rejected or soft-failed nor an edit or a notice.

Args:
txn: The transaction to use to retrieve room members and to mark the event
as unread.
event: The event to evaluate and maybe mark as unread.
context: The context in which the event was sent (used to figure out whether
the event has been rejected).
"""
content = event.content

is_edit = (
content.get("m.relates_to", {}).get("rel_type") == RelationTypes.REPLACE
)
is_notice = not event.is_state() and content.get("msgtype") == "m.notice"

# We don't want rejected or soft-failed events, edits or notices to be marked
# unread.
if (
context.rejected
or is_edit
or is_notice
or event.internal_metadata.is_soft_failed()
):
return
babolivier marked this conversation as resolved.
Show resolved Hide resolved

body_exists = isinstance(content.get("body"), str)
babolivier marked this conversation as resolved.
Show resolved Hide resolved
is_state_event_to_mark_unread = (
event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD
)
is_encrypted_message = (
not event.is_state() and event.type == EventTypes.Encrypted
)

# We want to mark unread messages with a non-empty string body, some state events
# (power levels, room name, room topic, room avatar) and encrypted messages.
if not (body_exists or is_state_event_to_mark_unread or is_encrypted_message):
return
babolivier marked this conversation as resolved.
Show resolved Hide resolved

# Get the list of users that are currently joined to the room.
users_in_room = self.db.simple_select_onecol_txn(
txn=txn,
table="current_state_events",
keyvalues={"membership": Membership.JOIN, "room_id": event.room_id},
retcol="state_key",
) # type: list

# Only insert rows for local users.
local_users_in_room = list(
filter(lambda user_id: self.hs.is_mine_id(user_id), users_in_room)
)

# Mark the message as unread for every user currently in the room, except the
# sender of the event (because even if they haven't sent a read receipt for the
# event, it seems dumb to show it as unread to its sender).
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self.db.simple_insert_many_txn(
txn=txn,
table="unread_messages",
values=[
{
"user_id": user_id,
"stream_ordering": event.internal_metadata.stream_ordering,
"room_id": event.room_id,
"event_id": event.event_id,
}
for user_id in local_users_in_room
if user_id != event.sender
],
)

def _handle_redacted_unread_event_txn(self, txn: Connection, event: EventBase):
# Redact every row for this event in the unread_messages table.
self.db.simple_delete_txn(
txn=txn, table="unread_messages", keyvalues={"event_id": event.redacts}
)
58 changes: 57 additions & 1 deletion synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.database import Database, LoggingTransaction
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
Expand Down Expand Up @@ -1362,6 +1362,62 @@ def get_next_event_to_expire_txn(txn):
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)

async def get_unread_message_count_for_user(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self, user_id: str, room_id: str, last_read_event_id: str,
):
return await self.db.runInteraction(
"get_unread_message_count_for_user",
self._get_unread_message_count_for_user_txn,
user_id,
room_id,
last_read_event_id,
)

def _get_unread_message_count_for_user_txn(
self,
txn: LoggingTransaction,
babolivier marked this conversation as resolved.
Show resolved Hide resolved
user_id: str,
room_id: str,
last_read_event_id: Optional[str],
):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
if last_read_event_id:
# Get the stream ordering for the last read event.
stream_ordering = self.db.simple_select_one_onecol_txn(
txn=txn,
table="events",
keyvalues={"room_id": room_id, "event_id": last_read_event_id},
retcol="stream_ordering",
)
else:
# If there's no read receipt for that room, it probably means the user hasn't
# opened it yet, in which case use the stream ID of their join event.
# We can't just set it to 0 otherwise messages from other local users from
# before this user joined will be counted as well.
txn.execute(
"""
SELECT stream_ordering FROM room_memberships
babolivier marked this conversation as resolved.
Show resolved Hide resolved
LEFT JOIN events USING (event_id, room_id)
WHERE membership = 'join'
AND user_id = ?
AND room_id = ?
""",
(user_id, room_id),
)
row = txn.fetchone()
stream_ordering = row[0]
babolivier marked this conversation as resolved.
Show resolved Hide resolved

# Count the messages that qualify as unread after the stream ordering we've just
# retrieved.
sql = """
SELECT COUNT(*) FROM unread_messages
WHERE user_id = ? AND room_id = ? AND stream_ordering > ?
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""

txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()

return row[0] if row else 0


AllNewEventsResult = namedtuple(
"AllNewEventsResult",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
babolivier marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE TABLE IF NOT EXISTS unread_messages(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
user_id TEXT NOT NULL, -- The user for which the message is unread.
stream_ordering BIGINT NOT NULL, -- The position of the message in the event stream.
room_id TEXT NOT NULL,
event_id TEXT NOT NULL, -- The ID of the message, we need it to handle redactions.
babolivier marked this conversation as resolved.
Show resolved Hide resolved
UNIQUE (user_id, stream_ordering)
);
20 changes: 20 additions & 0 deletions tests/rest/client/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,26 @@ def send_event(

return channel.json_body

def redact(self, room_id, event_id, txn_id=None, tok=None, expect_code=200):
if txn_id is None:
txn_id = "m%s" % (str(time.time()))

path = "/_matrix/client/r0/rooms/%s/redact/%s/%s" % (room_id, event_id, txn_id)
if tok:
path = path + "?access_token=%s" % tok

request, channel = make_request(
self.hs.get_reactor(), "PUT", path, json.dumps({}).encode("utf8")
)
render(request, self.resource, self.hs.get_reactor())

assert int(channel.result["code"]) == expect_code, (
"Expected: %d, got: %d, resp: %r"
% (expect_code, int(channel.result["code"]), channel.result["body"])
)

return channel.json_body

def _read_write_state(
self,
room_id: str,
Expand Down
Loading