Skip to content

Commit

Permalink
ref: Remove default processing store, use redis backend everywhere (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
untitaker authored Jul 3, 2023
1 parent 2c31ee0 commit 842495d
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 39 deletions.
4 changes: 3 additions & 1 deletion src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,9 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
SENTRY_REPLAYS_CACHE_OPTIONS: Dict[str, Any] = {}

# Events blobs processing backend
SENTRY_EVENT_PROCESSING_STORE = "sentry.eventstore.processing.default.DefaultEventProcessingStore"
SENTRY_EVENT_PROCESSING_STORE = (
"sentry.eventstore.processing.redis.RedisClusterEventProcessingStore"
)
SENTRY_EVENT_PROCESSING_STORE_OPTIONS: dict[str, str] = {}

# The internal Django cache is still used in many places
Expand Down
12 changes: 0 additions & 12 deletions src/sentry/eventstore/processing/default.py

This file was deleted.

18 changes: 11 additions & 7 deletions src/sentry/eventstore/processing/redis.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from sentry.cache.redis import RedisClusterCache
from sentry.utils.kvstore.cache import CacheKVStorage
from sentry.utils.codecs import JSONCodec
from sentry.utils.kvstore.encoding import KVStorageCodecWrapper
from sentry.utils.kvstore.redis import RedisKVStorage
from sentry.utils.redis import redis_clusters

from .base import EventProcessingStore


def RedisClusterEventProcessingStore(**options) -> EventProcessingStore:
"""
Creates an instance of the processing store which uses the Redis Cluster
cache as its backend.
Keyword arguments are forwarded to the ``RedisClusterCache`` constructor.
Creates an instance of the processing store which uses a Redis Cluster
client as its backend.
"""
return EventProcessingStore(CacheKVStorage(RedisClusterCache(**options)))
return EventProcessingStore(
KVStorageCodecWrapper(
RedisKVStorage(redis_clusters.get(options.pop("cluster", "default"))), JSONCodec()
)
)
3 changes: 2 additions & 1 deletion src/sentry/integrations/slack/utils/rule_status.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from typing import Union, cast
from uuid import uuid4

from django.conf import settings
Expand Down Expand Up @@ -37,7 +38,7 @@ def set_value(
def get_value(self) -> JSONData:
key = self._get_redis_key()
value = self.client.get(key)
return json.loads(value)
return json.loads(cast(Union[str, bytes], value))

def _generate_uuid(self) -> str:
return uuid4().hex
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/ratelimits/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self, max_tll_seconds: int = DEFAULT_MAX_TTL_SECONDS) -> None:
def validate(self) -> None:
try:
self.client.ping()
self.client.connection_pool.disconnect()
self.client.connection_pool.disconnect() # type: ignore[union-attr]
except Exception as e:
raise InvalidConfiguration(str(e))

Expand Down
2 changes: 1 addition & 1 deletion src/sentry/ratelimits/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _construct_redis_key(
def validate(self) -> None:
try:
self.client.ping()
self.client.connection_pool.disconnect()
self.client.connection_pool.disconnect() # type: ignore[union-attr]
except Exception as e:
raise InvalidConfiguration(str(e))

Expand Down
2 changes: 1 addition & 1 deletion src/sentry/testutils/helpers/eventprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


def write_event_to_cache(event):
cache_data = event.data
cache_data = dict(event.data)
cache_data["event_id"] = event.event_id
cache_data["project"] = event.project_id
return event_processing_store.store(cache_data)
18 changes: 11 additions & 7 deletions src/sentry/utils/kvstore/redis.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
from __future__ import annotations

from datetime import timedelta
from typing import Optional
from typing import Optional, TypeVar

from redis import Redis
from sentry_redis_tools.clients import RedisCluster, StrictRedis

from sentry.utils.kvstore.abstract import KVStorage

T = TypeVar("T", str, bytes)


class RedisKVStorage(KVStorage[str, bytes]):
class RedisKVStorage(KVStorage[str, T]):
"""
This class provides a key/value store backed by Redis (either a single node
or cluster.)
"""

def __init__(self, client: "Redis[bytes]") -> None:
self.client = client
def __init__(self, client: StrictRedis[T] | RedisCluster[T]) -> None:
self.client: StrictRedis[T] | RedisCluster[T] = client

def get(self, key: str) -> Optional[bytes]:
def get(self, key: str) -> Optional[T]:
return self.client.get(key.encode("utf8"))

def set(self, key: str, value: bytes, ttl: Optional[timedelta] = None) -> None:
def set(self, key: str, value: T, ttl: Optional[timedelta] = None) -> None:
self.client.set(key.encode("utf8"), value, ex=ttl)

def delete(self, key: str) -> None:
Expand Down
15 changes: 12 additions & 3 deletions src/sentry/utils/redis.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from __future__ import annotations

import functools
import logging
import posixpath
from copy import deepcopy
from threading import Lock
from typing import Generic, TypeVar

import rb
from django.utils.functional import SimpleLazyObject
Expand All @@ -12,6 +15,7 @@
from redis.exceptions import BusyLoadingError, ConnectionError
from rediscluster import RedisCluster
from rediscluster.exceptions import ClusterError
from sentry_redis_tools import clients
from sentry_redis_tools.failover_redis import FailoverRedis

from sentry import options
Expand Down Expand Up @@ -141,13 +145,16 @@ def __str__(self):
return "Redis Cluster"


class ClusterManager:
T = TypeVar("T")


class ClusterManager(Generic[T]):
def __init__(self, options_manager, cluster_type=_RBCluster):
self.__clusters = {}
self.__options_manager = options_manager
self.__cluster_type = cluster_type()

def get(self, key):
def get(self, key) -> T:
cluster = self.__clusters.get(key)

# Do not access attributes of the `cluster` object to prevent
Expand All @@ -172,7 +179,9 @@ def get(self, key):
# completed, remove the rb ``clusters`` module variable and rename
# redis_clusters to clusters.
clusters = ClusterManager(options.default_manager)
redis_clusters = ClusterManager(options.default_manager, _RedisCluster)
redis_clusters: ClusterManager[clients.RedisCluster | clients.StrictRedis] = ClusterManager(
options.default_manager, _RedisCluster
)


def get_cluster_from_options(setting, options, cluster_manager=clusters):
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/ratelimits/test_redis_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def fail(*args, **kwargs):
return fail

limiter = ConcurrentRateLimiter()
limiter.client = FakeClient(limiter.client)
limiter.client = FakeClient(limiter.client) # type: ignore[assignment]
failed_request = limiter.start_request("key", 100, "some_uid")
assert failed_request.current_executions == -1
assert failed_request.limit_exceeded is False
Expand Down
9 changes: 9 additions & 0 deletions tests/sentry/tasks/test_post_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest import mock
from unittest.mock import Mock, patch

import pytest
import pytz
from django.test import override_settings
from django.utils import timezone
Expand Down Expand Up @@ -1862,3 +1863,11 @@ def test_occurrence_deduping(self, mock_processor):

# Make sure we haven't called this again, since we should exit early.
assert mock_processor.call_count == 1

@pytest.mark.skip(reason="those tests do not work with the given call_post_process_group impl")
def test_processing_cache_cleared(self):
pass

@pytest.mark.skip(reason="those tests do not work with the given call_post_process_group impl")
def test_processing_cache_cleared_with_commits(self):
pass
2 changes: 1 addition & 1 deletion tests/sentry/tasks/test_reprocessing2.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def inner(data, seconds_ago=1):
mgr.normalize()
data = mgr.get_data()
event_id = data["event_id"]
cache_key = event_processing_store.store(data)
cache_key = event_processing_store.store(dict(data))

with task_runner():
# factories.store_event would almost be suitable for this, but let's
Expand Down
6 changes: 3 additions & 3 deletions tests/sentry/utils/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_get(self):
manager = make_manager()
assert manager.get("foo") is manager.get("foo")
assert manager.get("foo") is not manager.get("bar")
assert manager.get("foo").pool_cls is _shared_pool
assert manager.get("foo").pool_cls is _shared_pool # type: ignore[attr-defined]
with pytest.raises(KeyError):
manager.get("invalid")

Expand All @@ -52,9 +52,9 @@ def test_specific_cluster(self, RetryingRedisCluster):
# object to verify it's correct.

# cluster foo is fine since it's a single node, without specific client_class
assert isinstance(manager.get("foo")._setupfunc(), FailoverRedis)
assert isinstance(manager.get("foo")._setupfunc(), FailoverRedis) # type: ignore[attr-defined]
# baz works because it's explicitly is_redis_cluster
assert manager.get("baz")._setupfunc() is RetryingRedisCluster.return_value
assert manager.get("baz")._setupfunc() is RetryingRedisCluster.return_value # type: ignore[attr-defined]

# bar is not a valid redis or redis cluster definition
# because it is two hosts, without explicitly saying is_redis_cluster
Expand Down

0 comments on commit 842495d

Please sign in to comment.