Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Re-implement unread counts #7736

Merged
merged 47 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
80d03b1
Store unread messages in the database
babolivier Jun 23, 2020
9db0bac
Count the number of unread messages on sync requests
babolivier Jun 24, 2020
e33b891
Handle redactions
babolivier Jun 25, 2020
0ee1963
Only insert rows for local users
babolivier Jun 25, 2020
9cb9378
Handle lack of read receipt in a room
babolivier Jun 25, 2020
9e723dd
Don't mark an event as unread for its own sender
babolivier Jun 25, 2020
5399a7b
Run linter
babolivier Jun 25, 2020
580b499
Fetch joined users from current_state_events instead of room_memberships
babolivier Jun 26, 2020
ff8b39d
Send unread count to the push gateway
babolivier Jun 26, 2020
eafdf9f
Add a redaction helper for tests
babolivier Jun 26, 2020
994f267
Add test case for unread counts
babolivier Jun 26, 2020
00fd951
Run linter on tests
babolivier Jun 26, 2020
a008f6f
Implement the latest changes in the MSC
babolivier Jul 2, 2020
fe5352f
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 2, 2020
d980c86
Incorporate most review comments
babolivier Jul 6, 2020
3d3e048
Merge branch 'babolivier/unread_counts' of github.com:matrix-org/syna…
babolivier Jul 6, 2020
572753b
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 6, 2020
bf1b1ec
Lint
babolivier Jul 6, 2020
e07fef1
Lint
babolivier Jul 6, 2020
8b9d073
Fix column type
babolivier Jul 6, 2020
287c263
Fix default value
babolivier Jul 6, 2020
b5972a9
Update port_db's list of bool columns
babolivier Jul 6, 2020
e9e2d88
Add a cache to get_unread_message_count_for_user
babolivier Jul 6, 2020
bc44ad4
Fix function call and cache invalidation
babolivier Jul 6, 2020
9b9f6e2
Lint
babolivier Jul 6, 2020
881fed3
Invalidate the cache in the main thread
babolivier Jul 6, 2020
52ddc4d
Fix push badge computation
babolivier Jul 6, 2020
ea01eff
Lint
babolivier Jul 6, 2020
aa0a56b
Use invalidate_cache_and_stream to invalidate the cache across workers
babolivier Jul 8, 2020
420573a
Process the cache stream first for incoming replication
babolivier Jul 8, 2020
79464f8
Fix receipts replication test
babolivier Jul 8, 2020
48a2a00
Merge branch 'develop' into babolivier/unread_counts
babolivier Jul 20, 2020
0710078
Revert "Fix receipts replication test"
babolivier Jul 20, 2020
ea10fc1
Revert "Process the cache stream first for incoming replication"
babolivier Jul 24, 2020
0ea0792
Revert "Use invalidate_cache_and_stream to invalidate the cache acros…
babolivier Jul 24, 2020
096aca9
Fix read receipt cache invalidation (hopefully)
babolivier Jul 24, 2020
28f3e2b
Lint
babolivier Jul 24, 2020
2302b6c
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 27, 2020
d40557b
Incorporate review
babolivier Jul 27, 2020
5d3aaf7
Fix unread messages test
babolivier Jul 27, 2020
6f50007
Lint
babolivier Jul 27, 2020
b265bef
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 27, 2020
debedb3
Remove default value for count_as_unread
babolivier Jul 29, 2020
57ded3e
Merge branch 'develop' of github.com:matrix-org/synapse into babolivi…
babolivier Jul 29, 2020
82d9f39
Match the latest changes on the MSC
babolivier Jul 29, 2020
17e922a
Fix test
babolivier Jul 29, 2020
d7fdc35
Typo
babolivier Jul 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7736.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add unread messages count to sync responses.
2 changes: 1 addition & 1 deletion scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ logger = logging.getLogger("synapse_port_db")


