Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add type hints to event streams. #10856

Merged
merged 11 commits into from
Sep 21, 2021
1 change: 1 addition & 0 deletions changelog.d/10856.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add missing type hints to handlers.
10 changes: 8 additions & 2 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import random
from typing import TYPE_CHECKING, Any, List, Tuple
from typing import TYPE_CHECKING, Collection, List, Optional, Tuple

from synapse.replication.http.account_data import (
ReplicationAddTagRestServlet,
Expand Down Expand Up @@ -171,7 +171,13 @@ def get_current_key(self, direction: str = "f") -> int:
return self.store.get_max_account_data_stream_id()

async def get_new_events(
self, user: UserID, from_key: int, **kwargs: Any
self,
user: UserID,
from_key: int,
limit: Optional[int],
room_ids: Collection[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
clokep marked this conversation as resolved.
Show resolved Hide resolved
) -> Tuple[List[JsonDict], int]:
user_id = user.to_string()
last_stream_id = from_key
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ async def _notify_interested_services_ephemeral(
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
typing_source = self.event_sources.sources["typing"]
typing_source = self.event_sources.sources.typing
# Get the typing events from just before current
typing, _ = await typing_source.get_new_events_as(
service=service,
Expand All @@ -269,7 +269,7 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
receipts_source = self.event_sources.sources["receipt"]
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
Expand All @@ -279,7 +279,7 @@ async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
) -> List[JsonDict]:
events: List[JsonDict] = []
presence_source = self.event_sources.sources["presence"]
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def _snapshot_all_rooms(

now_token = self.hs.get_event_sources().get_current_token()

presence_stream = self.hs.get_event_sources().sources["presence"]
presence_stream = self.hs.get_event_sources().sources.presence
presence, _ = await presence_stream.get_new_events(
user, from_key=None, include_offline=False
)
Expand Down
5 changes: 3 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1519,10 +1519,11 @@ async def get_new_events(
self,
user: UserID,
from_key: Optional[int],
limit: Optional[int] = None,
room_ids: Optional[List[str]] = None,
include_offline: bool = True,
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
**kwargs: Any,
include_offline: bool = True,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, List, Optional, Tuple
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple

from synapse.api.constants import ReadReceiptEventFields
from synapse.appservice import ApplicationService
Expand Down Expand Up @@ -216,7 +216,13 @@ def filter_out_hidden(events: List[JsonDict], user_id: str) -> List[JsonDict]:
return visible_events

async def get_new_events(
self, from_key: int, room_ids: List[str], user: UserID, **kwargs: Any
self,
user: UserID,
from_key: int,
limit: Optional[int],
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonDict], int]:
from_key = int(from_key)
to_key = self.get_current_key()
Expand Down
15 changes: 12 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
import random
import string
from collections import OrderedDict
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Collection,
Dict,
List,
Optional,
Tuple,
)

from synapse.api.constants import (
EventContentFields,
Expand Down Expand Up @@ -1181,8 +1190,8 @@ async def get_new_events(
self,
user: UserID,
from_key: RoomStreamToken,
limit: int,
room_ids: List[str],
limit: Optional[int],
room_ids: Collection[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[EventBase], RoomStreamToken]:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async def ephemeral_by_room(

room_ids = sync_result_builder.joined_room_ids

typing_source = self.event_sources.sources["typing"]
typing_source = self.event_sources.sources.typing
typing, typing_key = await typing_source.get_new_events(
user=sync_config.user,
from_key=typing_key,
Expand All @@ -465,7 +465,7 @@ async def ephemeral_by_room(

receipt_key = since_token.receipt_key if since_token else 0

receipt_source = self.event_sources.sources["receipt"]
receipt_source = self.event_sources.sources.receipt
receipts, receipt_key = await receipt_source.get_new_events(
user=sync_config.user,
from_key=receipt_key,
Expand Down Expand Up @@ -1415,7 +1415,7 @@ async def _generate_sync_entry_for_presence(
sync_config = sync_result_builder.sync_config
user = sync_result_builder.sync_config.user

presence_source = self.event_sources.sources["presence"]
presence_source = self.event_sources.sources.presence

since_token = sync_result_builder.since_token
presence_key = None
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging
import random
from collections import namedtuple
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
Expand Down Expand Up @@ -485,7 +485,13 @@ async def get_new_events_as(
return (events, handler._latest_room_serial)

async def get_new_events(
self, from_key: int, room_ids: Iterable[str], **kwargs: Any
self,
user: UserID,
from_key: int,
limit: Optional[int],
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
) -> Tuple[List[JsonDict], int]:
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
Expand Down
2 changes: 1 addition & 1 deletion synapse/module_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def __init__(self, hs: "HomeServer", auth_handler):
self._auth = hs.get_auth()
self._auth_handler = auth_handler
self._server_name = hs.hostname
self._presence_stream = hs.get_event_sources().sources["presence"]
self._presence_stream = hs.get_event_sources().sources.presence
self._state = hs.get_state_handler()
self._clock: Clock = hs.get_clock()
self._send_email_handler = hs.get_send_email_handler()
Expand Down
2 changes: 1 addition & 1 deletion synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ async def check_for_updates(
events: List[EventBase] = []
end_token = from_token

for name, source in self.event_sources.sources.items():
for name, source in self.event_sources.sources.get_sources():
keyname = "%s_key" % name
before_id = getattr(before_token, keyname)
after_id = getattr(after_token, keyname)
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, Iterable, List, Optional, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -153,7 +153,7 @@ def f(txn):
}

async def get_linearized_receipts_for_rooms(
self, room_ids: List[str], to_key: int, from_key: Optional[int] = None
self, room_ids: Iterable[str], to_key: int, from_key: Optional[int] = None
clokep marked this conversation as resolved.
Show resolved Hide resolved
) -> List[dict]:
"""Get receipts for multiple rooms for sending to clients.

Expand Down
63 changes: 44 additions & 19 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Dict
from typing import TYPE_CHECKING, Generator, Tuple, Union

import attr

from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource
Expand All @@ -21,20 +23,43 @@
from synapse.handlers.typing import TypingNotificationEventSource
from synapse.types import StreamToken

if TYPE_CHECKING:
from synapse.server import HomeServer

class EventSources:
SOURCE_TYPES = {
"room": RoomEventSource,
"presence": PresenceEventSource,
"typing": TypingNotificationEventSource,
"receipt": ReceiptEventSource,
"account_data": AccountDataEventSource,
}

def __init__(self, hs):
self.sources: Dict[str, Any] = {
name: cls(hs) for name, cls in EventSources.SOURCE_TYPES.items()
}
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
presence: PresenceEventSource
typing: TypingNotificationEventSource
receipt: ReceiptEventSource
account_data: AccountDataEventSource

def get_sources(
self,
) -> Generator[
Tuple[
str,
Union[
RoomEventSource,
PresenceEventSource,
TypingNotificationEventSource,
ReceiptEventSource,
AccountDataEventSource,
],
],
None,
None,
]:
clokep marked this conversation as resolved.
Show resolved Hide resolved
for attribute in _EventSourcesInner.__attrs_attrs__: # type: ignore[attr-defined]
yield attribute.name, getattr(self, attribute.name)


class EventSources:
def __init__(self, hs: "HomeServer"):
self.sources = _EventSourcesInner(
*(attribute.type(hs) for attribute in _EventSourcesInner.__attrs_attrs__) # type: ignore[attr-defined]
)
self.store = hs.get_datastore()

def get_current_token(self) -> StreamToken:
Expand All @@ -44,11 +69,11 @@ def get_current_token(self) -> StreamToken:
groups_key = self.store.get_group_stream_token()

token = StreamToken(
room_key=self.sources["room"].get_current_key(),
presence_key=self.sources["presence"].get_current_key(),
typing_key=self.sources["typing"].get_current_key(),
receipt_key=self.sources["receipt"].get_current_key(),
account_data_key=self.sources["account_data"].get_current_key(),
room_key=self.sources.room.get_current_key(),
presence_key=self.sources.presence.get_current_key(),
typing_key=self.sources.typing.get_current_key(),
receipt_key=self.sources.receipt.get_current_key(),
account_data_key=self.sources.account_data.get_current_key(),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
Expand All @@ -67,7 +92,7 @@ def get_current_token_for_pagination(self) -> StreamToken:
The current token for pagination.
"""
token = StreamToken(
room_key=self.sources["room"].get_current_key(),
room_key=self.sources.room.get_current_key(),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down
2 changes: 1 addition & 1 deletion tests/handlers/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

class ReceiptsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.event_source = hs.get_event_sources().sources["receipt"]
self.event_source = hs.get_event_sources().sources.receipt

# In the first param of _test_filters_hidden we use "hidden" instead of
# ReadReceiptEventFields.MSC2285_HIDDEN. We do this because we're mocking
Expand Down
Loading