This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

add a cache to have_seen_event #9953

Merged
6 commits merged on Jun 1, 2021
Changes from 1 commit
1 change: 1 addition & 0 deletions changelog.d/9953.misc
@@ -0,0 +1 @@
Add a cache to `have_seen_events`.
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
@@ -168,6 +168,7 @@ def _invalidate_caches_for_event(
backfilled,
):
self._invalidate_get_event_cache(event_id)
self.have_seen_event.prefill((event_id,), True)

self.get_latest_event_ids_in_room.invalidate((room_id,))

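Prefilling here marks the event as seen in the new have_seen_event cache as soon as it is persisted, so a later have_seen_events lookup need not hit the database. A minimal sketch of a prefillable keyed cache, purely for illustration (the class and names below are assumptions, not Synapse's cache descriptor):

# Toy stand-in for a keyed cache supporting prefill/get/invalidate; illustrative only.
from typing import Any, Dict, Optional, Tuple

class ToyCache:
    def __init__(self) -> None:
        self._entries: Dict[Tuple[Any, ...], Any] = {}

    def prefill(self, key: Tuple[Any, ...], value: Any) -> None:
        # Seed the entry so the next lookup for this key is a cache hit.
        self._entries[key] = value

    def get(self, key: Tuple[Any, ...]) -> Optional[Any]:
        return self._entries.get(key)

    def invalidate(self, key: Tuple[Any, ...]) -> None:
        self._entries.pop(key, None)

cache = ToyCache()
cache.prefill(("$just_persisted_event",), True)  # event was just persisted, so mark it seen
assert cache.get(("$just_persisted_event",)) is True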
34 changes: 29 additions & 5 deletions synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
overload,
)
@@ -55,7 +56,7 @@
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
@@ -1046,7 +1047,7 @@ async def have_events_in_timeline(self, event_ids):

return {r["event_id"] for r in rows}

async def have_seen_events(self, event_ids):
async def have_seen_events(self, event_ids: Collection[str]) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.

Args:
@@ -1055,23 +1056,46 @@
Returns:
set[str]: The events we have already seen.
"""
res = await self._have_seen_events_dict(event_ids)
return {x for (x, y) in res.items() if y}

@cachedList("have_seen_event", "event_ids")
async def _have_seen_events_dict(
self, event_ids: Collection[str]
) -> Dict[str, bool]:
"""Helper for have_seen_events

Returns a dict, which is the right format for @cachedList
"""
# if the event cache contains the event, obviously we've seen it.
results = {x for x in event_ids if self._get_event_cache.contains(x)}
cache_results = {x for x in event_ids if self._get_event_cache.contains(x)}
results = {x: True for x in cache_results}

def have_seen_events_txn(txn, chunk):
# assume everything in this chunk is not found initially
results.update({x: False for x in chunk})

# check the db and update the results for any row that is found
sql = "SELECT event_id FROM events as e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", chunk
)
txn.execute(sql + clause, args)
results.update(row[0] for row in txn)
results.update({row[0]: True for row in txn})
Member

Don't we need to include the room ID here?

Member Author

yup.


for chunk in batch_iter((x for x in event_ids if x not in results), 100):
for chunk in batch_iter((x for x in event_ids if x not in cache_results), 100):
await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn, chunk
)

return results

@cached(max_entries=100000)
async def have_seen_event(self, event_id):
# this only exists for the benefit of the @cachedList descriptor on
# _have_seen_events_dict
raise NotImplementedError()

def _get_current_state_event_counts_txn(self, txn, room_id):
"""
See get_current_state_event_counts.
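The shape of this change follows from how the batched cache descriptors work: @cachedList("have_seen_event", "event_ids") routes lookups through the cache of the @cached stub have_seen_event, and the wrapped helper must return a dict with one entry per requested key, which is why _have_seen_events_dict returns Dict[str, bool] and the public have_seen_events then filters that dict down to the keys that mapped to True. A rough sketch of the batching idea (illustrative only; the real descriptors also deal with Deferreds, invalidation and size limits):

# Illustrative sketch, not Synapse's descriptors.
from typing import Awaitable, Callable, Dict, Iterable, List

class SimpleListCache:
    """Per-key cache whose misses are filled by one batched, dict-returning loader."""

    def __init__(
        self, loader: Callable[[List[str]], Awaitable[Dict[str, bool]]]
    ) -> None:
        self._loader = loader
        self._cache: Dict[str, bool] = {}

    def prefill(self, key: str, value: bool) -> None:
        self._cache[key] = value

    def invalidate_all(self) -> None:
        self._cache.clear()

    async def get_many(self, keys: Iterable[str]) -> Dict[str, bool]:
        wanted = list(keys)
        results = {k: self._cache[k] for k in wanted if k in self._cache}
        missing = [k for k in wanted if k not in results]
        if missing:
            # The loader must return one entry per requested key; this is the
            # dict shape that _have_seen_events_dict provides above.
            fetched = await self._loader(missing)
            self._cache.update(fetched)
            results.update(fetched)
        return results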
27 changes: 22 additions & 5 deletions synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@
from typing import Any, List, Set, Tuple

from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken

logger = logging.getLogger(__name__)


class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
@@ -203,8 +203,6 @@ def _purge_history_txn(
"DELETE FROM event_to_state_groups "
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))

# Delete all remote non-state events
for table in (
@@ -283,6 +281,18 @@ def _purge_history_txn(
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")

for event_id, should_delete in event_rows:
self._invalidate_cache_and_stream(
txn, self._get_state_group_for_event, (event_id,)
)

# FIXME: this is racy - what if have_seen_event gets called between the
# transaction completing and the invalidation running?
if should_delete:
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (event_id,)
)

logger.info("[purge] done")

return referenced_state_groups
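
On the FIXME above: the local invalidation is deferred until after the transaction completes, so there is a window between the DELETE committing and the callbacks running in which have_seen_event can be read (and re-cached) with stale data. A sketch of that ordering with stand-in names (the FakeTxn class and helper below are assumptions for illustration, not Synapse's code):

from typing import Any, Callable, List, Tuple

class FakeTxn:
    """Toy transaction that queues callbacks to run only after commit."""

    def __init__(self) -> None:
        self._after: List[Tuple[Callable[..., Any], tuple]] = []

    def call_after(self, fn: Callable[..., Any], *args: Any) -> None:
        self._after.append((fn, args))

    def commit(self) -> None:
        # Only now do the deferred invalidations run; a cache read that happens
        # between the database commit and this loop still sees the old entry.
        for fn, args in self._after:
            fn(*args)

def invalidate_cache_and_stream_sketch(txn: FakeTxn, cache: dict, key: str) -> None:
    # Drop the local in-memory entry once the transaction has completed; the
    # real helper also records the invalidation so other workers replay it.
    txn.call_after(cache.pop, key, None)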
@@ -422,7 +432,14 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch of cache invalidation here
# TODO: we could probably usefully do a bunch more cache invalidation here

# we have no way to know which events to clear out of have_seen_event
# so just have to drop the whole cache
#
# FIXME: this is racy - what if have_seen_event gets called between the
# DELETE completing and the invalidation running?
self._invalidate_all_cache_and_stream(txn, self.have_seen_event)

logger.info("[purge] done")

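On the last comment: the room purge only knows the room_id, while have_seen_event is keyed by event_id, so there is no set of keys to invalidate individually and the whole cache has to go. An illustrative sketch of that trade-off (the function and its parameters are assumptions, not code from this PR):

from typing import Dict, Iterable, Optional

def invalidate_after_purge(
    cache: Dict[str, bool], purged_event_ids: Optional[Iterable[str]] = None
) -> None:
    if purged_event_ids is None:
        # The room-purge case above: the affected event IDs are unknown, so the
        # only safe option is to drop every entry.
        cache.clear()
    else:
        # Targeted invalidation, possible only when the purged IDs are known.
        for event_id in purged_event_ids:
            cache.pop(event_id, None)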