Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add support for MSC2716 marker events (federation) #10498

Merged
merged 63 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
d2e2aa7
Make historical messages available to federated servers
MadLittleMods Jun 24, 2021
2d942ec
Debug message not available on federation
MadLittleMods Jun 25, 2021
38bcf13
Add base starting insertion point when no chunk ID is provided
MadLittleMods Jun 25, 2021
e405a23
Fix messages from multiple senders in historical chunk
MadLittleMods Jun 29, 2021
36f1565
Remove debug lines
MadLittleMods Jun 29, 2021
05d6c51
Messing with selecting insertion event extremeties
MadLittleMods Jul 7, 2021
defc536
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 7, 2021
dfad8a8
Move db schema change to new version
MadLittleMods Jul 7, 2021
7d850db
Add more better comments
MadLittleMods Jul 7, 2021
164dee4
Make a fake requester with just what we need
MadLittleMods Jul 7, 2021
04b1f7e
Store insertion events in table
MadLittleMods Jul 8, 2021
b703962
Make base insertion event float off on its own
MadLittleMods Jul 8, 2021
8c205e5
Validate that the app service can actually control the given user
MadLittleMods Jul 9, 2021
7b8b2d1
Add some better comments on what we're trying to check for
MadLittleMods Jul 9, 2021
281588f
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 9, 2021
4226165
Continue debugging
MadLittleMods Jul 12, 2021
baae5d8
Share validation logic
MadLittleMods Jul 12, 2021
c05e43b
Add inserted historical messages to /backfill response
MadLittleMods Jul 13, 2021
02b1bea
Remove debug sql queries
MadLittleMods Jul 13, 2021
66cf5be
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 13, 2021
ab8011b
Some marker event implemntation trials
MadLittleMods Jul 14, 2021
f20ba02
Clean up PR
MadLittleMods Jul 14, 2021
64aeb73
Rename insertion_event_id to just event_id
MadLittleMods Jul 14, 2021
ea7c30d
Add some better sql comments
MadLittleMods Jul 14, 2021
9a6fd3f
More accurate description
MadLittleMods Jul 14, 2021
0f6179f
Add changelog
MadLittleMods Jul 14, 2021
5970e3f
Make it clear what MSC the change is part of
MadLittleMods Jul 14, 2021
bc13396
Add more detail on which insertion event came through
MadLittleMods Jul 14, 2021
669da52
Address review and improve sql queries
MadLittleMods Jul 14, 2021
9a86e05
Only use event_id as unique constraint
MadLittleMods Jul 14, 2021
8999567
Fix test case where insertion event is already in the normal DAG
MadLittleMods Jul 15, 2021
35a4569
Remove debug changes
MadLittleMods Jul 15, 2021
164e32b
Add support for MSC2716 marker events
MadLittleMods Jul 16, 2021
435f074
Process markers when we receive it over federation
MadLittleMods Jul 16, 2021
e0e1bd0
WIP: make hs2 backfill historical messages after marker event
MadLittleMods Jul 17, 2021
d63c34c
hs2 to better ask for insertion event extremity
MadLittleMods Jul 17, 2021
2196ba5
Add insertion_event_extremities table
MadLittleMods Jul 17, 2021
b2be8ce
Switch to chunk events so we can auth via power_levels
MadLittleMods Jul 20, 2021
04a29fe
Switch to chunk events for federation
MadLittleMods Jul 20, 2021
258fa57
Add unstable room version to support new historical PL
MadLittleMods Jul 20, 2021
8ebbc5f
Merge branch 'madlittlemods/2716-backfill-historical-events-for-feder…
MadLittleMods Jul 20, 2021
187ab28
Messy: Fix undefined state_group for federated historical events
MadLittleMods Jul 21, 2021
9d70e95
Revert "Messy: Fix undefined state_group for federated historical eve…
MadLittleMods Jul 21, 2021
9352635
Fix federated events being rejected for no state_groups
MadLittleMods Jul 21, 2021
5c454b7
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 21, 2021
e881cff
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 21, 2021
c9330ec
Merge branch 'develop' into madlittlemods/2716-backfill-historical-ev…
MadLittleMods Jul 23, 2021
347a3e1
Merge branch 'madlittlemods/2716-backfill-historical-events-for-feder…
MadLittleMods Jul 23, 2021
97fb158
Merge branch 'develop' into madlittlemods/2716-marker-events
MadLittleMods Jul 28, 2021
f115aec
Adapting to experimental room version
MadLittleMods Jul 29, 2021
b55315f
Some log cleanup
MadLittleMods Jul 29, 2021
088c3ef
Add better comments around extremity fetching code and why
MadLittleMods Jul 29, 2021
8072170
Rename to be more accurate to what the function returns
MadLittleMods Jul 29, 2021
44b883c
Add changelog
MadLittleMods Jul 29, 2021
5268749
Ignore rejected events
MadLittleMods Aug 3, 2021
71c2f05
Use simplified upsert
MadLittleMods Aug 3, 2021
b832264
Add Erik's explanation of extra event checks
MadLittleMods Aug 3, 2021
1dc0996
Clarify that the depth is not directly correlated to the backwards ex…
MadLittleMods Aug 4, 2021
32af944
lock only matters for sqlite
MadLittleMods Aug 4, 2021
1b7e627
Move new SQL changes to its own delta file
MadLittleMods Aug 4, 2021
e6e48ed
Clean up upsert docstring
MadLittleMods Aug 4, 2021
23bc5e5
Bump database schema version (62)
MadLittleMods Aug 4, 2021
92dd985
Merge branch 'develop' into madlittlemods/2716-marker-events-v2
MadLittleMods Aug 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10498.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for "marker" events which makes historical events discoverable for servers that already have all of the scrollback history (part of MSC2716).
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
119 changes: 113 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

