Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert streams to async.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Jul 31, 2020
1 parent 394be6a commit 652248a
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
1 change: 1 addition & 0 deletions changelog.d/8014.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
22 changes: 9 additions & 13 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

from typing import Any, Dict

from twisted.internet import defer

from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource
Expand All @@ -40,39 +38,37 @@ def __init__(self, hs):
} # type: Dict[str, Any]
self.store = hs.get_datastore()

@defer.inlineCallbacks
def get_current_token(self):
async def get_current_token(self) -> StreamToken:
push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()

token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
presence_key=(yield self.sources["presence"].get_current_key()),
typing_key=(yield self.sources["typing"].get_current_key()),
receipt_key=(yield self.sources["receipt"].get_current_key()),
account_data_key=(yield self.sources["account_data"].get_current_key()),
room_key=await self.sources["room"].get_current_key(),
presence_key=self.sources["presence"].get_current_key(),
typing_key=self.sources["typing"].get_current_key(),
receipt_key=self.sources["receipt"].get_current_key(),
account_data_key=self.sources["account_data"].get_current_key(),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
groups_key=groups_key,
)
return token

@defer.inlineCallbacks
def get_current_token_for_pagination(self):
async def get_current_token_for_pagination(self) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.
The returned token does not have the current values for fields other
than `room`, since they are not used during pagination.
Returns:
Deferred[StreamToken]
The current token for pagination.
"""
token = StreamToken(
room_key=(yield self.sources["room"].get_current_key()),
room_key=await self.sources["room"].get_current_key(),
presence_key=0,
typing_key=0,
receipt_key=0,
Expand Down

0 comments on commit 652248a

Please sign in to comment.