Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimize filter_events_for_client for faster /messages - v1 #14494

1 change: 1 addition & 0 deletions changelog.d/14494.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed-up `/messages` with `filter_events_for_client` optimizations.
46 changes: 45 additions & 1 deletion synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
Expand Down Expand Up @@ -182,6 +182,49 @@ def _get_state_groups_from_groups(

return self.stores.state._get_state_groups_from_groups(groups, state_filter)

@trace
@tag_args
async def _get_state_for_client_filtering_for_events(
self, event_ids: Collection[str], user_id_viewing_events: str
) -> Dict[str, StateMap[EventBase]]:
"""TODO"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
str(len(event_ids)),
)

# Since we're making decisions based on the state, we need to wait.
await_full_state = True
Comment on lines +207 to +208
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this accurate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the source of our TestPartialStateJoin failures in Complement (https://github.com/matrix-org/synapse/actions/runs/3520947916/jobs/5902365185#step:6:11950). What should I be doing here?


event_to_groups = await self.get_state_group_for_events(
event_ids, await_full_state=await_full_state
)

groups = set(event_to_groups.values())
group_to_state = await self.stores.state._get_state_for_client_filtering(
groups, user_id_viewing_events
)
# logger.info(
# "_get_state_for_client_filtering_for_events: group_to_state=%s",
# group_to_state,
# )

state_event_map = await self.stores.main.get_events(
[ev_id for sd in group_to_state.values() for ev_id in sd.values()],
get_prev_content=False,
)

event_to_state = {
event_id: {
k: state_event_map[v]
for k, v in group_to_state[group].items()
if v in state_event_map
}
for event_id, group in event_to_groups.items()
}

return {event: event_to_state[event] for event in event_ids}

@trace
async def get_state_for_events(
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
Expand Down Expand Up @@ -212,6 +255,7 @@ async def get_state_for_events(
group_to_state = await self.stores.state._get_state_for_groups(
groups, state_filter or StateFilter.all()
)
# logger.info("get_state_for_events: group_to_state=%s", group_to_state)

state_event_map = await self.stores.main.get_events(
[ev_id for sd in group_to_state.values() for ev_id in sd.values()],
Expand Down
158 changes: 157 additions & 1 deletion synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,22 @@
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
)

import attr

from synapse.api.constants import EventTypes
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
Expand All @@ -29,9 +40,11 @@
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import MutableStateMap, StateKey, StateMap
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -158,6 +171,8 @@ def _get_state_group_delta_txn(txn: LoggingTransaction) -> _GetStateGroupDelta:
)

@cancellable
@trace
@tag_args
Comment on lines 173 to +175
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should @cancellable be the innermost or outermost decorator?

I am guessing outermost but perhaps it doesn't matter

async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
) -> Dict[int, StateMap[str]]:
Expand All @@ -171,6 +186,11 @@ async def _get_state_groups_from_groups(
Returns:
Dict of state group to state map.
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "groups.length",
str(len(groups)),
)

results: Dict[int, StateMap[str]] = {}

chunks = [groups[i : i + 100] for i in range(0, len(groups), 100)]
Expand Down Expand Up @@ -237,7 +257,142 @@ def _get_state_for_group_using_cache(

return state_filter.filter_state(state_dict_ids), not missing_types

async def _get_state_groups_from_cache(
self, state_groups: Iterable[int], state_filter: StateFilter
) -> Tuple[Dict[int, MutableStateMap[str]], Set[int]]:
"""TODO

Returns:
A map from each state_group to the complete/incomplete state map (filled in by cached
values) and the set of incomplete groups
"""
member_filter, non_member_filter = state_filter.get_member_split()

# Now we look them up in the member and non-member caches
(
non_member_state,
incomplete_groups_nm,
) = self._get_state_for_groups_using_cache(
state_groups, self._state_group_cache, state_filter=non_member_filter
)

(member_state, incomplete_groups_m) = self._get_state_for_groups_using_cache(
state_groups, self._state_group_members_cache, state_filter=member_filter
)

state = dict(non_member_state)
for state_group in state_groups:
state[state_group].update(member_state[state_group])

# We may have only got one of the events for the group
incomplete_groups = incomplete_groups_m | incomplete_groups_nm

return (state, incomplete_groups)

@cancellable
@trace
@tag_args
async def _get_state_for_client_filtering(
self, groups: Iterable[int], user_id_viewing_events: str
) -> Dict[int, StateMap[str]]:
"""
TODO
"""

def _get_state_for_client_filtering_txn(
txn: LoggingTransaction, groups: Iterable[int]
) -> Mapping[int, StateMap[str]]:
sql = """
WITH RECURSIVE sgs(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, sgs s
WHERE s.state_group = e.state_group
)
SELECT
type, state_key, event_id
FROM state_groups_state
WHERE
state_group IN (
SELECT state_group FROM sgs
)
AND (type = ? AND state_key = ?)
ORDER BY
type,
state_key,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
-- Use the lastest state in the chain (highest numbered state_group in the chain)
state_group DESC
LIMIT 1
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before

One query that takes ~80ms:

Before raw query and EXPLAIN ANALYZE
EXPLAIN ANALYZE WITH RECURSIVE state(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, state s
    WHERE s.state_group = e.state_group
)
SELECT DISTINCT ON (type, state_key)
    type, state_key, event_id
FROM state_groups_state
WHERE
    state_group IN (
        SELECT state_group FROM state
    )
    AND ((type = 'm.room.history_visibility' AND state_key = '') OR (type = 'm.room.member' AND state_key = '@madlittlemods:matrix.org'))
ORDER BY type, state_key, state_group DESC;
                                                                                          QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=5274.51..5274.52 rows=1 width=87) (actual time=76.555..76.561 rows=2 loops=1)
   CTE state
     ->  Recursive Union  (cost=0.00..284.28 rows=101 width=8) (actual time=0.002..0.443 rows=59 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.001..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.57..28.23 rows=10 width=8) (actual time=0.006..0.007 rows=1 loops=59)
                 ->  WorkTable Scan on state s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.000..0.000 rows=1 loops=59)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.57..2.79 rows=1 width=16) (actual time=0.006..0.006 rows=1 loops=59)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 58
   ->  Sort  (cost=4990.23..4990.23 rows=1 width=87) (actual time=76.554..76.555 rows=2 loops=1)
         Sort Key: state_groups_state.type, state_groups_state.state_key, state_groups_state.state_group DESC
         Sort Method: quicksort  Memory: 25kB
         ->  Nested Loop  (cost=3.11..4990.22 rows=1 width=87) (actual time=0.740..76.542 rows=2 loops=1)
               ->  HashAggregate  (cost=2.27..3.28 rows=101 width=8) (actual time=0.484..0.498 rows=59 loops=1)
                     Group Key: state.state_group
                     Batches: 1  Memory Usage: 24kB
                     ->  CTE Scan on state  (cost=0.00..2.02 rows=101 width=8) (actual time=0.004..0.459 rows=59 loops=1)
               ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.84..49.37 rows=1 width=87) (actual time=0.009..1.289 rows=0 loops=59)
                     Index Cond: (state_group = state.state_group)
                     Filter: (((type = 'm.room.history_visibility'::text) AND (state_key = ''::text)) OR ((type = 'm.room.member'::text) AND (state_key = '@madlittlemods:matrix.org'::text)))
                     Rows Removed by Filter: 1495
 Planning Time: 3.309 ms
 Execution Time: 76.629 ms