BOOLEAN_COLUMNS = {
"events": ["processed", "outlier", "contains_url"],
"events": ["processed", "outlier", "contains_url", "count_as_unread"],
"rooms": ["is_public"],
"event_edges": ["is_state"],
"presence_list": ["accepted"],
Expand Down
6 changes: 6 additions & 0 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class JoinedSyncResult:
account_data = attr.ib(type=List[JsonDict])
unread_notifications = attr.ib(type=JsonDict)
summary = attr.ib(type=Optional[JsonDict])
unread_count = attr.ib(type=int)

def __nonzero__(self) -> bool:
"""Make the result appear empty if there are no updates. This is used
Expand Down Expand Up @@ -1886,6 +1887,10 @@ async def _generate_room_entry(

if room_builder.rtype == "joined":
unread_notifications = {} # type: Dict[str, str]

unread_count = await self.store.get_unread_message_count_for_user(
room_id, sync_config.user.to_string(),
)
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
Expand All @@ -1894,6 +1899,7 @@ async def _generate_room_entry(
account_data=account_data_events,
unread_notifications=unread_notifications,
summary=summary,
unread_count=unread_count,
)

if room_sync or always_include:
Expand Down
17 changes: 4 additions & 13 deletions synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,13 @@ async def get_badge_count(store, user_id):
invites = await store.get_invited_rooms_for_local_user(user_id)
joins = await store.get_rooms_for_user(user_id)

my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")

badge = len(invites)

for room_id in joins:
if room_id in my_receipts_by_room:
last_unread_event_id = my_receipts_by_room[room_id]

notifs = await (
store.get_unread_event_push_actions_by_room_for_user(
room_id, user_id, last_unread_event_id
)
)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if notifs["notify_count"] else 0
unread_count = await store.get_unread_message_count_for_user(room_id, user_id)
# return one badge count per conversation, as count per
# message is so noisy as to be almost useless
badge += 1 if unread_count else 0
return badge


Expand Down
1 change: 1 addition & 0 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def serialize(events):
result["ephemeral"] = {"events": ephemeral_events}
result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary
result["org.matrix.msc2654.unread_count"] = room.unread_count

return result

Expand Down
1 change: 1 addition & 0 deletions synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def _invalidate_caches_for_event(

self.get_latest_event_ids_in_room.invalidate((room_id,))

self.get_unread_message_count_for_user.invalidate_many((room_id,))
self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))

if not backfilled:
Expand Down
48 changes: 47 additions & 1 deletion synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,47 @@
["type", "origin_type", "origin_entity"],
)

# Types of state event that are counted as unread when they appear in a room; state
# events of any other type are only counted if they carry a non-empty string body.
STATE_EVENT_TYPES_TO_MARK_UNREAD = {
EventTypes.Topic,
EventTypes.Name,
EventTypes.RoomAvatar,
EventTypes.Tombstone,
}


def should_count_as_unread(event: EventBase, context: EventContext) -> bool:
    """Decide whether an event should be counted towards unread message counts.

    Args:
        event: The event being considered.
        context: The context of the event, used to tell whether it was rejected.

    Returns:
        True if the event should be counted as unread, False otherwise.
    """
    # Imported locally so that this function does not depend on the module's
    # import block.
    from collections.abc import Mapping

    # Exclude rejected and soft-failed events.
    if context.rejected or event.internal_metadata.is_soft_failed():
        return False

    # Exclude notices.
    if (
        not event.is_state()
        and event.type == EventTypes.Message
        and event.content.get("msgtype") == "m.notice"
    ):
        return False

    # Exclude edits. The content of an event is controlled by its sender, so
    # "m.relates_to" may be something other than a mapping (a string, a number,
    # null...). Such a value can't describe an edit, and must not make us raise.
    relates_to = event.content.get("m.relates_to")
    if (
        isinstance(relates_to, Mapping)
        and relates_to.get("rel_type") == RelationTypes.REPLACE
    ):
        return False

    # Mark events that have a non-empty string body as unread.
    body = event.content.get("body")
    if isinstance(body, str) and body:
        return True

    # Mark some state events as unread.
    if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
        return True

    # Mark encrypted events as unread.
    if not event.is_state() and event.type == EventTypes.Encrypted:
        return True

    return False


def encode_json(json_object):
"""
Expand Down Expand Up @@ -196,6 +237,10 @@ def _persist_events_and_state_updates(

event_counter.labels(event.type, origin_type, origin_entity).inc()

self.store.get_unread_message_count_for_user.invalidate_many(
(event.room_id,),
)

for room_id, new_state in current_state_for_room.items():
self.store.get_current_state_ids.prefill((room_id,), new_state)

Expand Down Expand Up @@ -817,8 +862,9 @@ def event_dict(event):
"contains_url": (
"url" in event.content and isinstance(event.content["url"], str)
),
"count_as_unread": should_count_as_unread(event, context),
}
for event, _ in events_and_contexts
for event, context in events_and_contexts
],
)

Expand Down
86 changes: 85 additions & 1 deletion synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,15 @@
from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import Database
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
from synapse.util.caches.descriptors import (
Cache,
_CacheContext,
cached,
cachedInlineCallbacks,
)
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -1358,6 +1364,84 @@ def get_next_event_to_expire_txn(txn):
desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
)

