Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Store the thread ID with the receipt.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Sep 22, 2022
1 parent 4fe707a commit e201626
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 30 deletions.
1 change: 1 addition & 0 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool:
receipt.receipt_type,
receipt.user_id,
receipt.event_ids,
receipt.thread_id,
receipt.data,
)

Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@
"local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx",
"remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx",
"event_push_summary": "event_push_summary_unique_index2",
"receipts_linearized": "receipts_linearized_unique_index",
"receipts_graph": "receipts_graph_unique_index",
}


Expand Down
71 changes: 53 additions & 18 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ def _insert_linearized_receipt_txn(
receipt_type: str,
user_id: str,
event_id: str,
thread_id: Optional[str],
data: JsonDict,
stream_id: int,
) -> Optional[int]:
Expand All @@ -663,12 +664,27 @@ def _insert_linearized_receipt_txn(
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
if stream_ordering is not None:
sql = (
"SELECT stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
if thread_id is None:
thread_clause = "r.thread_id IS NULL"
thread_args: Tuple[str, ...] = ()
else:
thread_clause = "r.thread_id = ?"
thread_args = (thread_id,)

sql = f"""
SELECT stream_ordering, event_id FROM events
INNER JOIN receipts_linearized AS r USING (event_id, room_id)
WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
"""
txn.execute(
sql,
(
room_id,
receipt_type,
user_id,
)
+ thread_args,
)
txn.execute(sql, (room_id, receipt_type, user_id))

for so, eid in txn:
if int(so) >= stream_ordering:
Expand All @@ -688,21 +704,28 @@ def _insert_linearized_receipt_txn(
self._receipts_stream_cache.entity_has_changed, room_id, stream_id
)

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
}
where_clause = ""
if thread_id is None:
where_clause = "thread_id IS NULL"
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
table="receipts_linearized",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
},
keyvalues=keyvalues,
values={
"stream_id": stream_id,
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
"thread_id": None,
},
where_clause=where_clause,
# receipts_linearized has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
Expand Down Expand Up @@ -754,6 +777,7 @@ async def insert_receipt(
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: dict,
) -> Optional[Tuple[int, int]]:
"""Insert a receipt, either from local client or remote server.
Expand Down Expand Up @@ -786,6 +810,7 @@ async def insert_receipt(
receipt_type,
user_id,
linearized_event_id,
thread_id,
data,
stream_id=stream_id,
# Read committed is actually beneficial here because we check for a receipt with
Expand All @@ -800,7 +825,8 @@ async def insert_receipt(

now = self._clock.time_msec()
logger.debug(
"RR for event %s in %s (%i ms old)",
"Receipt %s for event %s in %s (%i ms old)",
receipt_type,
linearized_event_id,
room_id,
now - event_ts,
Expand All @@ -813,6 +839,7 @@ async def insert_receipt(
receipt_type,
user_id,
event_ids,
thread_id,
data,
)

Expand All @@ -827,6 +854,7 @@ def _insert_graph_receipt_txn(
receipt_type: str,
user_id: str,
event_ids: List[str],
thread_id: Optional[str],
data: JsonDict,
) -> None:
assert self._can_write_to_receipts
Expand All @@ -838,19 +866,26 @@ def _insert_graph_receipt_txn(
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))

keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
}
where_clause = ""
if thread_id is None:
where_clause = "thread_id IS NULL"
else:
keyvalues["thread_id"] = thread_id

self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
keyvalues={
"room_id": room_id,
"receipt_type": receipt_type,
"user_id": user_id,
},
keyvalues=keyvalues,
values={
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
"thread_id": None,
},
where_clause=where_clause,
# receipts_graph has a unique constraint on
# (user_id, room_id, receipt_type), so no need to lock
lock=False,
Expand Down
1 change: 1 addition & 0 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ def test_sending_read_receipt_batches_to_application_services(self):
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
thread_id=None,
data={},
)
)
Expand Down
2 changes: 1 addition & 1 deletion tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def test_push_actions_for_user(self, send_receipt: bool):
if send_receipt:
self.get_success(
self.master_store.insert_receipt(
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {}
ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {}
)
)

Expand Down
14 changes: 12 additions & 2 deletions tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ def test_receipt(self):
# tell the master to send a new receipt
self.get_success(
self.hs.get_datastores().main.insert_receipt(
"!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
"!room:blue",
"m.read",
USER_ID,
["$event:blue"],
thread_id=None,
data={"a": 1},
)
)
self.replicate()
Expand All @@ -58,7 +63,12 @@ def test_receipt(self):

self.get_success(
self.hs.get_datastores().main.insert_receipt(
"!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
"!room2:blue",
"m.read",
USER_ID,
["$event2:foo"],
thread_id=None,
data={"a": 2},
)
)
self.replicate()
Expand Down
4 changes: 3 additions & 1 deletion tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _mark_read(event_id: str) -> None:
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=None,
data={},
)
)
Expand Down Expand Up @@ -267,13 +268,14 @@ def _create_event(
def _rotate() -> None:
self.get_success(self.store._rotate_notifs())

def _mark_read(event_id: str) -> None:
def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=thread_id,
data={},
)
)
Expand Down
36 changes: 28 additions & 8 deletions tests/storage/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,18 @@ def test_get_receipts_for_user(self) -> None:
# Send public read receipt for the first event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
)
)
# Send private read receipt for the second event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
self.room_id1,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event1_2_id],
None,
{},
)
)

Expand All @@ -164,7 +169,7 @@ def test_get_receipts_for_user(self) -> None:
# Test receipt updating
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
)
)
res = self.get_success(
Expand All @@ -180,7 +185,12 @@ def test_get_receipts_for_user(self) -> None:
# Test new room is reflected in what the method returns
self.get_success(
self.store.insert_receipt(
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
self.room_id2,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event2_1_id],
None,
{},
)
)
res = self.get_success(
Expand All @@ -202,13 +212,18 @@ def test_get_last_receipt_event_id_for_user(self) -> None:
# Send public read receipt for the first event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {}
)
)
# Send private read receipt for the second event
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {}
self.room_id1,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event1_2_id],
None,
{},
)
)

Expand Down Expand Up @@ -241,7 +256,7 @@ def test_get_last_receipt_event_id_for_user(self) -> None:
# Test receipt updating
self.get_success(
self.store.insert_receipt(
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {}
self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {}
)
)
res = self.get_success(
Expand All @@ -259,7 +274,12 @@ def test_get_last_receipt_event_id_for_user(self) -> None:
# Test new room is reflected in what the method returns
self.get_success(
self.store.insert_receipt(
self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {}
self.room_id2,
ReceiptTypes.READ_PRIVATE,
OUR_USER_ID,
[event2_1_id],
None,
{},
)
)
res = self.get_success(
Expand Down

0 comments on commit e201626

Please sign in to comment.