Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow multiple workers to write to receipts stream. #16432

Merged
merged 15 commits into from
Oct 25, 2023
Merged
40 changes: 40 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
Dict,
Iterable,
List,
Literal,
Optional,
Set,
Tuple,
TypeVar,
Union,
overload,
)

import attr
Expand Down Expand Up @@ -453,6 +455,44 @@ def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None:
except Exception:
logger.exception("Error pusher pool of event")

@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.ROOM],
new_token: RoomStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...
Comment on lines +458 to +466
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised we didn't already need this overload! Does it help avoid any asserts or anything?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't reduce assertions, as it only limits what is accepted as arguments (it doesn't change the return type). I added it to catch the case where we passed in an int when using StreamKeyType.RECEIPT.


@overload
def on_new_event(
self,
stream_key: Literal[StreamKeyType.RECEIPT],
new_token: MultiWriterStreamToken,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

@overload
def on_new_event(
self,
stream_key: Literal[
StreamKeyType.ACCOUNT_DATA,
StreamKeyType.DEVICE_LIST,
StreamKeyType.PRESENCE,
StreamKeyType.PUSH_RULES,
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
],
new_token: int,
users: Optional[Collection[Union[str, UserID]]] = None,
rooms: Optional[StrCollection] = None,
) -> None:
...

def on_new_event(
self,
stream_key: StreamKeyType,
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ async def on_rdata(
StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
)
elif stream_name == ReceiptsStream.NAME:
new_token = self.store.get_max_receipt_stream_id()
self.notifier.on_new_event(
StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
StreamKeyType.RECEIPT, new_token, rooms=[row.room_id for row in rows]
clokep marked this conversation as resolved.
Show resolved Hide resolved
)
await self._pusher_pool.on_new_receipts({row.user_id for row in rows})
elif stream_name == ToDeviceStream.NAME:
Expand Down