(23 rows)

Time: 80.624 ms

After

Two queries that each take about ~4ms

After raw query (for membership) and EXPLAIN ANALYZE
EXPLAIN ANALYZE WITH RECURSIVE sgs(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, sgs s
    WHERE s.state_group = e.state_group
)
SELECT
    state_group, type, state_key, event_id
FROM state_groups_state
WHERE
    state_group IN (
        SELECT state_group FROM sgs
    )
    AND (type = 'm.room.member' AND state_key = '@madlittlemods:matrix.org')
ORDER BY
    type,
    state_key,
    -- Use the lastest state in the chain
    state_group DESC
LIMIT 1;
                                                                                 QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=779.70..779.70 rows=1 width=87) (actual time=1.248..1.251 rows=1 loops=1)
   CTE sgs
     ->  Recursive Union  (cost=0.00..284.28 rows=101 width=8) (actual time=0.003..0.653 rows=59 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.001..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.57..28.23 rows=10 width=8) (actual time=0.009..0.010 rows=1 loops=59)
                 ->  WorkTable Scan on sgs s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.000..0.000 rows=1 loops=59)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.57..2.79 rows=1 width=16) (actual time=0.008..0.009 rows=1 loops=59)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 58
   ->  Sort  (cost=495.42..495.42 rows=1 width=87) (actual time=1.246..1.247 rows=1 loops=1)
         Sort Key: state_groups_state.state_group DESC
         Sort Method: quicksort  Memory: 25kB
         ->  Nested Loop  (cost=3.11..495.41 rows=1 width=87) (actual time=0.925..1.239 rows=1 loops=1)
               ->  HashAggregate  (cost=2.27..3.28 rows=101 width=8) (actual time=0.717..0.729 rows=59 loops=1)
                     Group Key: sgs.state_group
                     Batches: 1  Memory Usage: 24kB
                     ->  CTE Scan on sgs  (cost=0.00..2.02 rows=101 width=8) (actual time=0.005..0.680 rows=59 loops=1)
               ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.84..4.86 rows=1 width=87) (actual time=0.008..0.008 rows=0 loops=59)
                     Index Cond: ((state_group = sgs.state_group) AND (type = 'm.room.member'::text) AND (state_key = '@madlittlemods:matrix.org'::text))
 Planning Time: 2.611 ms
 Execution Time: 1.326 ms
