Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Properly replication the thread_id on receipts.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Sep 15, 2022
1 parent b2fcd79 commit 893378d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ class ReceiptsStreamRow:
receipt_type: str
user_id: str
event_id: str
thread_id: Optional[str]
data: dict

NAME = "receipts"
Expand Down
16 changes: 11 additions & 5 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,9 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]:

async def get_all_updated_receipts(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool
]:
"""Get updates for receipts replication stream.
Args:
Expand All @@ -567,9 +569,13 @@ async def get_all_updated_receipts(

def get_all_updated_receipts_txn(
txn: LoggingTransaction,
) -> Tuple[List[Tuple[int, list]], int, bool]:
) -> Tuple[
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
int,
bool,
]:
sql = """
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
FROM receipts_linearized
WHERE ? < stream_id AND stream_id <= ?
ORDER BY stream_id ASC
Expand All @@ -578,8 +584,8 @@ def get_all_updated_receipts_txn(
txn.execute(sql, (last_id, current_id, limit))

updates = cast(
List[Tuple[int, list]],
[(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]],
[(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
)

limited = False
Expand Down
1 change: 1 addition & 0 deletions tests/replication/tcp/streams/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def test_receipt(self):
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
self.assertEqual("$event:blue", row.event_id)
self.assertIsNone(row.thread_id)
self.assertEqual({"a": 1}, row.data)

# Now let's disconnect and insert some data.
Expand Down

0 comments on commit 893378d

Please sign in to comment.