Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add base starting insertion event when no chunk ID is provided (MSC2716) #10250

Merged
merged 5 commits into from
Jul 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10250.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add base starting insertion event when no chunk ID is specified in the historical batch send API.
8 changes: 8 additions & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ async def create_event(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Expand Down Expand Up @@ -772,6 +775,7 @@ async def create_and_send_nonmember_event(
txn_id: Optional[str] = None,
ignore_shadow_ban: bool = False,
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, int]:
"""
Expand Down Expand Up @@ -799,6 +803,9 @@ async def create_and_send_nonmember_event(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
Expand Down Expand Up @@ -847,6 +854,7 @@ async def create_and_send_nonmember_event(
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
outlier=outlier,
historical=historical,
depth=depth,
)

Expand Down
112 changes: 89 additions & 23 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,35 @@ async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int:

return depth

def _create_insertion_event_dict(
    self, sender: str, room_id: str, origin_server_ts: int
) -> dict:
    """Creates an event dict for an "insertion" event with the proper fields
    and a random chunk ID.

    This only builds the dict; it does not create or persist an event.

    Args:
        sender: The event author MXID
        room_id: The room ID that the event belongs to
        origin_server_ts: Timestamp when the event was sent

    Returns:
        The insertion event dict, with "type", "sender", "room_id",
        "content" and "origin_server_ts" keys. The content holds the newly
        generated next chunk ID and marks the event as historical.
    """

    # A fresh random ID for every insertion event, so that a later chunk
    # can refer to this specific insertion point.
    next_chunk_id = random_string(8)

    insertion_event = {
        "type": EventTypes.MSC2716_INSERTION,
        "sender": sender,
        "room_id": room_id,
        "content": {
            EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
            EventContentFields.MSC2716_HISTORICAL: True,
        },
        "origin_server_ts": origin_server_ts,
    }

    return insertion_event

async def on_POST(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=False)

Expand Down Expand Up @@ -449,37 +478,68 @@ async def on_POST(self, request, room_id):

events_to_create = body["events"]

# If provided, connect the chunk to the last insertion point
# The chunk ID passed in comes from the chunk_id in the
# "insertion" event from the previous chunk.
prev_event_ids = prev_events_from_query
inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)

# Figure out which chunk to connect to. If they passed in
# chunk_id_from_query let's use it. The chunk ID passed in comes
# from the chunk_id in the "insertion" event from the previous chunk.
last_event_in_chunk = events_to_create[-1]
chunk_id_to_connect_to = chunk_id_from_query
base_insertion_event = None
if chunk_id_from_query:
last_event_in_chunk = events_to_create[-1]
last_event_in_chunk["content"][
EventContentFields.MSC2716_CHUNK_ID
] = chunk_id_from_query
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a database index to make this lookup fast. Going to add this in #10245

# Otherwise, create an insertion event to act as a starting point.
#
# We don't always have an insertion event to start hanging more history
# off of (ideally there would be one in the main DAG, but that's not the
# case if we're wanting to add history to e.g. existing rooms without
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
base_insertion_event_dict = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
origin_server_ts=last_event_in_chunk["origin_server_ts"],
)
base_insertion_event_dict["prev_events"] = prev_event_ids.copy()

# Add an "insertion" event to the start of each chunk (next to the oldest
(
base_insertion_event,
_,
) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
base_insertion_event_dict,
prev_event_ids=base_insertion_event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
depth=inherited_depth,
)
Copy link
Contributor Author

@MadLittleMods MadLittleMods Jul 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikjohnston I've decided to make the base insertion event float off on its own so it's easier to reason about in my head (all of them are hooked up consistently).


chunk_id_to_connect_to = base_insertion_event["content"][
EventContentFields.MSC2716_NEXT_CHUNK_ID
]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is where we are creating the base insertion event


# Connect this current chunk to the insertion event from the previous chunk
last_event_in_chunk["content"][
EventContentFields.MSC2716_CHUNK_ID
] = chunk_id_to_connect_to

# Add an "insertion" event to the start of each chunk (next to the oldest-in-time
# event in the chunk) so the next chunk can be connected to this one.
next_chunk_id = random_string(64)
insertion_event = {
"type": EventTypes.MSC2716_INSERTION,
"sender": requester.user.to_string(),
"content": {
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
insertion_event = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
# Since the insertion event is put at the start of the chunk,
# where the oldest event is, copy the origin_server_ts from
# where the oldest-in-time event is, copy the origin_server_ts from
# the first event we're inserting
"origin_server_ts": events_to_create[0]["origin_server_ts"],
}
origin_server_ts=events_to_create[0]["origin_server_ts"],
)
# Prepend the insertion event to the start of the chunk
events_to_create = [insertion_event] + events_to_create

inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)

event_ids = []
prev_event_ids = prev_events_from_query
events_to_persist = []
for ev in events_to_create:
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
Expand Down Expand Up @@ -533,10 +593,16 @@ async def on_POST(self, request, room_id):
context=context,
)

# Add the base_insertion_event to the bottom of the list we return
if base_insertion_event is not None:
event_ids.append(base_insertion_event.event_id)

return 200, {
"state_events": auth_event_ids,
"events": event_ids,
"next_chunk_id": next_chunk_id,
"next_chunk_id": insertion_event["content"][
EventContentFields.MSC2716_NEXT_CHUNK_ID
],
}

def on_GET(self, request, room_id):
Expand Down