@cached(tree=True, cache_context=True)
async def get_unread_message_count_for_user(
    self, room_id: str, user_id: str, cache_context: _CacheContext,
) -> int:
    """Retrieve the count of unread messages for the given room and user.

    Args:
        room_id: The ID of the room to count unread messages in.
        user_id: The ID of the user to count unread messages for.
        cache_context: The cache context for this call (requested through
            `cache_context=True` on the decorator). Its `invalidate` callback is
            handed to the read receipt lookup, presumably so that this cached count
            is dropped whenever the user's read receipt for the room changes.

    Returns:
        The number of unread messages for the given user in the given room.
    """
    with Measure(self._clock, "get_unread_message_count_for_user"):
        # The user's latest read receipt in the room (if any) is the point after
        # which events are considered unread.
        last_read_event_id = await self.get_last_receipt_event_id_for_user(
            user_id=user_id,
            room_id=room_id,
            receipt_type="m.read",
            on_invalidate=cache_context.invalidate,
        )

        return await self.db.runInteraction(
            "get_unread_message_count_for_user",
            self._get_unread_message_count_for_user_txn,
            user_id,
            room_id,
            last_read_event_id,
        )

def _get_unread_message_count_for_user_txn(
    self,
    txn: Cursor,
    user_id: str,
    room_id: str,
    last_read_event_id: Optional[str],
) -> int:
    """Count the events in a room that are unread for the given user.

    Events are counted if they were sent by someone else, are flagged with
    `count_as_unread`, and come after a reference stream ordering. That reference
    is the stream ordering of the user's last read event if it is known, otherwise
    the stream ordering of the user's join event in the room.

    Args:
        txn: The database cursor to run the queries with.
        user_id: The ID of the user to count unread messages for.
        room_id: The ID of the room to count unread messages in.
        last_read_event_id: The ID of the event targeted by the user's latest read
            receipt in the room, or None if there is no such receipt.

    Returns:
        The number of unread messages. 0 if no reference stream ordering can be
        determined (e.g. the user isn't joined to the room).
    """
    stream_ordering = None

    if last_read_event_id:
        # Get the stream ordering for the last read event. The receipt might point
        # to an event that isn't in the events table, in which case we want to fall
        # back to the join event below rather than raise.
        stream_ordering = self.db.simple_select_one_onecol_txn(
            txn=txn,
            table="events",
            keyvalues={"room_id": room_id, "event_id": last_read_event_id},
            retcol="stream_ordering",
            allow_none=True,
        )

    if stream_ordering is None:
        # If there's no (usable) read receipt for that room, it probably means the
        # user hasn't opened it yet, in which case use the stream ID of their join
        # event. We can't just set it to 0, otherwise messages from other local
        # users from before this user joined would be counted as well.
        txn.execute(
            """
            SELECT stream_ordering FROM local_current_membership
            LEFT JOIN events USING (event_id, room_id)
            WHERE membership = 'join'
                AND user_id = ?
                AND room_id = ?
            """,
            (user_id, room_id),
        )
        row = txn.fetchone()

        # No row means the user isn't joined to the room. A NULL stream ordering
        # means the LEFT JOIN found no matching event; comparing against NULL in
        # the query below would match nothing anyway, so stop here.
        if row is None or row[0] is None:
            return 0

        stream_ordering = row[0]

    # Count the messages that qualify as unread after the stream ordering we've just
    # retrieved.
    sql = """
        SELECT COUNT(*) FROM events
        WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread
    """

    txn.execute(sql, (user_id, room_id, stream_ordering))
    row = txn.fetchone()

    return row[0] if row else 0


AllNewEventsResult = namedtuple(
"AllNewEventsResult",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Store a boolean value in the events table for whether the event should be counted in
-- the unread_count property of sync responses.
-- No DEFAULT is specified, so rows that already exist are left with NULL in this column.
ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN;
20 changes: 20 additions & 0 deletions tests/rest/client/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,26 @@ def send_event(

return channel.json_body

def redact(self, room_id, event_id, txn_id=None, tok=None, expect_code=200):
    """Redact an event through the client API and return the response's JSON body.

    Issues a PUT with an empty JSON object to the room's redact endpoint. When no
    transaction ID is given, one is derived from the current time. When an access
    token is given, it is appended to the path as a query parameter. An assertion
    fails if the response code is not `expect_code`.
    """
    if txn_id is None:
        txn_id = "m%s" % (str(time.time()))

    url = "/_matrix/client/r0/rooms/%s/redact/%s/%s" % (room_id, event_id, txn_id)
    if tok:
        url += "?access_token=%s" % tok

    empty_body = json.dumps({}).encode("utf8")
    request, channel = make_request(self.hs.get_reactor(), "PUT", url, empty_body)
    render(request, self.resource, self.hs.get_reactor())

    status = int(channel.result["code"])
    assert status == expect_code, "Expected: %d, got: %d, resp: %r" % (
        expect_code,
        status,
        channel.result["body"],
    )

    return channel.json_body

def _read_write_state(
self,
room_id: str,
Expand Down
Loading