-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add a linearizer on (appservice, stream) when handling ephemeral events. #11207
Changes from 5 commits
4726c83
c25bc99
7959d0e
1d9ea27
f8f3c4a
1f2b14c
9445883
bc991c2
451c31f
88d2d5c
2d924c5
e76e59c
36ecd70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
) | ||
from synapse.storage.databases.main.directory import RoomAliasMapping | ||
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID | ||
from synapse.util.async_helpers import Linearizer | ||
from synapse.util.metrics import Measure | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -58,6 +59,10 @@ def __init__(self, hs: "HomeServer"): | |
self.current_max = 0 | ||
self.is_processing = False | ||
|
||
self._ephemeral_events_linearizer = Linearizer( | ||
name="appservice_ephemeral_events" | ||
) | ||
|
||
def notify_interested_services(self, max_token: RoomStreamToken) -> None: | ||
"""Notifies (pushes) all application services interested in this event. | ||
|
||
|
@@ -248,26 +253,37 @@ async def _notify_interested_services_ephemeral( | |
events = await self._handle_typing(service, new_token) | ||
if events: | ||
self.scheduler.submit_ephemeral_events_for_as(service, events) | ||
continue | ||
|
||
elif stream_key == "receipt_key": | ||
events = await self._handle_receipts(service) | ||
if events: | ||
self.scheduler.submit_ephemeral_events_for_as(service, events) | ||
|
||
# Persist the latest handled stream token for this appservice | ||
await self.store.set_type_stream_id_for_appservice( | ||
service, "read_receipt", new_token | ||
# Since we read/update the stream position for this AS/stream | ||
with ( | ||
await self._ephemeral_events_linearizer.queue( | ||
(service.id, stream_key) | ||
) | ||
): | ||
if stream_key == "receipt_key": | ||
events = await self._handle_receipts(service, new_token) | ||
if events: | ||
self.scheduler.submit_ephemeral_events_for_as( | ||
service, events | ||
) | ||
|
||
# Persist the latest handled stream token for this appservice | ||
await self.store.set_type_stream_id_for_appservice( | ||
service, "read_receipt", new_token | ||
) | ||
|
||
elif stream_key == "presence_key": | ||
events = await self._handle_presence(service, users) | ||
if events: | ||
self.scheduler.submit_ephemeral_events_for_as(service, events) | ||
elif stream_key == "presence_key": | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
events = await self._handle_presence(service, users, new_token) | ||
if events: | ||
self.scheduler.submit_ephemeral_events_for_as( | ||
service, events | ||
) | ||
|
||
# Persist the latest handled stream token for this appservice | ||
await self.store.set_type_stream_id_for_appservice( | ||
service, "presence", new_token | ||
) | ||
# Persist the latest handled stream token for this appservice | ||
await self.store.set_type_stream_id_for_appservice( | ||
service, "presence", new_token | ||
) | ||
|
||
async def _handle_typing( | ||
self, service: ApplicationService, new_token: int | ||
|
@@ -304,7 +320,9 @@ async def _handle_typing( | |
) | ||
return typing | ||
|
||
async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: | ||
async def _handle_receipts( | ||
self, service: ApplicationService, new_token: Optional[int] | ||
) -> List[JsonDict]: | ||
""" | ||
Return the latest read receipts that the given application service should receive. | ||
|
||
|
@@ -323,14 +341,20 @@ async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: | |
from_key = await self.store.get_type_stream_id_for_appservice( | ||
service, "read_receipt" | ||
) | ||
if new_token is not None and new_token <= from_key: | ||
raise Exception("Rejecting token lower than stored: %s" % (new_token,)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I would just log at the debug level and return an empty list in this case. I'm not sure what we'd want to handle the exception (there's not much to do at this point). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added in bc991c2 - I'm unsure if there are situations where it's desirable to set the new token when there's no events? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's always desireable to set the token, even if there are no events. As that means we've checked that there are no events to send up until that stream token. If we don't set it, then when a new stream token comes in we end up checking a token range that overlaps what we've already checked in the previous call. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
receipts_source = self.event_sources.sources.receipt | ||
receipts, _ = await receipts_source.get_new_events_as( | ||
service=service, from_key=from_key | ||
) | ||
return receipts | ||
|
||
async def _handle_presence( | ||
self, service: ApplicationService, users: Collection[Union[str, UserID]] | ||
self, | ||
service: ApplicationService, | ||
users: Collection[Union[str, UserID]], | ||
new_token: Optional[int], | ||
) -> List[JsonDict]: | ||
""" | ||
Return the latest presence updates that the given application service should receive. | ||
|
@@ -353,6 +377,9 @@ async def _handle_presence( | |
from_key = await self.store.get_type_stream_id_for_appservice( | ||
service, "presence" | ||
) | ||
if new_token is not None and new_token <= from_key: | ||
raise Exception("Rejecting token lower than stored: %s" % (new_token,)) | ||
anoadragon453 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
for user in users: | ||
if isinstance(user, str): | ||
user = UserID.from_string(user) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we're sure that we're processing events in order, we should probably add a conditional that bails out early if
new_token
is less than or equal to the stored stream token for the appservice/stream ID combo.This should probably be done right after we call
get_type_stream_id_for_appservice
in_handle_receipts
and_handle_presence
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented in f8f3c4a. I also attempted to de-dupe some of the logic here but not sure it's any better as the handle functions take different arguments; pushed to Fizzadar@8e5e670, I can pull that in if desired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a very useful refactor, I'd love to pull it in. However, let's include it in a separate PR if you don't mind - as it will make the overall diff a bit messier.