from synapse import event_auth
from synapse.api.constants import (
EventContentFields,
EventTypes,
Membership,
RejectedReason,
Expand Down Expand Up @@ -262,7 +263,12 @@ async def on_receive_pdu(

state = None

# Get missing pdus if necessary.
# Check that the event passes auth based on the state at the event. This is
# done for events that are to be added to the timeline (non-outliers).
#
# Get missing pdus if necessary:
# - Fetching any missing prev events to fill in gaps in the graph
# - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
Expand Down Expand Up @@ -432,6 +438,13 @@ async def on_receive_pdu(
affected=event_id,
)

# A second round of checks for all events. Check that the event passes auth
# based on `auth_events`, this allows us to assert that the event would
# have been allowed at some point. If an event passes this check its OK
# for it to be used as part of a returned `/state` request, as either
# a) we received the event as part of the original join and so trust it, or
# b) we'll do a state resolution with existing state before it becomes
# part of the "current state", which adds more protection.
await self._process_received_pdu(origin, pdu, state=state)

async def _get_missing_events_for_pdu(
Expand Down Expand Up @@ -889,6 +902,79 @@ async def _process_received_pdu(
"resync_device_due_to_pdu", self._resync_device, event.sender
)

await self._handle_marker_event(origin, event)

async def _handle_marker_event(self, origin: str, marker_event: EventBase):
"""Handles backfilling the insertion event when we receive a marker
event that points to one.

Args:
origin: Origin of the event. Will be called to get the insertion event
marker_event: The event to process
"""

if marker_event.type != EventTypes.MSC2716_MARKER:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
# Not a marker event
return

if marker_event.rejected_reason is not None:
# Rejected event
return

# Skip processing a marker event if the room version doesn't
# support it.
room_version = await self.store.get_room_version(marker_event.room_id)
if not room_version.msc2716_historical:
return

logger.debug("_handle_marker_event: received %s", marker_event)

insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
)

if insertion_event_id is None:
# Nothing to retrieve then (invalid marker)
return

logger.debug(
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
)

await self._get_events_and_persist(
origin,
marker_event.room_id,
[insertion_event_id],
)

insertion_event = await self.store.get_event(
insertion_event_id, allow_none=True
)
if insertion_event is None:
logger.warning(
"_handle_marker_event: server %s didn't return insertion event %s for marker %s",
origin,
insertion_event_id,
marker_event.event_id,
)
return

logger.debug(
"_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
insertion_event,
marker_event,
)

await self.store.insert_insertion_extremity(
insertion_event_id, marker_event.room_id
)

logger.debug(
"_handle_marker_event: insertion extremity added for %s from marker event %s",
insertion_event,
marker_event,
)

async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
Expand Down Expand Up @@ -1057,9 +1143,19 @@ async def maybe_backfill(
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
insertion_events_to_be_backfilled,
)

if not extremities:
if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False

Expand Down Expand Up @@ -1089,10 +1185,12 @@ async def _maybe_backfill_inner(
# state *before* the event, ignoring the special casing certain event
# types have.

forward_events = await self.store.get_successor_events(list(extremities))
forward_event_ids = await self.store.get_successor_events(
list(oldest_events_with_depth)
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

extremities_events = await self.store.get_events(
forward_events,
forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
Expand All @@ -1106,10 +1204,19 @@ async def _maybe_backfill_inner(
redact=False,
check_history_visibility_only=True,
)
logger.debug(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
)

if not filtered_extremities:
if not filtered_extremities and not insertion_events_to_be_backfilled:
return False

extremities = {
**oldest_events_with_depth,
# TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
**insertion_events_to_be_backfilled,
}

# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
Expand Down
14 changes: 7 additions & 7 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,13 +941,13 @@ async def simple_upsert(

`lock` should generally be set to True (the default), but can be set
to False if either of the following are true:

* there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.

* we somehow know that we are the only thread which will be updating
this table.
1. there is a UNIQUE INDEX on the key columns. In this case a conflict
will cause an IntegrityError in which case this function will retry
the update.
2. we somehow know that we are the only thread which will be updating
this table.
As an additional note, this parameter only matters for old SQLite versions
because we will use native upserts otherwise.

Args:
table: The table to upsert into
Expand Down
114 changes: 98 additions & 16 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,27 +671,97 @@ def _get_auth_chain_difference_txn(
# Return all events where not all sets can reach them.
return {eid for eid, n in event_to_missing_sets.items() if n}

async def get_oldest_events_with_depth_in_room(self, room_id):
async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
"""Gets the oldest events(backwards extremities) in the room along with the
aproximate depth.

We use this function so that we can compare and see if someones current
depth at their current scrollback is within pagination range of the
event extremeties. If the current depth is close to the depth of given
oldest event, we can trigger a backfill.

Args:
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
"""

def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
# Assemble a dictionary with event_id -> depth for the oldest events
# we know of in the room. Backwards extremeties are the oldest
# events we know of in the room but we only know of them because
# some other event referenced them by prev_event and aren't peristed
# in our database yet (meaning we don't know their depth
# specifically). So we need to look for the aproximate depth from
# the events connected to the current backwards extremeties.
sql = """
SELECT b.event_id, MAX(e.depth) FROM events as e
/**
* Get the edge connections from the event_edges table
* so we can see whether this event's prev_events points
* to a backward extremity in the next join.
*/
INNER JOIN event_edges as g
ON g.event_id = e.event_id
/**
* We find the "oldest" events in the room by looking for
* events connected to backwards extremeties (oldest events
* in the room that we know of so far).
*/
INNER JOIN event_backward_extremities as b
ON g.prev_event_id = b.event_id
WHERE b.room_id = ? AND g.is_state is ?
GROUP BY b.event_id
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No change to the query, just new """ syntax and comments


txn.execute(sql, (room_id, False))

return dict(txn)

return await self.db_pool.runInteraction(
"get_oldest_events_with_depth_in_room",
self.get_oldest_events_with_depth_in_room_txn,
"get_oldest_event_ids_with_depth_in_room",
get_oldest_event_ids_with_depth_in_room_txn,
room_id,
)

def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
" ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
" ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
async def get_insertion_event_backwards_extremities_in_room(
self, room_id
) -> Dict[str, int]:
"""Get the insertion events we know about that we haven't backfilled yet.

txn.execute(sql, (room_id, False))
We use this function so that we can compare and see if someones current
depth at their current scrollback is within pagination range of the
insertion event. If the current depth is close to the depth of given
insertion event, we can trigger a backfill.

return dict(txn)
Args:
room_id: Room where we want to find the oldest events

Returns:
Map from event_id to depth
"""

def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
sql = """
SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
/* We only want insertion events that are also marked as backwards extremities */
INNER JOIN insertion_event_extremities as b USING (event_id)
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
WHERE b.room_id = ?
GROUP BY b.event_id
"""

txn.execute(sql, (room_id,))

return dict(txn)

return await self.db_pool.runInteraction(
"get_insertion_event_backwards_extremities_in_room",
get_insertion_event_backwards_extremities_in_room_txn,
room_id,
)

async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
"""Returns the event ID and depth for the event that has the max depth from a set of event IDs
Expand Down Expand Up @@ -1041,7 +1111,6 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
if row[1] not in event_results:
queue.put((-row[0], row[1]))

# Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
Expand Down Expand Up @@ -1136,6 +1205,19 @@ def _delete_old_forward_extrem_cache_txn(txn):
_delete_old_forward_extrem_cache_txn,
)

async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
await self.db_pool.simple_upsert(
table="insertion_event_extremities",
keyvalues={"event_id": event_id},
values={
"event_id": event_id,
"room_id": room_id,
},
insertion_values={},
desc="insert_insertion_extremity",
lock=False,
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
Expand Down
24 changes: 20 additions & 4 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1845,6 +1845,18 @@ def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
},
)

# When we receive an event with a `chunk_id` referencing the
# `next_chunk_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_chunk_id = ?
)
"""
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

txn.execute(sql, (chunk_id,))

def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
Expand Down Expand Up @@ -2101,15 +2113,17 @@ def _update_backward_extremeties(self, txn, events):

Forward extremities are handled when we first start persisting the events.
"""
# From the events passed in, add all of the prev events as backwards extremities.
# Ignore any events that are already backwards extrems or outliers.
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" )"
)

Expand All @@ -2123,6 +2137,8 @@ def _update_backward_extremeties(self, txn, events):
],
)

# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
Expand Down
Loading