(21 rows)

Time: 4.614 ms
After raw query (for history visibility) and EXPLAIN ANALYZE
WITH RECURSIVE sgs(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, sgs s
    WHERE s.state_group = e.state_group
)
SELECT
    state_group, type, state_key, event_id
FROM state_groups_state
WHERE
    state_group IN (
        SELECT state_group FROM sgs
    )
    AND (type = 'm.room.history_visibility' AND state_key = '')
ORDER BY
    type,
    state_key,
    -- Use the lastest state in the chain
    state_group DESC
LIMIT 1;

                                                                                 QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=779.70..779.70 rows=1 width=87) (actual time=0.961..0.964 rows=1 loops=1)
   CTE sgs
     ->  Recursive Union  (cost=0.00..284.28 rows=101 width=8) (actual time=0.003..0.458 rows=59 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.002..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.57..28.23 rows=10 width=8) (actual time=0.007..0.007 rows=1 loops=59)
                 ->  WorkTable Scan on sgs s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.000..0.000 rows=1 loops=59)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.57..2.79 rows=1 width=16) (actual time=0.006..0.006 rows=1 loops=59)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 58
   ->  Sort  (cost=495.42..495.42 rows=1 width=87) (actual time=0.960..0.961 rows=1 loops=1)
         Sort Key: state_groups_state.state_group DESC
         Sort Method: quicksort  Memory: 25kB
         ->  Nested Loop  (cost=3.11..495.41 rows=1 width=87) (actual time=0.675..0.953 rows=1 loops=1)
               ->  HashAggregate  (cost=2.27..3.28 rows=101 width=8) (actual time=0.503..0.514 rows=59 loops=1)
                     Group Key: sgs.state_group
                     Batches: 1  Memory Usage: 24kB
                     ->  CTE Scan on sgs  (cost=0.00..2.02 rows=101 width=8) (actual time=0.004..0.476 rows=59 loops=1)
               ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.84..4.86 rows=1 width=87) (actual time=0.007..0.007 rows=0 loops=59)
                     Index Cond: ((state_group = sgs.state_group) AND (type = 'm.room.history_visibility'::text) AND (state_key = ''::text))
 Planning Time: 1.648 ms
 Execution Time: 1.018 ms
(21 rows)

Time: 3.280 ms

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have exact details to be confident in why this is faster but my thinking was we will be trying to do less by matching quicker because of one criteria at a time and LIMIT 1 where it can bail early after one successful state_groups_state match. And not doing any DISTINCT weirdness that probably keeps track of values.

Plus the query planner shows us that we're not throwing away 88k of rows filtering them away anymore.

