Skip to content

Commit

Permalink
Refactor handling of appservice ephemeral stream.
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Oct 29, 2021
1 parent f8f3c4a commit 8e5e670
Showing 1 changed file with 27 additions and 33 deletions.
60 changes: 27 additions & 33 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,29 +262,31 @@ async def _notify_interested_services_ephemeral(
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "read_receipt", new_token
)

stream_id = "read_recipt"
handler = self._handle_receipts
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, "presence", new_token
stream_id = "presence"
handler = self._handle_presence
else:
continue

from_key = await self.store.get_type_stream_id_for_appservice(
service, stream_id
)
if new_token is not None and new_token <= from_key:
raise Exception(
"Rejecting token lower than stored: %s" % (new_token,)
)

events = await handler(service, users, from_key)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)

# Persist the latest handled stream token for this appservice
await self.store.set_type_stream_id_for_appservice(
service, stream_id, new_token
)

async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
Expand Down Expand Up @@ -321,7 +323,10 @@ async def _handle_typing(
return typing

async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
from_key: int,
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
Expand All @@ -338,12 +343,6 @@ async def _handle_receipts(
A list of JSON dictionaries containing data derived from the read receipts that
should be sent to the given application service.
"""
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))

receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
Expand All @@ -354,7 +353,7 @@ async def _handle_presence(
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
from_key: int,
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
Expand All @@ -374,11 +373,6 @@ async def _handle_presence(
"""
events: List[JsonDict] = []
presence_source = self.event_sources.sources.presence
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))

for user in users:
if isinstance(user, str):
Expand Down

0 comments on commit 8e5e670

Please sign in to comment.