From a0c0efcee3e4da8e11a31b3d6c546da6aef65451 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 10:39:14 -0400 Subject: [PATCH 01/10] Do not store event streams in a dict. --- synapse/handlers/appservice.py | 6 +-- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/sync.py | 6 +-- synapse/module_api/__init__.py | 2 +- synapse/notifier.py | 2 +- synapse/streams/events.py | 63 +++++++++++++++++-------- tests/handlers/test_receipts.py | 2 +- tests/handlers/test_typing.py | 2 +- tests/rest/client/test_shadow_banned.py | 2 +- tests/rest/client/test_typing.py | 2 +- 10 files changed, 57 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 8bde9ed66f8e..b7213b67a57b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -254,7 +254,7 @@ async def _notify_interested_services_ephemeral( async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: - typing_source = self.event_sources.sources["typing"] + typing_source = self.event_sources.sources.typing # Get the typing events from just before current typing, _ = await typing_source.get_new_events_as( service=service, @@ -269,7 +269,7 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) - receipts_source = self.event_sources.sources["receipt"] + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key ) @@ -279,7 +279,7 @@ async def _handle_presence( self, service: ApplicationService, users: Collection[Union[str, UserID]] ) -> List[JsonDict]: events: List[JsonDict] = [] - presence_source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources.presence from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index c942086e746e..9ad39a65d8b6 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -125,7 +125,7 @@ async def _snapshot_all_rooms( now_token = self.hs.get_event_sources().get_current_token() - presence_stream = self.hs.get_event_sources().sources["presence"] + presence_stream = self.hs.get_event_sources().sources.presence presence, _ = await presence_stream.get_new_events( user, from_key=None, include_offline=False ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index e93db4bdcce4..2c7c6d63a931 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -443,7 +443,7 @@ async def ephemeral_by_room( room_ids = sync_result_builder.joined_room_ids - typing_source = self.event_sources.sources["typing"] + typing_source = self.event_sources.sources.typing typing, typing_key = await typing_source.get_new_events( user=sync_config.user, from_key=typing_key, @@ -465,7 +465,7 @@ async def ephemeral_by_room( receipt_key = since_token.receipt_key if since_token else 0 - receipt_source = self.event_sources.sources["receipt"] + receipt_source = self.event_sources.sources.receipt receipts, receipt_key = await receipt_source.get_new_events( user=sync_config.user, from_key=receipt_key, @@ -1415,7 +1415,7 @@ async def _generate_sync_entry_for_presence( sync_config = sync_result_builder.sync_config user = sync_result_builder.sync_config.user - presence_source = self.event_sources.sources["presence"] + presence_source = self.event_sources.sources.presence since_token = sync_result_builder.since_token presence_key = None diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2d403532fadb..3196c2bec65e 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -91,7 +91,7 @@ def __init__(self, hs: "HomeServer", auth_handler): self._auth = hs.get_auth() self._auth_handler = auth_handler self._server_name = hs.hostname - self._presence_stream = hs.get_event_sources().sources["presence"] + self._presence_stream = hs.get_event_sources().sources.presence self._state = hs.get_state_handler() self._clock: Clock = hs.get_clock() self._send_email_handler = hs.get_send_email_handler() diff --git a/synapse/notifier.py b/synapse/notifier.py index bbe337949ac5..1a9f84ba4533 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -584,7 +584,7 @@ async def check_for_updates( events: List[EventBase] = [] end_token = from_token - for name, source in self.event_sources.sources.items(): + for name, source in self.event_sources.sources.get_sources(): keyname = "%s_key" % name before_id = getattr(before_token, keyname) after_id = getattr(after_token, keyname) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 99b0aac2fb12..7d698d3e8741 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Dict +from typing import TYPE_CHECKING, Generator, Tuple, Union + +import attr from synapse.handlers.account_data import AccountDataEventSource from synapse.handlers.presence import PresenceEventSource @@ -21,20 +23,43 @@ from synapse.handlers.typing import TypingNotificationEventSource from synapse.types import StreamToken +if TYPE_CHECKING: + from synapse.server import HomeServer -class EventSources: - SOURCE_TYPES = { - "room": RoomEventSource, - "presence": PresenceEventSource, - "typing": TypingNotificationEventSource, - "receipt": ReceiptEventSource, - "account_data": AccountDataEventSource, - } - def __init__(self, hs): - self.sources: Dict[str, Any] = { - name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items() - } +@attr.s(frozen=True, slots=True, auto_attribs=True) +class _EventSourcesInner: + room: RoomEventSource + presence: PresenceEventSource + typing: TypingNotificationEventSource + receipt: ReceiptEventSource + account_data: AccountDataEventSource + + def get_sources( + self, + ) -> Generator[ + Tuple[ + str, + Union[ + RoomEventSource, + PresenceEventSource, + TypingNotificationEventSource, + ReceiptEventSource, + AccountDataEventSource, + ], + ], + None, + None, + ]: + for attribute in _EventSourcesInner.__attrs_attrs__: # type: ignore[attr-defined] + yield attribute.name, getattr(self, attribute.name) + + +class EventSources: + def __init__(self, hs: "HomeServer"): + self.sources = _EventSourcesInner( + *(attribute.type(hs) for attribute in _EventSourcesInner.__attrs_attrs__) # type: ignore[attr-defined] + ) self.store = hs.get_datastore() def get_current_token(self) -> StreamToken: @@ -44,11 +69,11 @@ def get_current_token(self) -> StreamToken: groups_key = self.store.get_group_stream_token() token = StreamToken( - room_key=self.sources["room"].get_current_key(), - presence_key=self.sources["presence"].get_current_key(), - typing_key=self.sources["typing"].get_current_key(), - receipt_key=self.sources["receipt"].get_current_key(), - account_data_key=self.sources["account_data"].get_current_key(), + room_key=self.sources.room.get_current_key(), + presence_key=self.sources.presence.get_current_key(), + typing_key=self.sources.typing.get_current_key(), + receipt_key=self.sources.receipt.get_current_key(), + account_data_key=self.sources.account_data.get_current_key(), push_rules_key=push_rules_key, to_device_key=to_device_key, device_list_key=device_list_key, @@ -67,7 +92,7 @@ def get_current_token_for_pagination(self) -> StreamToken: The current token for pagination. """ token = StreamToken( - room_key=self.sources["room"].get_current_key(), + room_key=self.sources.room.get_current_key(), presence_key=0, typing_key=0, receipt_key=0, diff --git a/tests/handlers/test_receipts.py b/tests/handlers/test_receipts.py index 732a12c9bd08..5de89c873b94 100644 --- a/tests/handlers/test_receipts.py +++ b/tests/handlers/test_receipts.py @@ -23,7 +23,7 @@ class ReceiptsTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): - self.event_source = hs.get_event_sources().sources["receipt"] + self.event_source = hs.get_event_sources().sources.receipt # In the first param of _test_filters_hidden we use "hidden" instead of # ReadReceiptEventFields.MSC2285_HIDDEN. We do this because we're mocking diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index fa3cff598ed6..8560d7116073 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -89,7 +89,7 @@ def prepare(self, reactor, clock, hs): self.handler = hs.get_typing_handler() - self.event_source = hs.get_event_sources().sources["typing"] + self.event_source = hs.get_event_sources().sources.typing self.datastore = hs.get_datastore() self.datastore.get_destination_retry_timings = Mock( diff --git a/tests/rest/client/test_shadow_banned.py b/tests/rest/client/test_shadow_banned.py index 6a0d9a82be93..8555e050ab8b 100644 --- a/tests/rest/client/test_shadow_banned.py +++ b/tests/rest/client/test_shadow_banned.py @@ -193,7 +193,7 @@ def test_typing(self): self.assertEquals(200, channel.code) # There should be no typing events. - event_source = self.hs.get_event_sources().sources["typing"] + event_source = self.hs.get_event_sources().sources.typing self.assertEquals(event_source.get_current_key(), 0) # The other user can join and send typing events. diff --git a/tests/rest/client/test_typing.py b/tests/rest/client/test_typing.py index b54b00473327..c4989baf49a0 100644 --- a/tests/rest/client/test_typing.py +++ b/tests/rest/client/test_typing.py @@ -41,7 +41,7 @@ def make_homeserver(self, reactor, clock): federation_client=Mock(), ) - self.event_source = hs.get_event_sources().sources["typing"] + self.event_source = hs.get_event_sources().sources.typing hs.get_federation_handler = Mock() From c975b21e9d7d52c9cf0af3393c8716264ad89052 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 10:41:48 -0400 Subject: [PATCH 02/10] Fix-up type hints due to using sets, not lists. --- synapse/handlers/receipts.py | 4 ++-- synapse/handlers/room.py | 13 +++++++++++-- synapse/storage/databases/main/receipts.py | 4 ++-- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index c7567ac05fdc..a55e1806d6bf 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple from synapse.api.constants import ReadReceiptEventFields from synapse.appservice import ApplicationService @@ -216,7 +216,7 @@ def filter_out_hidden(events: List[JsonDict], user_id: str) -> List[JsonDict]: return visible_events async def get_new_events( - self, from_key: int, room_ids: List[str], user: UserID, **kwargs: Any + self, from_key: int, room_ids: Iterable[str], user: UserID, **kwargs: Any ) -> Tuple[List[JsonDict], int]: from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index abdd5061649d..8efb0616fb76 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,7 +20,16 @@ import random import string from collections import OrderedDict -from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Collection, + Dict, + List, + Optional, + Tuple, +) from synapse.api.constants import ( EventContentFields, @@ -1182,7 +1191,7 @@ async def get_new_events( user: UserID, from_key: RoomStreamToken, limit: int, - room_ids: List[str], + room_ids: Collection[str], is_guest: bool, explicit_room_id: Optional[str] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index edeaacd7a65a..0dc0c3ba0ceb 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, Iterable, List, Optional, Tuple from twisted.internet import defer @@ -153,7 +153,7 @@ def f(txn): } async def get_linearized_receipts_for_rooms( - self, room_ids: List[str], to_key: int, from_key: Optional[int] = None + self, room_ids: Iterable[str], to_key: int, from_key: Optional[int] = None ) -> List[dict]: """Get receipts for multiple rooms for sending to clients. From 352577828b9aaa81b157fa4429622db24c723102 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 10:59:40 -0400 Subject: [PATCH 03/10] Fix incorrect type hint. --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 8efb0616fb76..51d3af1caf34 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1190,7 +1190,7 @@ async def get_new_events( self, user: UserID, from_key: RoomStreamToken, - limit: int, + limit: Optional[int], room_ids: Collection[str], is_guest: bool, explicit_room_id: Optional[str] = None, From 82dc35e4a004988a7eb35451bb1e56459d6f49f6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 11:02:43 -0400 Subject: [PATCH 04/10] Reformat arguments. --- synapse/handlers/account_data.py | 5 ++++- synapse/handlers/receipts.py | 6 +++++- synapse/handlers/typing.py | 5 ++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index e9e7a78546c3..661472f769ac 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -171,7 +171,10 @@ def get_current_key(self, direction: str = "f") -> int: return self.store.get_max_account_data_stream_id() async def get_new_events( - self, user: UserID, from_key: int, **kwargs: Any + self, + user: UserID, + from_key: int, + **kwargs: Any, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() last_stream_id = from_key diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a55e1806d6bf..08ee8f0a857d 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -216,7 +216,11 @@ def filter_out_hidden(events: List[JsonDict], user_id: str) -> List[JsonDict]: return visible_events async def get_new_events( - self, from_key: int, room_ids: Iterable[str], user: UserID, **kwargs: Any + self, + user: UserID, + from_key: int, + room_ids: Iterable[str], + **kwargs: Any, ) -> Tuple[List[JsonDict], int]: from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 4492c8567b16..71ba1555bcdc 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -485,7 +485,10 @@ async def get_new_events_as( return (events, handler._latest_room_serial) async def get_new_events( - self, from_key: int, room_ids: Iterable[str], **kwargs: Any + self, + from_key: int, + room_ids: Iterable[str], + **kwargs: Any, ) -> Tuple[List[JsonDict], int]: with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) From 9aab3aba5a6e2b1a9bb197a6e13e89c50c543dc7 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 11:11:54 -0400 Subject: [PATCH 05/10] Remove kwargs from get_new_events. --- synapse/handlers/account_data.py | 7 ++-- synapse/handlers/presence.py | 5 +-- synapse/handlers/receipts.py | 6 ++-- synapse/handlers/typing.py | 7 ++-- tests/handlers/test_typing.py | 44 +++++++++++++++++++++---- tests/rest/client/test_shadow_banned.py | 8 ++++- tests/rest/client/test_typing.py | 8 ++++- 7 files changed, 68 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 661472f769ac..5df8436c505d 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import random -from typing import TYPE_CHECKING, Any, List, Tuple +from typing import TYPE_CHECKING, Collection, List, Optional, Tuple from synapse.replication.http.account_data import ( ReplicationAddTagRestServlet, @@ -174,7 +174,10 @@ async def get_new_events( self, user: UserID, from_key: int, - **kwargs: Any, + limit: Optional[int], + room_ids: Collection[str], + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: user_id = user.to_string() last_stream_id = from_key diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 841c8815b0ae..272ae056c063 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1519,10 +1519,11 @@ async def get_new_events( self, user: UserID, from_key: Optional[int], + limit: Optional[int] = None, room_ids: Optional[List[str]] = None, - include_offline: bool = True, + is_guest: bool = False, explicit_room_id: Optional[str] = None, - **kwargs: Any, + include_offline: bool = True, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: # 1. Get the rooms the user is in. diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 08ee8f0a857d..4b8a7848913f 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple from synapse.api.constants import ReadReceiptEventFields from synapse.appservice import ApplicationService @@ -219,8 +219,10 @@ async def get_new_events( self, user: UserID, from_key: int, + limit: Optional[int], room_ids: Iterable[str], - **kwargs: Any, + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: from_key = int(from_key) to_key = self.get_current_key() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 71ba1555bcdc..3b0a8d10a249 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -14,7 +14,7 @@ import logging import random from collections import namedtuple -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import AuthError, ShadowBanError, SynapseError from synapse.appservice import ApplicationService @@ -486,9 +486,12 @@ async def get_new_events_as( async def get_new_events( self, + user: UserID, from_key: int, + limit: Optional[int], room_ids: Iterable[str], - **kwargs: Any, + is_guest: bool, + explicit_room_id: Optional[str] = None, ) -> Tuple[List[JsonDict], int]: with Measure(self.clock, "typing.get_new_events"): from_key = int(from_key) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 8560d7116073..000f9b9fde2b 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -171,7 +171,9 @@ def test_started_typing_local(self): self.assertEquals(self.event_source.get_current_key(), 1) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False + ) ) self.assertEquals( events[0], @@ -239,7 +241,9 @@ def test_started_typing_remote_recv(self): self.assertEquals(self.event_source.get_current_key(), 1) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False + ) ) self.assertEquals( events[0], @@ -276,7 +280,13 @@ def test_started_typing_remote_recv_not_in_room(self): self.assertEquals(self.event_source.get_current_key(), 0) events = self.get_success( - self.event_source.get_new_events(room_ids=[OTHER_ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, + from_key=0, + limit=None, + room_ids=[OTHER_ROOM_ID], + is_guest=False, + ) ) self.assertEquals(events[0], []) self.assertEquals(events[1], 0) @@ -324,7 +334,9 @@ def test_stopped_typing(self): self.assertEquals(self.event_source.get_current_key(), 1) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, from_key=0, limit=None, room_ids=[ROOM_ID], is_guest=False + ) ) self.assertEquals( events[0], @@ -350,7 +362,13 @@ def test_typing_timeout(self): self.assertEquals(self.event_source.get_current_key(), 1) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, + from_key=0, + limit=None, + room_ids=[ROOM_ID], + is_guest=False, + ) ) self.assertEquals( events[0], @@ -369,7 +387,13 @@ def test_typing_timeout(self): self.assertEquals(self.event_source.get_current_key(), 2) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=1) + self.event_source.get_new_events( + user=U_APPLE, + from_key=1, + limit=None, + room_ids=[ROOM_ID], + is_guest=False, + ) ) self.assertEquals( events[0], @@ -392,7 +416,13 @@ def test_typing_timeout(self): self.assertEquals(self.event_source.get_current_key(), 3) events = self.get_success( - self.event_source.get_new_events(room_ids=[ROOM_ID], from_key=0) + self.event_source.get_new_events( + user=U_APPLE, + from_key=0, + limit=None, + room_ids=[ROOM_ID], + is_guest=False, + ) ) self.assertEquals( events[0], diff --git a/tests/rest/client/test_shadow_banned.py b/tests/rest/client/test_shadow_banned.py index 8555e050ab8b..b0c44af033c0 100644 --- a/tests/rest/client/test_shadow_banned.py +++ b/tests/rest/client/test_shadow_banned.py @@ -210,7 +210,13 @@ def test_typing(self): # These appear in the room. self.assertEquals(event_source.get_current_key(), 1) events = self.get_success( - event_source.get_new_events(from_key=0, room_ids=[room_id]) + event_source.get_new_events( + user=UserID.from_string(self.other_user_id), + from_key=0, + limit=None, + room_ids=[room_id], + is_guest=False, + ) ) self.assertEquals( events[0], diff --git a/tests/rest/client/test_typing.py b/tests/rest/client/test_typing.py index c4989baf49a0..ee0abd5295af 100644 --- a/tests/rest/client/test_typing.py +++ b/tests/rest/client/test_typing.py @@ -76,7 +76,13 @@ def test_set_typing(self): self.assertEquals(self.event_source.get_current_key(), 1) events = self.get_success( - self.event_source.get_new_events(from_key=0, room_ids=[self.room_id]) + self.event_source.get_new_events( + user=UserID.from_string(self.user_id), + from_key=0, + limit=None, + room_ids=[self.room_id], + is_guest=False, + ) ) self.assertEquals( events[0], From ca3125fdc0760588723a8f76cf919394f558449e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Sep 2021 11:27:25 -0400 Subject: [PATCH 06/10] Newsfragment --- changelog.d/10856.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10856.misc diff --git a/changelog.d/10856.misc b/changelog.d/10856.misc new file mode 100644 index 000000000000..f09af2e00a3b --- /dev/null +++ b/changelog.d/10856.misc @@ -0,0 +1 @@ +Add missing type hints to handlers. From d27010eee94debb6e3e075a4a235a72d405fa305 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Sep 2021 07:42:59 -0400 Subject: [PATCH 07/10] Simplify type hints. Co-authored-by: reivilibre --- synapse/streams/events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 7d698d3e8741..8afed40853d9 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -37,7 +37,7 @@ class _EventSourcesInner: def get_sources( self, - ) -> Generator[ + ) -> Iterator[ Tuple[ str, Union[ @@ -48,8 +48,6 @@ def get_sources( AccountDataEventSource, ], ], - None, - None, ]: for attribute in _EventSourcesInner.__attrs_attrs__: # type: ignore[attr-defined] yield attribute.name, getattr(self, attribute.name) From cdbd2b0b9529d919c16a10c52032c9207f25c156 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Sep 2021 07:46:30 -0400 Subject: [PATCH 08/10] Fix imports --- synapse/streams/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 8afed40853d9..dbe74e80261a 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Generator, Tuple, Union +from typing import TYPE_CHECKING, Iterator, Tuple, Union import attr From 26a338eba34368664abb0be5e4921e56563db61b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Sep 2021 08:09:04 -0400 Subject: [PATCH 09/10] Inherit from a generic EventSource. --- synapse/handlers/account_data.py | 3 ++- synapse/handlers/presence.py | 3 ++- synapse/handlers/receipts.py | 3 ++- synapse/handlers/room.py | 3 ++- synapse/handlers/typing.py | 3 ++- synapse/streams/__init__.py | 22 ++++++++++++++++++++++ synapse/streams/events.py | 18 +++--------------- 7 files changed, 35 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 5df8436c505d..96273e2f81c3 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -21,6 +21,7 @@ ReplicationRoomAccountDataRestServlet, ReplicationUserAccountDataRestServlet, ) +from synapse.streams import EventSource from synapse.types import JsonDict, UserID if TYPE_CHECKING: @@ -163,7 +164,7 @@ async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> in return response["max_stream_id"] -class AccountDataEventSource: +class AccountDataEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 272ae056c063..983c837c662e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -65,6 +65,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.storage.databases.main import DataStore +from synapse.streams import EventSource from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached @@ -1500,7 +1501,7 @@ def format_user_presence_state( return content -class PresenceEventSource: +class PresenceEventSource(EventSource[int, UserPresenceState]): def __init__(self, hs: "HomeServer"): # We can't call get_presence_handler here because there's a cycle: # diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 4b8a7848913f..5881f09ebd27 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -17,6 +17,7 @@ from synapse.api.constants import ReadReceiptEventFields from synapse.appservice import ApplicationService from synapse.handlers._base import BaseHandler +from synapse.streams import EventSource from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id if TYPE_CHECKING: @@ -162,7 +163,7 @@ async def received_client_receipt( await self.federation_sender.send_read_receipt(receipt) -class ReceiptEventSource: +class ReceiptEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() self.config = hs.config diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51d3af1caf34..287ea2fd06fa 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -56,6 +56,7 @@ from synapse.events.utils import copy_power_levels_contents from synapse.rest.admin._base import assert_user_is_admin from synapse.storage.state import StateFilter +from synapse.streams import EventSource from synapse.types import ( JsonDict, MutableStateMap, @@ -1182,7 +1183,7 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]: return results -class RoomEventSource: +class RoomEventSource(EventSource[RoomStreamToken, EventBase]): def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 3b0a8d10a249..9326330c9028 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -23,6 +23,7 @@ wrap_as_background_process, ) from synapse.replication.tcp.streams import TypingStream +from synapse.streams import EventSource from synapse.types import JsonDict, Requester, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure @@ -439,7 +440,7 @@ def process_replication_rows( raise Exception("Typing writer instance got typing info over replication") -class TypingNotificationEventSource: +class TypingNotificationEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): self.hs = hs self.clock = hs.get_clock() diff --git a/synapse/streams/__init__.py b/synapse/streams/__init__.py index 5e83dba2ed6f..806b67130530 100644 --- a/synapse/streams/__init__.py +++ b/synapse/streams/__init__.py @@ -11,3 +11,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from typing import Collection, Generic, List, Optional, Tuple, TypeVar + +from synapse.types import UserID + +# The key, this is either a stream token or int. +K = TypeVar("K") +# The return type. +R = TypeVar("R") + + +class EventSource(Generic[K, R]): + async def get_new_events( + self, + user: UserID, + from_key: K, + limit: Optional[int], + room_ids: Collection[str], + is_guest: bool, + explicit_room_id: Optional[str] = None, + ) -> Tuple[List[R], K]: + ... diff --git a/synapse/streams/events.py b/synapse/streams/events.py index dbe74e80261a..21591d0bfd2f 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Tuple, Union +from typing import TYPE_CHECKING, Iterator, Tuple import attr @@ -21,6 +21,7 @@ from synapse.handlers.receipts import ReceiptEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource +from synapse.streams import EventSource from synapse.types import StreamToken if TYPE_CHECKING: @@ -35,20 +36,7 @@ class _EventSourcesInner: receipt: ReceiptEventSource account_data: AccountDataEventSource - def get_sources( - self, - ) -> Iterator[ - Tuple[ - str, - Union[ - RoomEventSource, - PresenceEventSource, - TypingNotificationEventSource, - ReceiptEventSource, - AccountDataEventSource, - ], - ], - ]: + def get_sources(self) -> Iterator[Tuple[str, EventSource]]: for attribute in _EventSourcesInner.__attrs_attrs__: # type: ignore[attr-defined] yield attribute.name, getattr(self, attribute.name) From 352f8b47dfca3ab5f30b968201c2527321e2fbb0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 21 Sep 2021 11:12:46 -0400 Subject: [PATCH 10/10] Fix comment. --- synapse/storage/databases/main/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0dc0c3ba0ceb..01a42813011a 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -158,7 +158,7 @@ async def get_linearized_receipts_for_rooms( """Get receipts for multiple rooms for sending to clients. Args: - room_id: List of room_ids. + room_id: The room IDs to fetch receipts of. to_key: Max stream id to fetch receipts up to. from_key: Min stream id to fetch receipts from. None fetches from the start.