We still recurse the entire state_event_edges chain but that is probably quick since there are no joins in it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oooh, that is very interesting. I think what is going on here is that the OR condition means that the query planner is no longer just pulling out the specific type/state_key pairs it needs, its instead pulling out the entire state group and filtering in memory. This has significant impact for rooms with large state groups, obviously.

One alternate proposal is instead of using OR conditions is to instead do UNION, e.g.:

EXPLAIN ANALYZE WITH RECURSIVE sgs(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, sgs s
    WHERE s.state_group = e.state_group
)
(
    SELECT DISTINCT ON (type, state_key)
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.member' AND state_key = '@madlittlemods:matrix.org')
    ORDER BY type, state_key, state_group DESC
) UNION (
    SELECT DISTINCT ON (type, state_key)
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.history_visibility' AND state_key = '')
    ORDER BY type, state_key, state_group DESC
);

This has the advantage of being a single query, and reusing the work for pulling out the state edges.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can actually _get_state_groups_from_groups_txn implementation to do the above if we get a state filter? Would likely be a much smaller change?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikjohnston Have you tested your query? It does seem like an improvement (~50ms) over develop but takes longer than the proposed above and about ~20ms after Postgres cache is warm still.

https://explain.depesz.com/s/cyaR

Raw query plan
                                                                                      QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=1246.10..1246.61 rows=51 width=104) (actual time=28.398..28.403 rows=0 loops=1)
   Group Key: state_groups_state.state_group, state_groups_state.type, state_groups_state.state_key, state_groups_state.event_id
   Batches: 1  Memory Usage: 24kB
   CTE sgs
     ->  Recursive Union  (cost=0.00..515.03 rows=101 width=8) (actual time=0.003..1.594 rows=1 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.001..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.28..51.30 rows=10 width=8) (actual time=1.590..1.590 rows=0 loops=1)
                 ->  WorkTable Scan on sgs s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.001..0.002 rows=1 loops=1)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.28..5.10 rows=1 width=16) (actual time=1.585..1.585 rows=0 loops=1)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 0
   ->  Append  (cost=0.28..730.56 rows=51 width=104) (actual time=28.392..28.395 rows=0 loops=1)
         ->  Unique  (cost=0.28..304.38 rows=1 width=95) (actual time=18.021..18.021 rows=0 loops=1)
               ->  Nested Loop  (cost=0.28..304.38 rows=1 width=95) (actual time=18.020..18.020 rows=0 loops=1)
                     Join Filter: (state_groups_state.state_group = sgs.state_group)
                     ->  Index Scan Backward using state_groups_state_type_idx on state_groups_state  (cost=0.28..301.10 rows=1 width=95) (actual time=18.019..18.019 rows=0 loops=1)
                           Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@madlittlemods:matrix.org'::text))
                     ->  CTE Scan on sgs  (cost=0.00..2.02 rows=101 width=8) (never executed)
         ->  Unique  (cost=425.28..425.41 rows=50 width=95) (actual time=10.369..10.371 rows=0 loops=1)
               ->  Sort  (cost=425.28..425.41 rows=50 width=95) (actual time=10.368..10.370 rows=0 loops=1)
                     Sort Key: state_groups_state_1.state_group DESC
                     Sort Method: quicksort  Memory: 25kB
                     ->  Hash Join  (cost=300.39..423.87 rows=50 width=95) (actual time=9.244..9.245 rows=0 loops=1)
                           Hash Cond: (state_groups_state_1.state_group = sgs_1.state_group)
                           ->  Bitmap Heap Scan on state_groups_state state_groups_state_1  (cost=297.11..419.88 rows=56 width=95) (actual time=0.921..5.960 rows=232 loops=1)
                                 Recheck Cond: ((type = 'm.room.history_visibility'::text) AND (state_key = ''::text))
                                 Heap Blocks: exact=62
                                 ->  Bitmap Index Scan on state_groups_state_type_idx  (cost=0.00..297.09 rows=56 width=0) (actual time=0.904..0.905 rows=232 loops=1)
                                       Index Cond: ((type = 'm.room.history_visibility'::text) AND (state_key = ''::text))
                           ->  Hash  (cost=2.02..2.02 rows=101 width=8) (actual time=2.140..2.141 rows=1 loops=1)
                                 Buckets: 1024  Batches: 1  Memory Usage: 9kB
                                 ->  CTE Scan on sgs sgs_1  (cost=0.00..2.02 rows=101 width=8) (actual time=0.004..1.595 rows=1 loops=1)
 Planning Time: 25.952 ms
 Execution Time: 33.217 ms
