Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add a linearizer on (appservice, stream) when handling ephemeral events. #11207

Merged
23 changes: 18 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def __init__(self, hs: "HomeServer"):
self.current_max = 0
self.is_processing = False

self._ephemeral_events_linearizer = Linearizer(name="appservice_ephemeral_events")
self._ephemeral_events_linearizer = Linearizer(
name="appservice_ephemeral_events"
)

def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
Expand Down Expand Up @@ -260,7 +262,7 @@ async def _notify_interested_services_ephemeral(
)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service)
events = await self._handle_receipts(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
Expand All @@ -272,7 +274,7 @@ async def _notify_interested_services_ephemeral(
)

elif stream_key == "presence_key":
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
events = await self._handle_presence(service, users)
events = await self._handle_presence(service, users, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(
service, events
Expand Down Expand Up @@ -318,7 +320,9 @@ async def _handle_typing(
)
return typing

async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
async def _handle_receipts(
self, service: ApplicationService, new_token: Optional[int]
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.

Expand All @@ -337,14 +341,20 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I would just log at the debug level and return an empty list in this case. I'm not sure what we'd want to handle the exception (there's not much to do at this point).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in bc991c2 - I'm unsure if there are situations where it's desirable to set the new token when there's no events?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's always desireable to set the token, even if there are no events. As that means we've checked that there are no events to send up until that stream token.

If we don't set it, then when a new stream token comes in we end up checking a token range that overlaps what we've already checked in the previous call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now fixed in 88d2d5c + 2d924c5


receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
)
return receipts

async def _handle_presence(
self, service: ApplicationService, users: Collection[Union[str, UserID]]
self,
service: ApplicationService,
users: Collection[Union[str, UserID]],
new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
Expand All @@ -367,6 +377,9 @@ async def _handle_presence(
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
if new_token is not None and new_token <= from_key:
raise Exception("Rejecting token lower than stored: %s" % (new_token,))
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
Expand Down