Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rename MSC2716 things from chunk to batch to match /batch_send endpoint #10838

Merged
merged 11 commits into from
Sep 21, 2021
10 changes: 5 additions & 5 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class EventTypes:
SpaceParent = "m.space.parent"

MSC2716_INSERTION = "org.matrix.msc2716.insertion"
MSC2716_CHUNK = "org.matrix.msc2716.chunk"
MSC2716_BATCH = "org.matrix.msc2716.batch"
MSC2716_MARKER = "org.matrix.msc2716.marker"


Expand Down Expand Up @@ -200,11 +200,11 @@ class EventContentFields:

# Used on normal messages to indicate they were historically imported after the fact
MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
# For "insertion" events to indicate what the next chunk ID should be in
# For "insertion" events to indicate what the next batch ID should be in
# order to connect to it
MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
# Used on "chunk" events to indicate which insertion event it connects to
MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
MSC2716_NEXT_BATCH_ID = "org.matrix.msc2716.next_batch_id"
# Used on "batch" events to indicate which insertion event it connects to
MSC2716_BATCH_ID = "org.matrix.msc2716.batch_id"
# For "marker" events
MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"

Expand Down
20 changes: 2 additions & 18 deletions synapse/api/room_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,24 +244,8 @@ class RoomVersions:
msc2716_historical=False,
msc2716_redactions=False,
)
MSC2716 = RoomVersion(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping support for older MSC2716 room versions so we don't have to worry about
supporting both chunk and batch events.

"org.matrix.msc2716",
RoomDisposition.UNSTABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
enforce_key_validity=True,
special_case_aliases_auth=False,
strict_canonicaljson=True,
limit_notifications_power_levels=True,
msc2176_redaction_rules=False,
msc3083_join_rules=False,
msc3375_redaction_rules=False,
msc2403_knocking=True,
msc2716_historical=True,
msc2716_redactions=False,
)
MSC2716v2 = RoomVersion(
"org.matrix.msc2716v2",
MSC2716v3 = RoomVersion(
"org.matrix.msc2716v3",
RoomDisposition.UNSTABLE,
EventFormatVersions.V3,
StateResolutionVersions.V2,
Expand Down
8 changes: 4 additions & 4 deletions synapse/event_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def check(

if (
event.type == EventTypes.MSC2716_INSERTION
or event.type == EventTypes.MSC2716_CHUNK
or event.type == EventTypes.MSC2716_BATCH
or event.type == EventTypes.MSC2716_MARKER
):
check_historical(room_version_obj, event, auth_events)
Expand Down Expand Up @@ -549,14 +549,14 @@ def check_historical(
auth_events: StateMap[EventBase],
) -> None:
"""Check whether the event sender is allowed to send historical related
events like "insertion", "chunk", and "marker".
events like "insertion", "batch", and "marker".

Returns:
None

Raises:
AuthError if the event sender is not allowed to send historical related events
("insertion", "chunk", and "marker").
("insertion", "batch", and "marker").
"""
# Ignore the auth checks in room versions that do not support historical
# events
Expand All @@ -570,7 +570,7 @@ def check_historical(
if user_level < historical_level:
raise AuthError(
403,
'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
)


Expand Down
6 changes: 3 additions & 3 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ def add_fields(*fields):
elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
add_fields("redacts")
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
add_fields(EventContentFields.MSC2716_NEXT_CHUNK_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_CHUNK:
add_fields(EventContentFields.MSC2716_CHUNK_ID)
add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
add_fields(EventContentFields.MSC2716_BATCH_ID)
elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
add_fields(EventContentFields.MSC2716_MARKER_INSERTION)

Expand Down
76 changes: 38 additions & 38 deletions synapse/rest/client/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,25 @@

class RoomBatchSendEventRestServlet(RestServlet):
"""
API endpoint which can insert a chunk of events historically back in time
API endpoint which can insert a batch of events historically back in time
next to the given `prev_event`.

`chunk_id` comes from `next_chunk_id `in the response of the batch send
endpoint and is derived from the "insertion" events added to each chunk.
`batch_id` comes from `next_batch_id` in the response of the batch send
endpoint and is derived from the "insertion" events added to each batch.
It's not required for the first batch send.

`state_events_at_start` is used to define the historical state events
needed to auth the events like join events. These events will float
outside of the normal DAG as outliers and won't be visible in the chat
history which also allows us to insert multiple chunks without having a bunch
of `@mxid joined the room` noise between each chunk.
history which also allows us to insert multiple batches without having a bunch
of `@mxid joined the room` noise between each batch.

`events` is chronological chunk/list of events you want to insert.
There is a reverse-chronological constraint on chunks so once you insert
`events` is a chronological list of events you want to insert.
There is a reverse-chronological constraint on batches so once you insert
some messages, you can only insert older ones after that.
tldr; Insert chunks from your most recent history -> oldest history.
tldr; Insert batches from your most recent history -> oldest history.

POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&batch_id=<batchID>
{
"events": [ ... ],
"state_events_at_start": [ ... ]
Expand Down Expand Up @@ -120,7 +120,7 @@ def _create_insertion_event_dict(
self, sender: str, room_id: str, origin_server_ts: int
):
"""Creates an event dict for an "insertion" event with the proper fields
and a random chunk ID.
and a random batch ID.

Args:
sender: The event author MXID
Expand All @@ -131,13 +131,13 @@ def _create_insertion_event_dict(
Tuple of event ID and stream ordering position
"""

next_chunk_id = random_string(8)
next_batch_id = random_string(8)
insertion_event = {
"type": EventTypes.MSC2716_INSERTION,
"sender": sender,
"room_id": room_id,
"content": {
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
"origin_server_ts": origin_server_ts,
Expand Down Expand Up @@ -177,7 +177,7 @@ async def on_POST(self, request, room_id):
assert_params_in_dict(body, ["state_events_at_start", "events"])

prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
chunk_id_from_query = parse_string(request, "chunk_id")
batch_id_from_query = parse_string(request, "batch_id")

if prev_events_from_query is None:
raise SynapseError(
Expand Down Expand Up @@ -277,18 +277,18 @@ async def on_POST(self, request, room_id):
prev_events_from_query
)

# Figure out which chunk to connect to. If they passed in
# chunk_id_from_query let's use it. The chunk ID passed in comes
# from the chunk_id in the "insertion" event from the previous chunk.
last_event_in_chunk = events_to_create[-1]
chunk_id_to_connect_to = chunk_id_from_query
# Figure out which batch to connect to. If they passed in
# batch_id_from_query let's use it. The batch ID passed in comes
# from the next_batch_id in the "insertion" event from the previous batch.
last_event_in_batch = events_to_create[-1]
batch_id_to_connect_to = batch_id_from_query
base_insertion_event = None
if chunk_id_from_query:
if batch_id_from_query:
# All but the first base insertion event should point at a fake
# event, which causes the HS to ask for the state at the start of
# the chunk later.
# the batch later.
prev_event_ids = [fake_prev_event_id]
# TODO: Verify the chunk_id_from_query corresponds to an insertion event
# TODO: Verify the batch_id_from_query corresponds to an insertion event
pass
# Otherwise, create an insertion event to act as a starting point.
#
Expand All @@ -303,7 +303,7 @@ async def on_POST(self, request, room_id):
base_insertion_event_dict = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
origin_server_ts=last_event_in_chunk["origin_server_ts"],
origin_server_ts=last_event_in_batch["origin_server_ts"],
)
base_insertion_event_dict["prev_events"] = prev_event_ids.copy()

Expand All @@ -322,38 +322,38 @@ async def on_POST(self, request, room_id):
depth=inherited_depth,
)

chunk_id_to_connect_to = base_insertion_event["content"][
EventContentFields.MSC2716_NEXT_CHUNK_ID
batch_id_to_connect_to = base_insertion_event["content"][
EventContentFields.MSC2716_NEXT_BATCH_ID
]

# Connect this current chunk to the insertion event from the previous chunk
chunk_event = {
"type": EventTypes.MSC2716_CHUNK,
# Connect this current batch to the insertion event from the previous batch
batch_event = {
"type": EventTypes.MSC2716_BATCH,
"sender": requester.user.to_string(),
"room_id": room_id,
"content": {
EventContentFields.MSC2716_CHUNK_ID: chunk_id_to_connect_to,
EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
EventContentFields.MSC2716_HISTORICAL: True,
},
# Since the chunk event is put at the end of the chunk,
# Since the batch event is put at the end of the batch,
# where the newest-in-time event is, copy the origin_server_ts from
# the last event we're inserting
"origin_server_ts": last_event_in_chunk["origin_server_ts"],
"origin_server_ts": last_event_in_batch["origin_server_ts"],
}
# Add the chunk event to the end of the chunk (newest-in-time)
events_to_create.append(chunk_event)
# Add the batch event to the end of the batch (newest-in-time)
events_to_create.append(batch_event)

# Add an "insertion" event to the start of each chunk (next to the oldest-in-time
# event in the chunk) so the next chunk can be connected to this one.
# Add an "insertion" event to the start of each batch (next to the oldest-in-time
# event in the batch) so the next batch can be connected to this one.
insertion_event = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
# Since the insertion event is put at the start of the chunk,
# Since the insertion event is put at the start of the batch,
# where the oldest-in-time event is, copy the origin_server_ts from
# the first event we're inserting
origin_server_ts=events_to_create[0]["origin_server_ts"],
)
# Prepend the insertion event to the start of the chunk (oldest-in-time)
# Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create

event_ids = []
Expand Down Expand Up @@ -420,8 +420,8 @@ async def on_POST(self, request, room_id):
return 200, {
"state_events": state_events_at_start,
"events": event_ids,
"next_chunk_id": insertion_event["content"][
EventContentFields.MSC2716_NEXT_CHUNK_ID
"next_batch_id": insertion_event["content"][
EventContentFields.MSC2716_NEXT_BATCH_ID
],
}

Expand Down
46 changes: 23 additions & 23 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ def _update_metadata_tables_txn(
self._handle_event_relations(txn, event)

self._handle_insertion_event(txn, event)
self._handle_chunk_event(txn, event)
self._handle_batch_event(txn, event)

# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
Expand Down Expand Up @@ -1776,23 +1776,23 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
if not room_version.msc2716_historical:
return

next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
if next_chunk_id is None:
# Invalid insertion event without next chunk ID
next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
if next_batch_id is None:
# Invalid insertion event without next batch ID
return

logger.debug(
"_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
"_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
)

# Keep track of the insertion event and the chunk ID
# Keep track of the insertion event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="insertion_events",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"next_chunk_id": next_chunk_id,
"next_batch_id": next_batch_id,
},
)

Expand All @@ -1808,54 +1808,54 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
},
)

def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
"""Handles inserting the chunk edges/connections between the chunk event
def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase):
"""Handles inserting the batch edges/connections between the batch event
and an insertion event. Part of MSC2716.

Args:
txn: The database transaction object
event: The event to process
"""

if event.type != EventTypes.MSC2716_CHUNK:
# Not a chunk event
if event.type != EventTypes.MSC2716_BATCH:
# Not a batch event
return

# Skip processing a chunk event if the room version doesn't
# Skip processing a batch event if the room version doesn't
# support it.
room_version = self.store.get_room_version_txn(txn, event.room_id)
if not room_version.msc2716_historical:
return

chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
if chunk_id is None:
# Invalid chunk event without a chunk ID
batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
if batch_id is None:
# Invalid batch event without a batch ID
return

logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)

# Keep track of the insertion event and the chunk ID
# Keep track of the batch event and the batch ID
self.db_pool.simple_insert_txn(
txn,
table="chunk_events",
table="batch_events",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
values={
"event_id": event.event_id,
"room_id": event.room_id,
"chunk_id": chunk_id,
"batch_id": batch_id,
},
)

# When we receive an event with a `chunk_id` referencing the
# `next_chunk_id` of the insertion event, we can remove it from the
# When we receive an event with a `batch_id` referencing the
# `next_batch_id` of the insertion event, we can remove it from the
# `insertion_event_extremities` table.
sql = """
DELETE FROM insertion_event_extremities WHERE event_id IN (
SELECT event_id FROM insertion_events
WHERE next_chunk_id = ?
WHERE next_batch_id = ?
)
"""

txn.execute(sql, (chunk_id,))
txn.execute(sql, (batch_id,))

def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
Expand Down