diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index e24d5c5cd369..aee6bc8d3762 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -262,29 +262,31 @@ async def _notify_interested_services_ephemeral( ) ): if stream_key == "receipt_key": - events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token - ) - + stream_id = "read_recipt" + handler = self._handle_receipts elif stream_key == "presence_key": - events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token + stream_id = "presence" + handler = self._handle_presence + else: + continue + + from_key = await self.store.get_type_stream_id_for_appservice( + service, stream_id + ) + if new_token is not None and new_token <= from_key: + raise Exception( + "Rejecting token lower than stored: %s" % (new_token,) ) + events = await handler(service, users, from_key) + if events: + self.scheduler.submit_ephemeral_events_for_as(service, events) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, stream_id, new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -321,7 +323,10 @@ async def _handle_typing( return typing async def _handle_receipts( - self, service: ApplicationService, new_token: Optional[int] + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + from_key: int, ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. @@ -338,12 +343,6 @@ async def _handle_receipts( A list of JSON dictionaries containing data derived from the read receipts that should be sent to the given application service. """ - from_key = await self.store.get_type_stream_id_for_appservice( - service, "read_receipt" - ) - if new_token is not None and new_token <= from_key: - raise Exception("Rejecting token lower than stored: %s" % (new_token,)) - receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key @@ -354,7 +353,7 @@ async def _handle_presence( self, service: ApplicationService, users: Collection[Union[str, UserID]], - new_token: Optional[int], + from_key: int, ) -> List[JsonDict]: """ Return the latest presence updates that the given application service should receive. @@ -374,11 +373,6 @@ async def _handle_presence( """ events: List[JsonDict] = [] presence_source = self.event_sources.sources.presence - from_key = await self.store.get_type_stream_id_for_appservice( - service, "presence" - ) - if new_token is not None and new_token <= from_key: - raise Exception("Rejecting token lower than stored: %s" % (new_token,)) for user in users: if isinstance(user, str):