(34 rows)

Time: 141.669 ms

Copy link
Member

@erikjohnston erikjohnston Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes only a few ms for me :/

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes only a few ms for me :/

It's possible I was testing on the wrong database (local vs matrix.org) (kinda hard to judge now that caches are warm eveywhere). We can at least see that it still has a higher cost and deals with more rows.

But I do really like the idea of it being a single query and re-using the state-edge chain!

Talking with @erikjohnston out of band, the plan is to go with the neat UNION solution proposed and I will get a PR for that up in the next few hours (hopefully a much smaller and clean diff for easy review).

Other exploration

Adding LIMIT 1 doesn't seem different and actually slightly higher "cost" (whatever that is worth at this small scale)

https://explain.depesz.com/s/1PQG

Query plan from LIMIT 1
EXPLAIN ANALYZE WITH RECURSIVE sgs(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, sgs s
    WHERE s.state_group = e.state_group
)
(
    SELECT DISTINCT ON (type, state_key)
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.member' AND state_key = '@madlittlemods:matrix.org')
    ORDER BY type, state_key, state_group DESC
    LIMIT 1
) UNION (
    SELECT DISTINCT ON (type, state_key)
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.history_visibility' AND state_key = '')
    ORDER BY type, state_key, state_group DESC
    LIMIT 1
);
                                                                                                   QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=1272.63..1272.66 rows=2 width=104) (actual time=1.541..1.550 rows=2 loops=1)
   CTE sgs
     ->  Recursive Union  (cost=0.00..284.28 rows=101 width=8) (actual time=0.003..0.563 rows=59 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.001..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.57..28.23 rows=10 width=8) (actual time=0.008..0.008 rows=1 loops=59)
                 ->  WorkTable Scan on sgs s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.000..0.000 rows=1 loops=59)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.57..2.79 rows=1 width=16) (actual time=0.007..0.007 rows=1 loops=59)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 58
   ->  Sort  (cost=988.35..988.36 rows=2 width=104) (actual time=1.540..1.543 rows=2 loops=1)
         Sort Key: state_groups_state.state_group, state_groups_state.type, state_groups_state.state_key, state_groups_state.event_id
         Sort Method: quicksort  Memory: 25kB
         ->  Append  (cost=494.15..988.34 rows=2 width=104) (actual time=1.159..1.534 rows=2 loops=1)
               ->  Limit  (cost=494.15..494.16 rows=1 width=87) (actual time=1.158..1.160 rows=1 loops=1)
                     ->  Unique  (cost=494.15..494.16 rows=1 width=87) (actual time=1.157..1.158 rows=1 loops=1)
                           ->  Sort  (cost=494.15..494.16 rows=1 width=87) (actual time=1.156..1.157 rows=1 loops=1)
                                 Sort Key: state_groups_state.state_group DESC
                                 Sort Method: quicksort  Memory: 25kB
                                 ->  Nested Loop  (cost=0.84..494.14 rows=1 width=87) (actual time=1.140..1.151 rows=1 loops=1)
                                       ->  CTE Scan on sgs  (cost=0.00..2.02 rows=101 width=8) (actual time=0.005..0.591 rows=59 loops=1)
                                       ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.84..4.86 rows=1 width=87) (actual time=0.009..0.009 rows=0 loops=59)
                                             Index Cond: ((state_group = sgs.state_group) AND (type = 'm.room.member'::text) AND (state_key = '@madlittlemods:matrix.org'::text))
               ->  Limit  (cost=494.15..494.16 rows=1 width=87) (actual time=0.370..0.371 rows=1 loops=1)
                     ->  Unique  (cost=494.15..494.16 rows=1 width=87) (actual time=0.370..0.370 rows=1 loops=1)
                           ->  Sort  (cost=494.15..494.16 rows=1 width=87) (actual time=0.369..0.370 rows=1 loops=1)
                                 Sort Key: state_groups_state_1.state_group DESC
                                 Sort Method: quicksort  Memory: 25kB
                                 ->  Nested Loop  (cost=0.84..494.14 rows=1 width=87) (actual time=0.364..0.366 rows=1 loops=1)
                                       ->  CTE Scan on sgs sgs_1  (cost=0.00..2.02 rows=101 width=8) (actual time=0.001..0.012 rows=59 loops=1)
                                       ->  Index Scan using state_groups_state_type_idx on state_groups_state state_groups_state_1  (cost=0.84..4.86 rows=1 width=87) (actual time=0.006..0.006 rows=0 loops=59)
                                             Index Cond: ((state_group = sgs_1.state_group) AND (type = 'm.room.history_visibility'::text) AND (state_key = ''::text))
 Planning Time: 3.998 ms
 Execution Time: 1.640 ms
