From d5fd6b0727c15a057096f08b53cee603fc05afdb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 7 May 2021 15:22:02 +0100 Subject: [PATCH 1/5] add a cache to have_seen_event --- changelog.d/9953.misc | 1 + synapse/storage/databases/main/cache.py | 1 + .../storage/databases/main/events_worker.py | 34 ++++++++++++++++--- .../storage/databases/main/purge_events.py | 27 ++++++++++++--- 4 files changed, 53 insertions(+), 10 deletions(-) create mode 100644 changelog.d/9953.misc diff --git a/changelog.d/9953.misc b/changelog.d/9953.misc new file mode 100644 index 000000000000..5fc56097c25d --- /dev/null +++ b/changelog.d/9953.misc @@ -0,0 +1 @@ +Add a cache to `have_seen_events`. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index ecc1f935e228..eec0c52abf14 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -168,6 +168,7 @@ def _invalidate_caches_for_event( backfilled, ): self._invalidate_get_event_cache(event_id) + self.have_seen_event.prefill((event_id,), True) self.get_latest_event_ids_in_room.invalidate((room_id,)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 2c823e09cfa6..01b0c1769c94 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -22,6 +22,7 @@ Iterable, List, Optional, + Set, Tuple, overload, ) @@ -55,7 +56,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict, get_domain_from_id -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -1046,7 +1047,7 @@ async def have_events_in_timeline(self, event_ids): return {r["event_id"] for r in rows} - async def have_seen_events(self, event_ids): + async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]: """Given a list of event ids, check if we have already processed them. Args: @@ -1055,23 +1056,46 @@ async def have_seen_events(self, event_ids): Returns: set[str]: The events we have already seen. """ + res = await self._have_seen_events_dict(event_ids) + return {x for (x, y) in res.items() if y} + + @cachedList("have_seen_event", "event_ids") + async def _have_seen_events_dict( + self, event_ids: Collection[str] + ) -> Dict[str, bool]: + """Helper for have_seen_events + + Returns a dict, which is the right format for @cachedList + """ # if the event cache contains the event, obviously we've seen it. 
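         # (events only enter _get_event_cache after being read from, or
         # persisted to, the database, so a cache hit implies we have seen it)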
- results = {x for x in event_ids if self._get_event_cache.contains(x)} + cache_results = {x for x in event_ids if self._get_event_cache.contains(x)} + results = {x: True for x in cache_results} def have_seen_events_txn(txn, chunk): + # assume everything in this chunk is not found initially + results.update({x: False for x in chunk}) + + # check the db and update the results for any row that is found sql = "SELECT event_id FROM events as e WHERE " clause, args = make_in_list_sql_clause( txn.database_engine, "e.event_id", chunk ) txn.execute(sql + clause, args) - results.update(row[0] for row in txn) + results.update({row[0]: True for row in txn}) - for chunk in batch_iter((x for x in event_ids if x not in results), 100): + for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100): await self.db_pool.runInteraction( "have_seen_events", have_seen_events_txn, chunk ) + return results + @cached(max_entries=100000) + async def have_seen_event(self, event_id): + # this only exists for the benefit of the @cachedList descriptor on + # _have_seen_events_dict + raise NotImplementedError() + def _get_current_state_event_counts_txn(self, txn, room_id): """ See get_current_state_event_counts. diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 8f83748b5edb..a37734156143 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -16,14 +16,14 @@ from typing import Any, List, Set, Tuple from synapse.api.errors import SynapseError -from synapse.storage._base import SQLBaseStore +from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) -class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): +class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): async def purge_history( self, room_id: str, token: str, delete_local_events: bool ) -> Set[int]: @@ -203,8 +203,6 @@ def _purge_history_txn( "DELETE FROM event_to_state_groups " "WHERE event_id IN (SELECT event_id from events_to_purge)" ) - for event_id, _ in event_rows: - txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) # Delete all remote non-state events for table in ( @@ -283,6 +281,18 @@ def _purge_history_txn( # so make sure to keep this actually last. txn.execute("DROP TABLE events_to_purge") + for event_id, should_delete in event_rows: + self._invalidate_cache_and_stream( + txn, self._get_state_group_for_event, (event_id,) + ) + + # FIXME: this is racy - what if have_seen_event gets called between the + # transaction completing and the invalidation running? + if should_delete: + self._invalidate_cache_and_stream( + txn, self.have_seen_event, (event_id,) + ) + logger.info("[purge] done") return referenced_state_groups @@ -422,7 +432,14 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]: # index on them. In any case we should be clearing out 'stream' tables # periodically anyway (#5888) - # TODO: we could probably usefully do a bunch of cache invalidation here + # TODO: we could probably usefully do a bunch more cache invalidation here + + # we have no way to know which events to clear out of have_seen_event + # so just have to drop the whole cache + # + # FIXME: this is racy - what if have_seen_event gets called between the + # DELETE completing and the invalidation running? 
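+        # (_invalidate_all_cache_and_stream drops the entire local cache once
+        # the transaction completes, and streams the invalidation out to any
+        # other workers)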
+ self._invalidate_all_cache_and_stream(txn, self.have_seen_event) logger.info("[purge] done") From dbe8659a642ab5c94c0f932877b85533dad7b4f2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 12 May 2021 18:59:59 +0100 Subject: [PATCH 2/5] Address review comments --- synapse/handlers/federation.py | 12 +++-- synapse/storage/databases/main/cache.py | 2 +- .../storage/databases/main/events_worker.py | 47 +++++++++++++------ .../storage/databases/main/purge_events.py | 17 ++++--- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 678f6b770797..c12d942b2838 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -576,7 +576,9 @@ async def _get_state_for_room( # Fetch the state events from the DB, and check we have the auth events. event_map = await self.store.get_events(state_event_ids, allow_rejected=True) - auth_events_in_store = await self.store.have_seen_events(auth_event_ids) + auth_events_in_store = await self.store.have_seen_events( + room_id, auth_event_ids + ) # Check for missing events. We handle state and auth event seperately, # as we want to pull the state from the DB, but we don't for the auth @@ -609,7 +611,7 @@ async def _get_state_for_room( if missing_auth_events: auth_events_in_store = await self.store.have_seen_events( - missing_auth_events + room_id, missing_auth_events ) missing_auth_events.difference_update(auth_events_in_store) @@ -709,7 +711,7 @@ async def _get_state_after_missing_prev_event( missing_auth_events = set(auth_event_ids) - fetched_events.keys() missing_auth_events.difference_update( - await self.store.have_seen_events(missing_auth_events) + await self.store.have_seen_events(room_id, missing_auth_events) ) logger.debug("We are also missing %i auth events", len(missing_auth_events)) @@ -2474,7 +2476,7 @@ async def _update_auth_events_and_context_for_auth( # # we start by checking if they are in the store, and then try calling /event_auth/. 
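        # (/event_auth/ is the federation endpoint which returns the full auth
        # chain for an event, as seen by the remote server)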
if missing_auth: - have_events = await self.store.have_seen_events(missing_auth) + have_events = await self.store.have_seen_events(event.room_id, missing_auth) logger.debug("Events %s are in the store", have_events) missing_auth.difference_update(have_events) @@ -2493,7 +2495,7 @@ async def _update_auth_events_and_context_for_auth( return context seen_remotes = await self.store.have_seen_events( - [e.event_id for e in remote_auth_chain] + event.room_id, [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index eb32f64639a3..c57ae5ef15c6 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -168,7 +168,7 @@ def _invalidate_caches_for_event( backfilled, ): self._invalidate_get_event_cache(event_id) - self.have_seen_event.prefill((event_id,), True) + self.have_seen_event.invalidate((room_id, event_id)) self.get_latest_event_ids_in_room.invalidate((room_id,)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index d3b43837f7bc..9821440094cf 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1046,51 +1046,70 @@ async def have_events_in_timeline(self, event_ids): return {r["event_id"] for r in rows} - async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]: + async def have_seen_events( + self, room_id: str, event_ids: Iterable[str] + ) -> Set[str]: """Given a list of event ids, check if we have already processed them. + The room_id is only used to structure the cache (so that it can later be + invalidated by room_id) - there is no guarantee that the events are actually + in the room in question. + Args: - event_ids (iterable[str]): + room_id: Room we are polling + event_ids: events we are looking for Returns: set[str]: The events we have already seen. """ - res = await self._have_seen_events_dict(event_ids) - return {x for (x, y) in res.items() if y} + res = await self._have_seen_events_dict( + (room_id, event_id) for event_id in event_ids + ) + return {eid for ((_rid, eid), have_event) in res.items() if have_event} - @cachedList("have_seen_event", "event_ids") + @cachedList("have_seen_event", "keys") async def _have_seen_events_dict( - self, event_ids: Collection[str] - ) -> Dict[str, bool]: + self, keys: Iterable[Tuple[str, str]] + ) -> Dict[Tuple[str, str], bool]: """Helper for have_seen_events - Returns a dict, which is the right format for @cachedList + Returns: + a dict {(room_id, event_id)-> bool} """ # if the event cache contains the event, obviously we've seen it. - cache_results = {x for x in event_ids if self._get_event_cache.contains(x)} + cache_results = { + eid for (_rid, eid) in keys if self._get_event_cache.contains(eid) + } results = {x: True for x in cache_results} - def have_seen_events_txn(txn, chunk): + def have_seen_events_txn(txn, chunk: Tuple[str, ...]): # assume everything in this chunk is not found initially results.update({x: False for x in chunk}) # check the db and update the results for any row that is found - sql = "SELECT event_id FROM events as e WHERE " + # NB this deliberately does *not* query on room_id, to make this an + # index-only lookup on `events_event_id_key`. 
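+            # (an "index-only" lookup means the unique index on event_id can
+            # answer the query by itself, without fetching the table rows)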
+ sql = "SELECT event_id FROM events AS e WHERE" clause, args = make_in_list_sql_clause( txn.database_engine, "e.event_id", chunk ) txn.execute(sql + clause, args) results.update({row[0]: True for row in txn}) - for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100): + # each batch requires its own index scan, so we make the batches as big as + # possible. + for chunk in batch_iter( + (eid for (_rid, eid) in keys if eid not in cache_results), + 500, + ): await self.db_pool.runInteraction( "have_seen_events", have_seen_events_txn, chunk ) return results - @cached(max_entries=100000) - async def have_seen_event(self, event_id): + @cached(max_entries=100000, tree=True) + async def have_seen_event(self, room_id: str, event_id: str): # this only exists for the benefit of the @cachedList descriptor on # _have_seen_events_dict raise NotImplementedError() diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index a37734156143..7fb7780d0ffd 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -286,11 +286,13 @@ def _purge_history_txn( txn, self._get_state_group_for_event, (event_id,) ) - # FIXME: this is racy - what if have_seen_event gets called between the - # transaction completing and the invalidation running? + # XXX: This is racy, since have_seen_events could be called between the + # transaction completing and the invalidation running. On the other hand, + # that's no different to calling `have_seen_events` just before the + # event is deleted from the database. if should_delete: self._invalidate_cache_and_stream( - txn, self.have_seen_event, (event_id,) + txn, self.have_seen_event, (room_id, event_id) ) logger.info("[purge] done") @@ -434,12 +436,9 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]: # TODO: we could probably usefully do a bunch more cache invalidation here - # we have no way to know which events to clear out of have_seen_event - # so just have to drop the whole cache - # - # FIXME: this is racy - what if have_seen_event gets called between the - # DELETE completing and the invalidation running? - self._invalidate_all_cache_and_stream(txn, self.have_seen_event) + # XXX: as with purge_history, this is racy, but no worse than other races + # that already exist. + self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,)) logger.info("[purge] done") From 70cb739d7953c885287b895ecd0ee3a4dabc77b2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 12 May 2021 19:04:13 +0100 Subject: [PATCH 3/5] update news files --- changelog.d/9953.feature | 1 + changelog.d/9953.misc | 1 - changelog.d/9973.feature | 1 + changelog.d/9973.misc | 1 - 4 files changed, 2 insertions(+), 2 deletions(-) create mode 100644 changelog.d/9953.feature delete mode 100644 changelog.d/9953.misc create mode 100644 changelog.d/9973.feature delete mode 100644 changelog.d/9973.misc diff --git a/changelog.d/9953.feature b/changelog.d/9953.feature new file mode 100644 index 000000000000..6b3d1adc707f --- /dev/null +++ b/changelog.d/9953.feature @@ -0,0 +1 @@ +Improve performance of incoming federation transactions in large rooms. diff --git a/changelog.d/9953.misc b/changelog.d/9953.misc deleted file mode 100644 index 5fc56097c25d..000000000000 --- a/changelog.d/9953.misc +++ /dev/null @@ -1 +0,0 @@ -Add a cache to `have_seen_events`. 
diff --git a/changelog.d/9973.feature b/changelog.d/9973.feature new file mode 100644 index 000000000000..6b3d1adc707f --- /dev/null +++ b/changelog.d/9973.feature @@ -0,0 +1 @@ +Improve performance of incoming federation transactions in large rooms. diff --git a/changelog.d/9973.misc b/changelog.d/9973.misc deleted file mode 100644 index 7f22d42291b2..000000000000 --- a/changelog.d/9973.misc +++ /dev/null @@ -1 +0,0 @@ -Make `LruCache.invalidate` support tree invalidation, and remove `invalidate_many`. From ae20c8dfcaddc2c5eaf778eb74797bbc95bb0174 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 27 May 2021 15:48:53 +0100 Subject: [PATCH 4/5] Fix brokenness, address review comments, add tests --- .../storage/databases/main/events_worker.py | 28 +++--- tests/storage/databases/__init__.py | 13 +++ tests/storage/databases/main/__init__.py | 13 +++ .../databases/main/test_events_worker.py | 95 +++++++++++++++++++ 4 files changed, 135 insertions(+), 14 deletions(-) create mode 100644 tests/storage/databases/__init__.py create mode 100644 tests/storage/databases/main/__init__.py create mode 100644 tests/storage/databases/main/test_events_worker.py diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 9821440094cf..403a5ddaba3e 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1077,31 +1077,31 @@ async def _have_seen_events_dict( a dict {(room_id, event_id)-> bool} """ # if the event cache contains the event, obviously we've seen it. + cache_results = { - eid for (_rid, eid) in keys if self._get_event_cache.contains(eid) + (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,)) } results = {x: True for x in cache_results} - def have_seen_events_txn(txn, chunk: Tuple[str, ...]): - # assume everything in this chunk is not found initially - results.update({x: False for x in chunk}) + def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]): + # we deliberately do *not* query the database for room_id, to make the + # query an index-only lookup on `events_event_id_key`. + # + # We therefore pull the events from the database into a set... - # check the db and update the results for any row that is found - # NB this deliberately does *not* query on room_id, to make this an - # index-only lookup on `events_event_id_key`. - sql = "SELECT event_id FROM events AS e WHERE" + sql = "SELECT event_id FROM events AS e WHERE " clause, args = make_in_list_sql_clause( - txn.database_engine, "e.event_id", chunk + txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk] ) txn.execute(sql + clause, args) - results.update({row[0]: True for row in txn}) + found_events = {eid for eid, in txn} + + # ... and then we can update the results for each row in the batch + results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk}) # each batch requires its own index scan, so we make the batches as big as # possible. 
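         # (500 is presumably chosen to stay comfortably below SQLite's default
         # limit of 999 bound parameters per query)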
- for chunk in batch_iter( - (eid for (_rid, eid) in keys if eid not in cache_results), - 500, - ): + for chunk in batch_iter((k for k in keys if k not in cache_results), 500): await self.db_pool.runInteraction( "have_seen_events", have_seen_events_txn, chunk ) diff --git a/tests/storage/databases/__init__.py b/tests/storage/databases/__init__.py new file mode 100644 index 000000000000..c24c7ecd92c2 --- /dev/null +++ b/tests/storage/databases/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/storage/databases/main/__init__.py b/tests/storage/databases/main/__init__.py new file mode 100644 index 000000000000..c24c7ecd92c2 --- /dev/null +++ b/tests/storage/databases/main/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py new file mode 100644 index 000000000000..2e05a813c6b5 --- /dev/null +++ b/tests/storage/databases/main/test_events_worker.py @@ -0,0 +1,95 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
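+
+# Tests for `have_seen_events` and the backing `have_seen_event` cache.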
+import json + +from synapse.logging.context import LoggingContext +from synapse.storage.databases.main.events_worker import EventsWorkerStore + +from tests import unittest + + +class HaveSeenEventsTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, hs): + self.store: EventsWorkerStore = hs.get_datastore() + + # insert some test data + for rid in ("room1", "room2"): + self.get_success( + self.store.db_pool.simple_insert( + "rooms", + {"room_id": rid, "room_version": 4}, + ) + ) + + for idx, (rid, eid) in enumerate( + ( + ("room1", "event10"), + ("room1", "event11"), + ("room1", "event12"), + ("room2", "event20"), + ) + ): + self.get_success( + self.store.db_pool.simple_insert( + "events", + { + "event_id": eid, + "room_id": rid, + "topological_ordering": idx, + "type": "test", + "processed": True, + "outlier": False, + }, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "event_json", + { + "event_id": eid, + "room_id": rid, + "json": json.dumps({"type": "test", "room_id": rid}), + "internal_metadata": "{}", + "format_version": 3, + }, + ) + ) + + def test_simple(self): + with LoggingContext(name="test") as ctx: + res = self.get_success( + self.store.have_seen_events("room1", ["event10", "event19"]) + ) + self.assertEquals(res, {"event10"}) + + # that should result in a single db query + self.assertEquals(ctx.get_resource_usage().db_txn_count, 1) + + # a second lookup of the same events should cause no queries + with LoggingContext(name="test") as ctx: + res = self.get_success( + self.store.have_seen_events("room1", ["event10", "event19"]) + ) + self.assertEquals(res, {"event10"}) + self.assertEquals(ctx.get_resource_usage().db_txn_count, 0) + + def test_query_via_event_cache(self): + # fetch an event into the event cache + self.get_success(self.store.get_event("event10")) + + # looking it up should now cause no db hits + with LoggingContext(name="test") as ctx: + res = self.get_success(self.store.have_seen_events("room1", ["event10"])) + self.assertEquals(res, {"event10"}) + self.assertEquals(ctx.get_resource_usage().db_txn_count, 0) From c8cf20c74ba12b367d01116c649f928eb0b3bf92 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 28 May 2021 11:51:55 +0100 Subject: [PATCH 5/5] fix test breakage under postgres --- tests/storage/databases/main/test_events_worker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 2e05a813c6b5..932970fd9ad1 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -47,6 +47,7 @@ def prepare(self, reactor, clock, hs): "event_id": eid, "room_id": rid, "topological_ordering": idx, + "stream_ordering": idx, "type": "test", "processed": True, "outlier": False,
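
For readers unfamiliar with the caching machinery this series leans on:
`@cached` gives a function a per-key cache, and `@cachedList` wraps a batch
function and wires it to that same cache, so single-key and batched lookups
share entries. The sketch below shows the shape of that interaction with a
plain dict standing in for the real descriptors; all names here (including the
fake database helper) are illustrative, not the actual API of
`synapse/util/caches/descriptors.py`.

    import asyncio
    from typing import Dict, Iterable, List, Tuple

    # the per-key cache that @cached(tree=True) would normally manage, keyed by
    # (room_id, event_id)
    _have_seen_cache: Dict[Tuple[str, str], bool] = {}


    async def _fetch_from_db(
        keys: List[Tuple[str, str]]
    ) -> Dict[Tuple[str, str], bool]:
        # stand-in for the batched SELECT in _have_seen_events_dict: pretend
        # that only events whose id ends in "0" exist
        print("db lookup for %i keys" % len(keys))
        return {key: key[1].endswith("0") for key in keys}


    async def have_seen_events_dict(
        keys: Iterable[Tuple[str, str]],
    ) -> Dict[Tuple[str, str], bool]:
        # what @cachedList arranges for us: answer what we can from the
        # per-key cache, batch-fetch the rest, then prefill the cache so that
        # later lookups, batched or single-key, hit without touching the db
        key_list = list(keys)
        results = {k: _have_seen_cache[k] for k in key_list if k in _have_seen_cache}
        misses = [k for k in key_list if k not in results]
        if misses:
            fetched = await _fetch_from_db(misses)
            _have_seen_cache.update(fetched)
            results.update(fetched)
        return results


    async def main() -> None:
        keys = [("room1", "event10"), ("room1", "event11")]
        await have_seen_events_dict(keys)  # one batched db lookup for both keys
        await have_seen_events_dict(keys)  # answered entirely from the cache


    asyncio.run(main())

The stub `have_seen_event` in the real patches exists purely so that the
descriptor has a named per-key cache to attach to; calling it directly raises
NotImplementedError.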