Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Marker events as state - MSC2716 #12718

Merged
merged 10 commits into from
May 24, 2022
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF
# Run the tests!
echo "Images built; running complement"
cd "$COMPLEMENT_DIR"
go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/...
go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
49 changes: 47 additions & 2 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,22 @@ async def process_remote_join(
# and discover that we do not have it.
event.internal_metadata.proactively_send = False

return await self.persist_events_and_notify(room_id, [(event, context)])
stream_id_after_persist = await self.persist_events_and_notify(
room_id, [(event, context)]
)

# If we're joining the room again, check if there is new marker
# state indicating that there is new history imported somewhere in
# the DAG.
#
# Do this after the state from the remote join was persisted (via
# `persist_events_and_notify`). Otherwise we can run into a
# situation where the create event doesn't exist yet in the
# `current_state_events`
for e in state:
await self._handle_marker_event(origin, e)

return stream_id_after_persist

async def update_state_for_partial_state_event(
self, destination: str, event: EventBase
Expand Down Expand Up @@ -1015,7 +1030,7 @@ async def _get_state_after_missing_prev_event(

async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
) -> List[EventBase]:
"""Get the complete room state at a given event, and persist any new events
as outliers"""
room_version = await self._store.get_room_version(room_id)
Expand All @@ -1024,6 +1039,14 @@ async def _get_state_and_persist(
)
logger.info("/state returned %i events", len(auth_events) + len(state_events))

logger.info(
"_get_state_and_persist auth_events(%d)=%s state_events(%d)=%s",
len(auth_events),
auth_events,
len(state_events),
state_events,
)

await self._auth_and_persist_outliers(
room_id, itertools.chain(auth_events, state_events)
)
Expand All @@ -1034,6 +1057,8 @@ async def _get_state_and_persist(
destination=destination, room_id=room_id, event_ids=(event_id,)
)

return auth_events + state_events

async def _process_received_pdu(
self,
origin: str,
Expand Down Expand Up @@ -1220,6 +1245,26 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No

logger.debug("_handle_marker_event: received %s", marker_event)

# TODO: Move this to a background queue
async def handle_marker_queue(marker_event: EventBase) -> None:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# Get the state before the marker event
state_events = await self._get_state_and_persist(
origin, marker_event.room_id, marker_event.event_id
)
logger.info(
"handle_marker_queue marker_event=%s state_events=%s",
marker_event.event_id,
state_events,
)

# TODO: No need to keep going if the marker is already `have_seen_event`

for e in state_events:
await self._handle_marker_event(origin, e)

# TODO: add_to_queue
await handle_marker_queue(marker_event)

insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
)
Expand Down