(33 rows)

Time: 6.662 ms

Getting rid of the DISTINCT at this point seems to make no difference either:

https://explain.depesz.com/s/eEW

Query plan from LIMIT 1 and no DISTINCT
EXPLAIN ANALYZE WITH RECURSIVE sgs(state_group) AS (
    VALUES(739988088::bigint)
    UNION ALL
    SELECT prev_state_group FROM state_group_edges e, sgs s
    WHERE s.state_group = e.state_group
)
(
    SELECT
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.member' AND state_key = '@madlittlemods:matrix.org')
    ORDER BY type, state_key, state_group DESC
    LIMIT 1
) UNION (
    SELECT
        state_group, type, state_key, event_id
    FROM state_groups_state
    INNER JOIN sgs USING (state_group)
    WHERE (type = 'm.room.history_visibility' AND state_key = '')
    ORDER BY type, state_key, state_group DESC
    LIMIT 1
);
                                                                                                QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Unique  (cost=1272.63..1272.66 rows=2 width=104) (actual time=1.537..1.546 rows=2 loops=1)
   CTE sgs
     ->  Recursive Union  (cost=0.00..284.28 rows=101 width=8) (actual time=0.003..0.560 rows=59 loops=1)
           ->  Result  (cost=0.00..0.01 rows=1 width=8) (actual time=0.001..0.002 rows=1 loops=1)
           ->  Nested Loop  (cost=0.57..28.23 rows=10 width=8) (actual time=0.008..0.008 rows=1 loops=59)
                 ->  WorkTable Scan on sgs s  (cost=0.00..0.20 rows=10 width=8) (actual time=0.000..0.000 rows=1 loops=59)
                 ->  Index Only Scan using state_group_edges_unique_idx on state_group_edges e  (cost=0.57..2.79 rows=1 width=16) (actual time=0.007..0.007 rows=1 loops=59)
                       Index Cond: (state_group = s.state_group)
                       Heap Fetches: 58
   ->  Sort  (cost=988.35..988.36 rows=2 width=104) (actual time=1.536..1.539 rows=2 loops=1)
         Sort Key: state_groups_state.state_group, state_groups_state.type, state_groups_state.state_key, state_groups_state.event_id
         Sort Method: quicksort  Memory: 25kB
         ->  Append  (cost=494.15..988.34 rows=2 width=104) (actual time=1.144..1.529 rows=2 loops=1)
               ->  Limit  (cost=494.15..494.16 rows=1 width=87) (actual time=1.143..1.145 rows=1 loops=1)
                     ->  Sort  (cost=494.15..494.16 rows=1 width=87) (actual time=1.142..1.143 rows=1 loops=1)
                           Sort Key: state_groups_state.state_group DESC
                           Sort Method: quicksort  Memory: 25kB
                           ->  Nested Loop  (cost=0.84..494.14 rows=1 width=87) (actual time=1.125..1.136 rows=1 loops=1)
                                 ->  CTE Scan on sgs  (cost=0.00..2.02 rows=101 width=8) (actual time=0.005..0.592 rows=59 loops=1)
                                 ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.84..4.86 rows=1 width=87) (actual time=0.009..0.009 rows=0 loops=59)
                                       Index Cond: ((state_group = sgs.state_group) AND (type = 'm.room.member'::text) AND (state_key = '@madlittlemods:matrix.org'::text))
               ->  Limit  (cost=494.15..494.16 rows=1 width=87) (actual time=0.380..0.381 rows=1 loops=1)
                     ->  Sort  (cost=494.15..494.16 rows=1 width=87) (actual time=0.380..0.380 rows=1 loops=1)
                           Sort Key: state_groups_state_1.state_group DESC
                           Sort Method: quicksort  Memory: 25kB
                           ->  Nested Loop  (cost=0.84..494.14 rows=1 width=87) (actual time=0.375..0.377 rows=1 loops=1)
                                 ->  CTE Scan on sgs sgs_1  (cost=0.00..2.02 rows=101 width=8) (actual time=0.000..0.013 rows=59 loops=1)
                                 ->  Index Scan using state_groups_state_type_idx on state_groups_state state_groups_state_1  (cost=0.84..4.86 rows=1 width=87) (actual time=0.006..0.006 rows=0 loops=59)
                                       Index Cond: ((state_group = sgs_1.state_group) AND (type = 'm.room.history_visibility'::text) AND (state_key = ''::text))
 Planning Time: 4.449 ms
 Execution Time: 1.632 ms
