Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Remove stream ordering from Metadata dict (#8452)
Browse files Browse the repository at this point in the history
There's no need for it to be in the dict as well as the events table. Instead,
we store it in a separate attribute in the EventInternalMetadata object, and
populate that on load.

This means that we can rely on it being correctly populated for any event which
has been persisted to the database.
  • Loading branch information
richvdh committed Oct 5, 2020
1 parent f64c6aa commit f31f8e6
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 33 deletions.
1 change: 1 addition & 0 deletions changelog.d/8452.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove redundant database loads of stream_ordering for events we already have.
6 changes: 4 additions & 2 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict"]
__slots__ = ["_dict", "stream_ordering"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
# reused. TODO: fix that
self._dict = dict(internal_metadata_dict)

# the stream ordering of this event. None, until it has been persisted.
self.stream_ordering = None # type: Optional[int]

outlier = DictProperty("outlier") # type: bool
out_of_band_membership = DictProperty("out_of_band_membership") # type: bool
send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str
Expand All @@ -113,7 +116,6 @@ def __init__(self, internal_metadata_dict: JsonDict):
redacted = DictProperty("redacted") # type: bool
txn_id = DictProperty("txn_id") # type: str
token_id = DictProperty("token_id") # type: str
stream_ordering = DictProperty("stream_ordering") # type: int

# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
Expand Down
5 changes: 5 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event_dict, event.room_version, event.internal_metadata.get_dict()
)

# copy the internal fields
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)

# Mark the event as redacted
pruned_event.internal_metadata.redacted = True

Expand Down
2 changes: 2 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc()

assert pdu.internal_metadata.stream_ordering

# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
Expand Down
2 changes: 2 additions & 0 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def send_pdu(self, pdu: EventBase) -> None:
# yet know if we have anything to catch up (None)
self._pending_pdus.append(pdu)
else:
assert pdu.internal_metadata.stream_ordering
self._catchup_last_skipped = pdu.internal_metadata.stream_ordering

self.attempt_new_transaction()
Expand Down Expand Up @@ -361,6 +362,7 @@ async def _transaction_transmission_loop(self) -> None:
last_successful_stream_ordering = (
final_pdu.internal_metadata.stream_ordering
)
assert last_successful_stream_ordering
await self._store.set_destination_last_successful_stream_ordering(
self._destination, last_successful_stream_ordering
)
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3008,6 +3008,9 @@ async def _notify_persisted_event(
elif event.internal_metadata.is_outlier():
return

# the event has been persisted so it should have a stream ordering.
assert event.internal_metadata.stream_ordering

event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
Expand Down
4 changes: 3 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ async def send_nonmember_event(
event.event_id,
prev_event.event_id,
)
return await self.store.get_stream_id_for_event(prev_event.event_id)
# we know it was persisted, so must have a stream ordering
assert prev_event.internal_metadata.stream_ordering
return prev_event.internal_metadata.stream_ordering

return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
Expand Down
13 changes: 7 additions & 6 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ async def _local_membership_update(
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
_, stream_id = await self.store.get_event_ordering(duplicate.event_id)
return duplicate.event_id, stream_id
# we know it was persisted, so must have a stream ordering.
assert duplicate.internal_metadata.stream_ordering
return duplicate.event_id, duplicate.internal_metadata.stream_ordering

prev_state_ids = await context.get_prev_state_ids()

Expand Down Expand Up @@ -441,12 +442,12 @@ async def _update_membership(
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
_, stream_id = await self.store.get_event_ordering(
old_state.event_id
)
# duplicate event.
# we know it was persisted, so must have a stream ordering.
assert old_state.internal_metadata.stream_ordering
return (
old_state.event_id,
stream_id,
old_state.internal_metadata.stream_ordering,
)

if old_membership in ["ban", "leave"] and action == "kick":
Expand Down
5 changes: 4 additions & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
UsersRestServletV2,
WhoisRestServlet,
)
from synapse.types import RoomStreamToken
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -109,7 +110,9 @@ async def on_POST(self, request, room_id, event_id):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

room_token = await self.store.get_topological_token_for_event(event_id)
room_token = RoomStreamToken(
event.depth, event.internal_metadata.stream_ordering
)
token = await room_token.to_string(self.store)

logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ def _persist_events_txn(
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering

# stream orderings should have been assigned by now
assert min_stream_order
assert max_stream_order

self._update_forward_extremities_txn(
txn,
new_forward_extremities=new_forward_extremeties,
Expand Down
26 changes: 16 additions & 10 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row["stream_ordering"]

event_map[event_id] = original_ev

Expand Down Expand Up @@ -790,6 +791,8 @@ def _fetch_event_rows(self, txn, event_ids):
* event_id (str)
* stream_ordering (int): stream ordering for this event
* json (str): json-encoded event structure
* internal_metadata (str): json-encoded internal metadata dict
Expand Down Expand Up @@ -822,13 +825,15 @@ def _fetch_event_rows(self, txn, event_ids):
sql = """\
SELECT
e.event_id,
e.internal_metadata,
e.json,
e.format_version,
e.stream_ordering,
ej.internal_metadata,
ej.json,
ej.format_version,
r.room_version,
rej.reason
FROM event_json as e
LEFT JOIN rooms r USING (room_id)
FROM events AS e
JOIN event_json AS ej USING (event_id)
LEFT JOIN rooms r ON r.room_id = e.room_id
LEFT JOIN rejections as rej USING (event_id)
WHERE """

Expand All @@ -842,11 +847,12 @@ def _fetch_event_rows(self, txn, event_ids):
event_id = row[0]
event_dict[event_id] = {
"event_id": event_id,
"internal_metadata": row[1],
"json": row[2],
"format_version": row[3],
"room_version_id": row[4],
"rejected_reason": row[5],
"stream_ordering": row[1],
"internal_metadata": row[2],
"json": row[3],
"format_version": row[4],
"room_version_id": row[5],
"rejected_reason": row[6],
"redactions": [],
}

Expand Down
13 changes: 0 additions & 13 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,19 +589,6 @@ async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
)
return "t%d-%d" % (topo, token)

async def get_stream_id_for_event(self, event_id: str) -> int:
"""The stream ID for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
A stream ID.
"""
return await self.db_pool.runInteraction(
"get_stream_id_for_event", self.get_stream_id_for_event_txn, event_id,
)

def get_stream_id_for_event_txn(
self, txn: LoggingTransaction, event_id: str, allow_none=False,
) -> int:
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ async def persist_event(
await make_deferred_yieldable(deferred)

event_stream_id = event.internal_metadata.stream_ordering
# stream ordering should have been assigned by now
assert event_stream_id

pos = PersistedEventPosition(self._instance_name, event_stream_id)
return pos, self.main_store.get_room_max_token()
Expand Down

0 comments on commit f31f8e6

Please sign in to comment.