Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ResponseCache tests #9458

Merged
merged 8 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9458.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add tests to ResponseCache.
2 changes: 1 addition & 1 deletion synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

self.protocol_meta_cache = ResponseCache(
hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
) # type: ResponseCache[Tuple[str, str]]

async def query_user(self, service, user_id):
Expand Down
13 changes: 8 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Expand Down Expand Up @@ -99,7 +100,7 @@


class FederationServer(FederationBase):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.auth = hs.get_auth()
Expand All @@ -119,7 +120,7 @@ def __init__(self, hs):

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
hs, "fed_txn_handler", timeout_ms=30000
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]

self.transaction_actions = TransactionActions(self.store)
Expand All @@ -129,10 +130,10 @@ def __init__(self, hs):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(
hs, "state_resp", timeout_ms=30000
hs.get_clock(), "state_resp", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]
self._state_ids_resp_cache = ResponseCache(
hs, "state_ids_resp", timeout_ms=30000
hs.get_clock(), "state_ids_resp", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]

self._federation_metrics_domains = (
Expand Down Expand Up @@ -455,7 +456,9 @@ async def _on_context_state_request_compute(
self, room_id: str, event_id: str
) -> Dict[str, list]:
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
pdus = await self.handler.get_state_for_pdu(
room_id, event_id
) # type: Iterable[EventBase]
else:
pdus = (await self.state.get_current_state(room_id)).values()

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = ResponseCache(
hs, "initial_sync_cache"
hs.get_clock(), "initial_sync_cache"
) # type: ResponseCache[Tuple[str, Optional[StreamToken], Optional[StreamToken], str, Optional[int], bool, bool]]
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(self, hs: "HomeServer"):
# succession, only process the first attempt and return its result to
# subsequent requests
self._upgrade_response_cache = ResponseCache(
hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
hs.get_clock(), "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
) # type: ResponseCache[Tuple[str, str]]
self._server_notices_mxid = hs.config.server_notices_mxid

Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(
hs, "room_list"
hs.get_clock(), "room_list"
) # type: ResponseCache[Tuple[Optional[int], Optional[str], ThirdPartyInstanceID]]
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
hs.get_clock(), "remote_room_list", timeout_ms=30 * 1000
) # type: ResponseCache[Tuple[str, Optional[int], Optional[str], bool, Optional[str]]]

async def get_local_public_room_list(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def __init__(self, hs: "HomeServer"):
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(
hs, "sync"
hs.get_clock(), "sync"
) # type: ResponseCache[Tuple[Any, ...]]
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
Expand Down
9 changes: 6 additions & 3 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import re
import urllib
from inspect import signature
from typing import Dict, List, Tuple
from typing import TYPE_CHECKING, Dict, List, Tuple

from prometheus_client import Counter, Gauge

Expand All @@ -28,6 +28,9 @@
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

_pending_outgoing_requests = Gauge(
Expand Down Expand Up @@ -88,10 +91,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
CACHE = True
RETRY_ON_TIMEOUT = True

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
hs.get_clock(), "repl." + self.NAME, timeout_ms=30 * 60 * 1000
) # type: ResponseCache[str]

# We reserve `instance_name` as a parameter to sending requests, so we
Expand Down
10 changes: 4 additions & 6 deletions synapse/util/caches/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, Optional, TypeVar

from twisted.internet import defer

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.util import Clock
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import register_cache

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)

T = TypeVar("T")
Expand All @@ -37,11 +35,11 @@ class ResponseCache(Generic[T]):
used rather than trying to compute a new response.
"""

def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
# Requests that haven't finished yet.
self.pending_result_cache = {} # type: Dict[T, ObservableDeferred]

self.clock = hs.get_clock()
self.clock = clock
self.timeout_sec = timeout_ms / 1000.0

self._name = name
Expand Down
131 changes: 131 additions & 0 deletions tests/util/caches/test_responsecache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.util.caches.response_cache import ResponseCache

from tests.server import get_clock
from tests.unittest import TestCase


class DeferredCacheTestCase(TestCase):
"""
A TestCase class for ResponseCache.

The test-case function naming has some logic to it in it's parts, here's some notes about it:
wait: Denotes tests that have an element of "waiting" before its wrapped result becomes available
(Generally these just use .delayed_return instead of .instant_return in it's wrapped call.)
expire: Denotes tests that test expiry after assured existence.
(These have cache with a short timeout_ms=, shorter than will be tested through advancing the clock)
"""

def setUp(self):
self.reactor, self.clock = get_clock()

def with_cache(self, name: str, ms: int = 0) -> ResponseCache:
return ResponseCache(self.clock, name, timeout_ms=ms)

@staticmethod
async def instant_return(o: str) -> str:
return o

async def delayed_return(self, o: str) -> str:
await self.clock.sleep(1)
return o

def test_cache_hit(self):
cache = self.with_cache("keeping_cache", ms=9001)

expected_result = "howdy"

wrap_d = cache.wrap(0, self.instant_return, expected_result)

self.assertEqual(
expected_result,
self.successResultOf(wrap_d),
"initial wrap result should be the same",
)
self.assertEqual(
expected_result,
self.successResultOf(cache.get(0)),
"cache should have the result",
)

def test_cache_miss(self):
cache = self.with_cache("trashing_cache", ms=0)

expected_result = "howdy"

wrap_d = cache.wrap(0, self.instant_return, expected_result)

self.assertEqual(
expected_result,
self.successResultOf(wrap_d),
"initial wrap result should be the same",
)
self.assertIsNone(cache.get(0), "cache should not have the result now")

def test_cache_expire(self):
cache = self.with_cache("short_cache", ms=1000)

expected_result = "howdy"

wrap_d = cache.wrap(0, self.instant_return, expected_result)

self.assertEqual(expected_result, self.successResultOf(wrap_d))
self.assertEqual(
expected_result,
self.successResultOf(cache.get(0)),
"cache should still have the result",
)

# cache eviction timer is handled
self.reactor.pump((2,))

self.assertIsNone(cache.get(0), "cache should not have the result now")

def test_cache_wait_hit(self):
cache = self.with_cache("neutral_cache")

expected_result = "howdy"

wrap_d = cache.wrap(0, self.delayed_return, expected_result)
self.assertNoResult(wrap_d)

# function wakes up, returns result
self.reactor.pump((2,))

self.assertEqual(expected_result, self.successResultOf(wrap_d))

def test_cache_wait_expire(self):
cache = self.with_cache("medium_cache", ms=3000)

expected_result = "howdy"

wrap_d = cache.wrap(0, self.delayed_return, expected_result)
self.assertNoResult(wrap_d)

# stop at 1 second to callback cache eviction callLater at that time, then another to set time at 2
self.reactor.pump((1, 1))

self.assertEqual(expected_result, self.successResultOf(wrap_d))
self.assertEqual(
expected_result,
self.successResultOf(cache.get(0)),
"cache should still have the result",
)

# (1 + 1 + 2) > 3.0, cache eviction timer is handled
self.reactor.pump((2,))

self.assertIsNone(cache.get(0), "cache should not have the result now")