-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Optimize filter_events_for_client
for faster /messages
- v1
#14494
Changes from 6 commits
ede07ca
8340906
3ee285f
2e86455
92a1aaf
0459a9c
2939ead
65a5d8f
d4b647b
e257e9f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Speed-up `/messages` with `filter_events_for_client` optimizations. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,11 +13,22 @@ | |
# limitations under the License. | ||
|
||
import logging | ||
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple | ||
from typing import ( | ||
TYPE_CHECKING, | ||
Collection, | ||
Dict, | ||
Iterable, | ||
List, | ||
Mapping, | ||
Optional, | ||
Set, | ||
Tuple, | ||
) | ||
|
||
import attr | ||
|
||
from synapse.api.constants import EventTypes | ||
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace | ||
from synapse.storage._base import SQLBaseStore | ||
from synapse.storage.database import ( | ||
DatabasePool, | ||
|
@@ -29,9 +40,11 @@ | |
from synapse.storage.types import Cursor | ||
from synapse.storage.util.sequence import build_sequence_generator | ||
from synapse.types import MutableStateMap, StateKey, StateMap | ||
from synapse.util.caches import intern_string | ||
from synapse.util.caches.descriptors import cached | ||
from synapse.util.caches.dictionary_cache import DictionaryCache | ||
from synapse.util.cancellation import cancellable | ||
from synapse.util.iterutils import batch_iter | ||
|
||
if TYPE_CHECKING: | ||
from synapse.server import HomeServer | ||
|
@@ -158,6 +171,8 @@ def _get_state_group_delta_txn(txn: LoggingTransaction) -> _GetStateGroupDelta: | |
) | ||
|
||
@cancellable | ||
@trace | ||
@tag_args | ||
Comment on lines
173
to
+175
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should the decorator order matter here? I am guessing outermost, but perhaps it doesn't matter. |
||
async def _get_state_groups_from_groups( | ||
self, groups: List[int], state_filter: StateFilter | ||
) -> Dict[int, StateMap[str]]: | ||
|
@@ -171,6 +186,11 @@ async def _get_state_groups_from_groups( | |
Returns: | ||
Dict of state group to state map. | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "groups.length", | ||
str(len(groups)), | ||
) | ||
|
||
results: Dict[int, StateMap[str]] = {} | ||
|
||
chunks = [groups[i : i + 100] for i in range(0, len(groups), 100)] | ||
|
@@ -237,7 +257,142 @@ def _get_state_for_group_using_cache( | |
|
||
return state_filter.filter_state(state_dict_ids), not missing_types | ||
|
||
async def _get_state_groups_from_cache( | ||
self, state_groups: Iterable[int], state_filter: StateFilter | ||
) -> Tuple[Dict[int, MutableStateMap[str]], Set[int]]: | ||
"""TODO | ||
|
||
Returns: | ||
A map from each state_group to the complete/incomplete state map (filled in by cached | ||
values) and the set of incomplete groups | ||
""" | ||
member_filter, non_member_filter = state_filter.get_member_split() | ||
|
||
# Now we look them up in the member and non-member caches | ||
( | ||
non_member_state, | ||
incomplete_groups_nm, | ||
) = self._get_state_for_groups_using_cache( | ||
state_groups, self._state_group_cache, state_filter=non_member_filter | ||
) | ||
|
||
(member_state, incomplete_groups_m) = self._get_state_for_groups_using_cache( | ||
state_groups, self._state_group_members_cache, state_filter=member_filter | ||
) | ||
|
||
state = dict(non_member_state) | ||
for state_group in state_groups: | ||
state[state_group].update(member_state[state_group]) | ||
|
||
# We may have only got one of the events for the group | ||
incomplete_groups = incomplete_groups_m | incomplete_groups_nm | ||
|
||
return (state, incomplete_groups) | ||
|
||
@cancellable | ||
@trace | ||
@tag_args | ||
async def _get_state_for_client_filtering( | ||
self, groups: Iterable[int], user_id_viewing_events: str | ||
) -> Dict[int, StateMap[str]]: | ||
""" | ||
TODO | ||
""" | ||
|
||
def _get_state_for_client_filtering_txn( | ||
txn: LoggingTransaction, groups: Iterable[int] | ||
) -> Mapping[int, StateMap[str]]: | ||
sql = """ | ||
WITH RECURSIVE sgs(state_group) AS ( | ||
VALUES(?::bigint) | ||
UNION ALL | ||
SELECT prev_state_group FROM state_group_edges e, sgs s | ||
WHERE s.state_group = e.state_group | ||
) | ||
SELECT | ||
type, state_key, event_id | ||
FROM state_groups_state | ||
WHERE | ||
state_group IN ( | ||
SELECT state_group FROM sgs | ||
) | ||
AND (type = ? AND state_key = ?) | ||
ORDER BY | ||
type, | ||
state_key, | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
-- Use the latest state in the chain (highest numbered state_group in the chain) | ||
state_group DESC | ||
LIMIT 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BeforeOne query that takes ~80ms: Before raw query and
|
||
""" | ||
|
||
results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups} | ||
for group in groups: | ||
row_info_list: List[Tuple] = [] | ||
txn.execute(sql, (group, EventTypes.RoomHistoryVisibility, "")) | ||
history_vis_info = txn.fetchone() | ||
if history_vis_info is not None: | ||
row_info_list.append(history_vis_info) | ||
|
||
txn.execute(sql, (group, EventTypes.Member, user_id_viewing_events)) | ||
membership_info = txn.fetchone() | ||
if membership_info is not None: | ||
row_info_list.append(membership_info) | ||
Comment on lines
+346
to
+354
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a way we can batch up these two individual queries to have less database round-trip time? Is there a way we can batch up all the queries across all of the state groups? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, see the |
||
|
||
for row in row_info_list: | ||
typ, state_key, event_id = row | ||
key = (intern_string(typ), intern_string(state_key)) | ||
results[group][key] = event_id | ||
|
||
# The results should be considered immutable because we are using | ||
# `intern_string` (TODO: Should we? copied from _get_state_groups_from_groups_txn). | ||
return results | ||
|
||
# Craft a StateFilter to use with the cache | ||
state_filter_for_cache_lookup = StateFilter.from_types( | ||
( | ||
(EventTypes.RoomHistoryVisibility, ""), | ||
(EventTypes.Member, user_id_viewing_events), | ||
) | ||
) | ||
( | ||
results_from_cache, | ||
incomplete_groups, | ||
) = await self._get_state_groups_from_cache( | ||
groups, state_filter_for_cache_lookup | ||
) | ||
|
||
cache_sequence_nm = self._state_group_cache.sequence | ||
cache_sequence_m = self._state_group_members_cache.sequence | ||
|
||
results: Dict[int, StateMap[str]] = results_from_cache | ||
for batch in batch_iter(incomplete_groups, 100): | ||
group_to_state_mapping = await self.db_pool.runInteraction( | ||
"_get_state_for_client_filtering_txn", | ||
_get_state_for_client_filtering_txn, | ||
batch, | ||
) | ||
|
||
# Now let's update the caches | ||
# Help the cache hit ratio by expanding the filter a bit | ||
state_filter_for_cache_insertion = ( | ||
state_filter_for_cache_lookup.return_expanded() | ||
) | ||
group_to_state_dict: Dict[int, StateMap[str]] = {} | ||
group_to_state_dict.update(group_to_state_mapping) | ||
self._insert_into_cache( | ||
group_to_state_dict, | ||
state_filter_for_cache_insertion, | ||
cache_seq_num_members=cache_sequence_m, | ||
cache_seq_num_non_members=cache_sequence_nm, | ||
) | ||
|
||
results.update(group_to_state_mapping) | ||
|
||
return results | ||
|
||
@cancellable | ||
@trace | ||
@tag_args | ||
async def _get_state_for_groups( | ||
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None | ||
) -> Dict[int, MutableStateMap[str]]: | ||
|
@@ -254,6 +409,7 @@ async def _get_state_for_groups( | |
""" | ||
state_filter = state_filter or StateFilter.all() | ||
|
||
# TODO: Replace with _get_state_groups_from_cache | ||
member_filter, non_member_filter = state_filter.get_member_split() | ||
|
||
# Now we look them up in the member and non-member caches | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,13 @@ | |
from synapse.events import EventBase | ||
from synapse.events.snapshot import EventContext | ||
from synapse.events.utils import prune_event | ||
from synapse.logging.opentracing import trace | ||
from synapse.logging.opentracing import ( | ||
SynapseTags, | ||
set_tag, | ||
start_active_span, | ||
tag_args, | ||
trace, | ||
) | ||
from synapse.storage.controllers import StorageControllers | ||
from synapse.storage.databases.main import DataStore | ||
from synapse.storage.state import StateFilter | ||
|
@@ -53,6 +59,7 @@ | |
|
||
|
||
@trace | ||
@tag_args | ||
async def filter_events_for_client( | ||
storage: StorageControllers, | ||
user_id: str, | ||
|
@@ -82,6 +89,11 @@ async def filter_events_for_client( | |
Returns: | ||
The filtered events. | ||
""" | ||
set_tag( | ||
SynapseTags.FUNC_ARG_PREFIX + "events.length", | ||
str(len(events)), | ||
) | ||
|
||
# Filter out events that have been soft failed so that we don't relay them | ||
# to clients. | ||
events_before_filtering = events | ||
|
@@ -94,11 +106,32 @@ async def filter_events_for_client( | |
[event.event_id for event in events], | ||
) | ||
|
||
types = (_HISTORY_VIS_KEY, (EventTypes.Member, user_id)) | ||
non_outlier_event_ids = event_ids = frozenset( | ||
e.event_id for e in events if not e.internal_metadata.outlier | ||
) | ||
|
||
# we exclude outliers at this point, and then handle them separately later | ||
event_id_to_state = await storage.state.get_state_for_events( | ||
frozenset(e.event_id for e in events if not e.internal_metadata.outlier), | ||
# TODO: Remove: We do this just to remove await_full_state from the comparison | ||
await storage.state.get_state_group_for_events( | ||
non_outlier_event_ids, await_full_state=True | ||
) | ||
|
||
# Grab the history visibility and membership for each of the events. That's all we | ||
# need to know in order to filter them. | ||
event_id_to_state = await storage.state._get_state_for_client_filtering_for_events( | ||
# we exclude outliers at this point, and then handle them separately later | ||
event_ids=non_outlier_event_ids, | ||
user_id_viewing_events=user_id, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Everything is slightly rushed here and I wish I could have explored more before putting this up for review, but this does seem like an improvement, and we need to get a change deployed today, so I've marked it as ready for review. Let me know if this just isn't right to ship. |
||
) | ||
|
||
# TODO: Remove comparison | ||
# TODO: Remove cache invalidation | ||
storage.state.stores.state._state_group_cache.invalidate_all() | ||
storage.state.stores.state._state_group_members_cache.invalidate_all() | ||
# logger.info("----------------------------------------------------") | ||
# logger.info("----------------------------------------------------") | ||
types = (_HISTORY_VIS_KEY, (EventTypes.Member, user_id)) | ||
event_id_to_state_orig = await storage.state.get_state_for_events( | ||
non_outlier_event_ids, | ||
state_filter=StateFilter.from_types(types), | ||
) | ||
|
||
|
@@ -130,9 +163,10 @@ def allowed(event: EventBase) -> Optional[EventBase]: | |
sender_erased=erased_senders.get(event.sender, False), | ||
) | ||
|
||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why does the $ psql synapse
# \d+ state_groups
Table "public.state_groups"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
----------+--------+-----------+----------+---------+----------+--------------+-------------
id | bigint | | not null | | plain | |
room_id | text | | not null | | extended | |
event_id | text | | not null | | extended | | It seems like a subset of $ psql synapse
# \d+ state_groups_state
Table "public.state_groups_state"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-------------+--------+-----------+----------+---------+----------+--------------+-------------
state_group | bigint | | not null | | plain | |
room_id | text | | not null | | extended | |
type | text | | not null | | extended | |
state_key | text | | not null | | extended | |
event_id | text | | not null | | extended | |
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# Check each event: gives an iterable of None or (a potentially modified) | ||
# EventBase. | ||
filtered_events = map(allowed, events) | ||
with start_active_span("filtering events against allowed function"): | ||
# Check each event: gives an iterable of None or (a potentially modified) | ||
# EventBase. | ||
filtered_events = map(allowed, events) | ||
|
||
# Turn it into a list and remove None entries before returning. | ||
return [ev for ev in filtered_events if ev] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this accurate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably the source of our
TestPartialStateJoin
failures in Complement (https://github.com/matrix-org/synapse/actions/runs/3520947916/jobs/5902365185#step:6:11950). What should I be doing here?