(31 rows)

Time: 7.024 ms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in v2 -> #14527

"""

results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups}
for group in groups:
row_info_list: List[Tuple] = []
txn.execute(sql, (group, EventTypes.RoomHistoryVisibility, ""))
history_vis_info = txn.fetchone()
if history_vis_info is not None:
row_info_list.append(history_vis_info)

txn.execute(sql, (group, EventTypes.Member, user_id_viewing_events))
membership_info = txn.fetchone()
if membership_info is not None:
row_info_list.append(membership_info)
Comment on lines +346 to +354
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can batch up these two individual queries to have less database round-trip time?

Is there a way we can batch up all the queries across all of the state groups?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see the UNION idea from @erikjohnston in #14494 (comment)


for row in row_info_list:
typ, state_key, event_id = row
key = (intern_string(typ), intern_string(state_key))
results[group][key] = event_id

# The results should be considered immutable because we are using
# `intern_string` (TODO: Should we? copied from _get_state_groups_from_groups_txn).
return results

# Craft a StateFilter to use with the cache
state_filter_for_cache_lookup = StateFilter.from_types(
(
(EventTypes.RoomHistoryVisibility, ""),
(EventTypes.Member, user_id_viewing_events),
)
)
(
results_from_cache,
incomplete_groups,
) = await self._get_state_groups_from_cache(
groups, state_filter_for_cache_lookup
)

cache_sequence_nm = self._state_group_cache.sequence
cache_sequence_m = self._state_group_members_cache.sequence

results: Dict[int, StateMap[str]] = results_from_cache
for batch in batch_iter(incomplete_groups, 100):
group_to_state_mapping = await self.db_pool.runInteraction(
"_get_state_for_client_filtering_txn",
_get_state_for_client_filtering_txn,
batch,
)

# Now lets update the caches
# Help the cache hit ratio by expanding the filter a bit
state_filter_for_cache_insertion = (
state_filter_for_cache_lookup.return_expanded()
)
group_to_state_dict: Dict[int, StateMap[str]] = {}
group_to_state_dict.update(group_to_state_mapping)
self._insert_into_cache(
group_to_state_dict,
state_filter_for_cache_insertion,
cache_seq_num_members=cache_sequence_m,
cache_seq_num_non_members=cache_sequence_nm,
)

results.update(group_to_state_mapping)

return results

@cancellable
@trace
@tag_args
async def _get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
) -> Dict[int, MutableStateMap[str]]:
Expand All @@ -254,6 +409,7 @@ async def _get_state_for_groups(
"""
state_filter = state_filter or StateFilter.all()

# TODO: Replace with _get_state_groups_from_cache
member_filter, non_member_filter = state_filter.get_member_split()

