Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor MSC3030 /timestamp_to_event to move away from our snowflake pull from destination pattern #14096

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14096.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor [MSC3030](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint to loop over federation destinations with standard pattern and error handling.
130 changes: 109 additions & 21 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@
T = TypeVar("T")


@attr.s(frozen=True, slots=True, auto_attribs=True)
class PulledPduInfo:
"""
A result object that stores the PDU and info about it like which homeserver we
pulled it from (`pull_origin`)
"""

pdu: EventBase
# Which homeserver we pulled the PDU from
pull_origin: str


class InvalidResponseError(RuntimeError):
"""Helper for _try_destination_list: indicates that the server returned a response
we couldn't parse
Expand Down Expand Up @@ -114,7 +126,9 @@ def __init__(self, hs: "HomeServer"):
self.hostname = hs.hostname
self.signing_key = hs.signing_key

self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
# Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
# (which server we pulled the event from)
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
Expand Down Expand Up @@ -352,11 +366,11 @@ async def _record_failure_callback(
@tag_args
async def get_pdu(
self,
destinations: Iterable[str],
destinations: Collection[str],
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor Author

@MadLittleMods MadLittleMods Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed that we iterate over destinations twice so if it was actually use an Iterable anywhere it would get exhausted the first time we iterate in the log before we actually went over each destination.

event_id: str,
room_version: RoomVersion,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
) -> Optional[PulledPduInfo]:
"""Requests the PDU with given origin and ID from the remote home
servers.

Expand All @@ -371,11 +385,11 @@ async def get_pdu(
moving to the next destination. None indicates no timeout.

Returns:
The requested PDU, or None if we were unable to find it.
The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it.
"""

logger.debug(
"get_pdu: event_id=%s from destinations=%s", event_id, destinations
"get_pdu(event_id=%s): from destinations=%s", event_id, destinations
)

# TODO: Rate limit the number of times we try and get the same event.
Expand All @@ -384,19 +398,25 @@ async def get_pdu(
# it gets persisted to the database), so we cache the results of the lookup.
# Note that this is separate to the regular get_event cache which caches
# events once they have been persisted.
event = self._get_pdu_cache.get(event_id)
get_pdu_cache_entry = self._get_pdu_cache.get(event_id)

event = None
pull_origin = None
if get_pdu_cache_entry:
event, pull_origin = get_pdu_cache_entry
# If we don't see the event in the cache, go try to fetch it from the
# provided remote federated destinations
if not event:
else:
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})

# TODO: We can probably refactor this to use `_try_destination_list`
for destination in destinations:
now = self._clock.time_msec()
last_attempt = pdu_attempts.get(destination, 0)
if last_attempt + PDU_RETRY_TIME_MS > now:
logger.debug(
"get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
"get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
event_id,
destination,
last_attempt,
PDU_RETRY_TIME_MS,
Expand All @@ -411,43 +431,48 @@ async def get_pdu(
room_version=room_version,
timeout=timeout,
)
pull_origin = destination

pdu_attempts[destination] = now

if event:
# Prime the cache
self._get_pdu_cache[event.event_id] = event
self._get_pdu_cache[event.event_id] = (event, pull_origin)

# Now that we have an event, we can break out of this
# loop and stop asking other destinations.
break

except NotRetryingDestination as e:
logger.info("get_pdu(event_id=%s): %s", event_id, e)
continue
except FederationDeniedError:
logger.info(
"get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist",
event_id,
destination,
)
continue
except SynapseError as e:
logger.info(
"Failed to get PDU %s from %s because %s",
"get_pdu(event_id=%s): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now

logger.info(
"Failed to get PDU %s from %s because %s",
"get_pdu(event_id=): Failed to get PDU from %s because %s",
event_id,
destination,
e,
)
continue

if not event:
if not event or not pull_origin:
return None

# `event` now refers to an object stored in `get_pdu_cache`. Our
Expand All @@ -459,7 +484,7 @@ async def get_pdu(
event.room_version,
)

return event_copy
return PulledPduInfo(event_copy, pull_origin)

@trace
@tag_args
Expand Down Expand Up @@ -699,12 +724,14 @@ async def _check_sigs_and_hash_and_fetch_one(
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
res = await self.get_pdu(
pulled_pdu_info = await self.get_pdu(
destinations=[pdu_origin],
event_id=pdu.event_id,
room_version=room_version,
timeout=10000,
)
if pulled_pdu_info is not None:
res = pulled_pdu_info.pdu
except SynapseError:
pass

Expand Down Expand Up @@ -806,6 +833,7 @@ async def _try_destination_list(
)

for destination in destinations:
# We don't want to ask our own server for information we don't have
if destination == self.server_name:
continue

Expand All @@ -814,9 +842,21 @@ async def _try_destination_list(
except (
RequestSendFailed,
InvalidResponseError,
NotRetryingDestination,
) as e:
logger.warning("Failed to %s via %s: %s", description, destination, e)
# Skip to the next homeserver in the list to try.
continue
except NotRetryingDestination as e:
logger.info("%s: %s", description, e)
continue
except FederationDeniedError:
logger.info(
"%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist",
description,
description,
destination,
)
continue
except UnsupportedRoomVersionError:
raise
except HttpResponseException as e:
Expand Down Expand Up @@ -1609,6 +1649,54 @@ async def send_request(
return result

async def timestamp_to_event(
self, *, destinations: List[str], room_id: str, timestamp: int, direction: str
) -> Optional["TimestampToEventResponse"]:
"""
Calls each remote federating server from `destinations` asking for their closest
event to the given timestamp in the given direction until we get a response.
Also validates the response to always return the expected keys or raises an
error.

Args:
destinations: The domains of homeservers to try fetching from
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.

Returns:
A parsed TimestampToEventResponse including the closest event_id
and origin_server_ts or None if no destination has a response.
"""

async def _timestamp_to_event_from_destination(
destination: str,
) -> TimestampToEventResponse:
return await self._timestamp_to_event_from_destination(
destination, room_id, timestamp, direction
)

try:
# Loop through each homeserver candidate until we get a succesful response
timestamp_to_event_response = await self._try_destination_list(
Comment on lines +1681 to +1682
Copy link
Contributor Author

@MadLittleMods MadLittleMods Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this logic from synapse/handlers/room.py and refactored to use the generic _try_destination_list which handles this pattern of looping over destinations

"timestamp_to_event",
destinations,
# TODO: The requested timestamp may lie in a part of the
# event graph that the remote server *also* didn't have,
# in which case they will have returned another event
# which may be nowhere near the requested timestamp. In
# the future, we may need to reconcile that gap and ask
# other homeservers, and/or extend `/timestamp_to_event`
# to return events on *both* sides of the timestamp to
# help reconcile the gap faster.
_timestamp_to_event_from_destination,
)
return timestamp_to_event_response
except SynapseError:
return None

async def _timestamp_to_event_from_destination(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> "TimestampToEventResponse":
"""
Expand Down
15 changes: 9 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,15 @@ async def try_backfill(domains: Collection[str]) -> bool:
# appropriate stuff.
# TODO: We can probably do something more intelligent here.
return True
except NotRetryingDestination as e:
logger.info("_maybe_backfill_inner: %s", e)
continue
except FederationDeniedError:
logger.info(
"_maybe_backfill_inner: Not attempting to backfill from %s because the homeserver is not on our federation whitelist",
dom,
)
continue
except (SynapseError, InvalidResponseError) as e:
logger.info("Failed to backfill from %s because %s", dom, e)
continue
Expand Down Expand Up @@ -477,15 +486,9 @@ async def try_backfill(domains: Collection[str]) -> bool:

logger.info("Failed to backfill from %s because %s", dom, e)
continue
except NotRetryingDestination as e:
logger.info(str(e))
continue
except RequestSendFailed as e:
logger.info("Failed to get backfill from %s because %s", dom, e)
continue
except FederationDeniedError as e:
logger.info(e)
continue
except Exception as e:
logger.exception("Failed to backfill from %s because %s", dom, e)
continue
Expand Down
31 changes: 14 additions & 17 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import (
SynapseTags,
Expand Down Expand Up @@ -1517,8 +1517,8 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
)

async def backfill_event_id(
self, destination: str, room_id: str, event_id: str
) -> EventBase:
self, destinations: List[str], room_id: str, event_id: str
) -> PulledPduInfo:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.

Expand All @@ -1530,38 +1530,35 @@ async def backfill_event_id(
Raises:
FederationError if we are unable to find the event from the destination
"""
logger.info(
"backfill_event_id: event_id=%s from destination=%s", event_id, destination
)
logger.info("backfill_event_id: event_id=%s", event_id)

room_version = await self._store.get_room_version(room_id)

event_from_response = await self._federation_client.get_pdu(
[destination],
pulled_pdu_info = await self._federation_client.get_pdu(
destinations,
event_id,
room_version,
)

if not event_from_response:
if not pulled_pdu_info:
raise FederationError(
"ERROR",
404,
"Unable to find event_id=%s from destination=%s to backfill."
% (event_id, destination),
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
f"Unable to find event_id={event_id} from remote servers to backfill.",
affected=event_id,
)

# Persist the event we just fetched, including pulling all of the state
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
await self._process_pulled_events(
destination,
[event_from_response],
pulled_pdu_info.pull_origin,
[pulled_pdu_info.pdu],
# Prevent notifications going to clients
backfilled=True,
)

return event_from_response
return pulled_pdu_info

@trace
@tag_args
Expand All @@ -1584,19 +1581,19 @@ async def _get_events_and_persist(
async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
try:
event = await self._federation_client.get_pdu(
pulled_pdu_info = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
if event is None:
if pulled_pdu_info is None:
logger.warning(
"Server %s didn't return event %s",
destination,
event_id,
)
return
events.append(event)
events.append(pulled_pdu_info.pdu)

except Exception as e:
logger.warning(
Expand Down
Loading