# Now we look them up in the member and non-member caches
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ def from_types(types: Iterable[Tuple[str, Optional[str]]]) -> "StateFilter":
The new state filter.
"""
type_dict: Dict[str, Optional[Set[str]]] = {}
for typ, s in types:
for typ, state_key in types:
if typ in type_dict:
if type_dict[typ] is None:
continue

if s is None:
if state_key is None:
type_dict[typ] = None
continue

type_dict.setdefault(typ, set()).add(s) # type: ignore
type_dict.setdefault(typ, set()).add(state_key) # type: ignore

return StateFilter(
types=frozendict(
Expand Down
50 changes: 42 additions & 8 deletions synapse/visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.events.utils import prune_event
from synapse.logging.opentracing import trace
from synapse.logging.opentracing import (
SynapseTags,
set_tag,
start_active_span,
tag_args,
trace,
)
from synapse.storage.controllers import StorageControllers
from synapse.storage.databases.main import DataStore
from synapse.storage.state import StateFilter
Expand Down Expand Up @@ -53,6 +59,7 @@


@trace
@tag_args
async def filter_events_for_client(
storage: StorageControllers,
user_id: str,
Expand Down Expand Up @@ -82,6 +89,11 @@ async def filter_events_for_client(
Returns:
The filtered events.
"""
set_tag(
SynapseTags.FUNC_ARG_PREFIX + "events.length",
str(len(events)),
)

# Filter out events that have been soft failed so that we don't relay them
# to clients.
events_before_filtering = events
Expand All @@ -94,11 +106,32 @@ async def filter_events_for_client(
[event.event_id for event in events],
)

types = (_HISTORY_VIS_KEY, (EventTypes.Member, user_id))
non_outlier_event_ids = event_ids = frozenset(
e.event_id for e in events if not e.internal_metadata.outlier
)

# we exclude outliers at this point, and then handle them separately later
event_id_to_state = await storage.state.get_state_for_events(
frozenset(e.event_id for e in events if not e.internal_metadata.outlier),
# TODO: Remove: We do this just to remove await_full_state from the comparison
await storage.state.get_state_group_for_events(
non_outlier_event_ids, await_full_state=True
)

# Grab the history visibility and membership for each of the events. That's all we
# need to know in order to filter them.
event_id_to_state = await storage.state._get_state_for_client_filtering_for_events(
# we exclude outliers at this point, and then handle them separately later
event_ids=non_outlier_event_ids,
user_id_viewing_events=user_id,
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is slightly rushed here and I wish I could have explored more before putting this up for review but this does seem like an improvement and we need to get any change deployed today so I've marked as ready for review.

Let me know if this just isn't right to ship.

)

# TODO: Remove comparison
# TODO: Remove cache invalidation
storage.state.stores.state._state_group_cache.invalidate_all()
storage.state.stores.state._state_group_members_cache.invalidate_all()
# logger.info("----------------------------------------------------")
# logger.info("----------------------------------------------------")
types = (_HISTORY_VIS_KEY, (EventTypes.Member, user_id))
event_id_to_state_orig = await storage.state.get_state_for_events(
non_outlier_event_ids,
state_filter=StateFilter.from_types(types),
)

Expand Down Expand Up @@ -130,9 +163,10 @@ def allowed(event: EventBase) -> Optional[EventBase]:
sender_erased=erased_senders.get(event.sender, False),
)

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the state_groups table exist?

$ psql synapse
# \d+ state_groups
                                Table "public.state_groups"
  Column  |  Type  | Collation | Nullable | Default | Storage  | Stats target | Description
----------+--------+-----------+----------+---------+----------+--------------+-------------
 id       | bigint |           | not null |         | plain    |              |
 room_id  | text   |           | not null |         | extended |              |
 event_id | text   |           | not null |         | extended |              |

It seems like a subset of state_groups_state

$ psql synapse
# \d+ state_groups_state
                               Table "public.state_groups_state"
   Column    |  Type  | Collation | Nullable | Default | Storage  | Stats target | Description
-------------+--------+-----------+----------+---------+----------+--------------+-------------
 state_group | bigint |           | not null |         | plain    |              |
 room_id     | text   |           | not null |         | extended |              |
 type        | text   |           | not null |         | extended |              |
 state_key   | text   |           | not null |         | extended |              |
 event_id    | text   |           | not null |         | extended |              |

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
# Check each event: gives an iterable of None or (a potentially modified)
# EventBase.
filtered_events = map(allowed, events)
with start_active_span("filtering events against allowed function"):
# Check each event: gives an iterable of None or (a potentially modified)
# EventBase.
filtered_events = map(allowed, events)

# Turn it into a list and remove None entries before returning.
return [ev for ev in filtered_events if ev]
Expand Down