Skip to content

Commit

Permalink
Add read_from_replicas support for Redis cluster configuration
Browse files Browse the repository at this point in the history
This closes #351
  • Loading branch information
fruscianteee committed Sep 9, 2024
1 parent c4c03d7 commit b559365
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 9 deletions.
2 changes: 1 addition & 1 deletion r2d2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[0.4.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.3.0...deadpool-r2d2-v0.4.0
[0.3.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.2.0...deadpool-r2d2-v0.3.0
[0.2.0]: https://github.com/bikeshedder/deadpool/compare/deadpool-r2d2-v0.1.0...deadpool-r2d2-v0.2.0
[0.1.0]: https://github.com/bikeshedder/deadpool/releases/tag/deadpool-r2d2-v0.1.0
[0.1.0]: https://github.com/bikeshedder/deadpool/releases/tag/deadpool-r2d2-v0.1.0
2 changes: 2 additions & 0 deletions redis/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- __Breaking:__ Modified the `Config` struct to support `read_from_replicas` for Redis clusters, using the redis-rs crate's functionality.
- Add support for enabling `read_from_replicas` via the environment variable REDIS_CLUSTER__READ_FROM_REPLICAS.
- Add support for `redis::sentinel`

## [0.16.0] - 2024-08-05
Expand Down
28 changes: 24 additions & 4 deletions redis/src/cluster/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::{CreatePoolError, Pool, PoolBuilder, PoolConfig, Runtime};
/// [`config`](https://crates.io/crates/config) crate as following:
/// ```env
/// REDIS_CLUSTER__URLS=redis://127.0.0.1:7000,redis://127.0.0.1:7001
/// REDIS_CLUSTER__READ_FROM_REPLICAS=true
/// REDIS_CLUSTER__POOL__MAX_SIZE=16
/// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__SECS=2
/// REDIS_CLUSTER__POOL__TIMEOUTS__WAIT__NANOS=0
Expand Down Expand Up @@ -48,6 +49,18 @@ pub struct Config {

/// Pool configuration.
pub pool: Option<PoolConfig>,

/// Enables or disables reading from replica nodes in a Redis cluster.
///
/// When set to `true`, read operations may be distributed across
/// replica nodes, which can help in load balancing read requests.
/// When set to `false`, all read operations will be directed to the
/// master node(s). This option is particularly useful in a high-availability
/// setup where read scalability is needed.
///
/// Default is `false`.
#[serde(default)]
pub read_from_replicas: bool,
}

impl Config {
Expand All @@ -71,11 +84,16 @@ impl Config {
/// See [`ConfigError`] for details.
pub fn builder(&self) -> Result<PoolBuilder, ConfigError> {
let manager = match (&self.urls, &self.connections) {
(Some(urls), None) => {
super::Manager::new(urls.iter().map(|url| url.as_str()).collect())?
(Some(urls), None) => super::Manager::new(
urls.iter().map(|url| url.as_str()).collect(),
self.read_from_replicas,
)?,
(None, Some(connections)) => {
super::Manager::new(connections.clone(), self.read_from_replicas)?
}
(None, None) => {
super::Manager::new(vec![ConnectionInfo::default()], self.read_from_replicas)?
}
(None, Some(connections)) => super::Manager::new(connections.clone())?,
(None, None) => super::Manager::new(vec![ConnectionInfo::default()])?,
(Some(_), Some(_)) => return Err(ConfigError::UrlAndConnectionSpecified),
};
let pool_config = self.get_pool_config();
Expand All @@ -97,6 +115,7 @@ impl Config {
urls: Some(urls.into()),
connections: None,
pool: None,
read_from_replicas: false,
}
}
}
Expand All @@ -107,6 +126,7 @@ impl Default for Config {
urls: None,
connections: Some(vec![ConnectionInfo::default()]),
pool: None,
read_from_replicas: false,
}
}
}
15 changes: 11 additions & 4 deletions redis/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use deadpool::managed;
use redis::{aio::ConnectionLike, IntoConnectionInfo, RedisError, RedisResult};

use redis;
pub use redis::cluster::ClusterClient;
pub use redis::cluster::{ClusterClient, ClusterClientBuilder};
pub use redis::cluster_async::ClusterConnection;

pub use self::config::{Config, ConfigError};
Expand Down Expand Up @@ -122,10 +122,17 @@ impl Manager {
///
/// # Errors
///
/// If establishing a new [`ClusterClient`] fails.
pub fn new<T: IntoConnectionInfo>(params: Vec<T>) -> RedisResult<Self> {
/// If establishing a new [`ClusterClientBuilder`] fails.
pub fn new<T: IntoConnectionInfo>(
params: Vec<T>,
read_from_replicas: bool,
) -> RedisResult<Self> {
let mut client = ClusterClientBuilder::new(params);
if read_from_replicas {
client = client.read_from_replicas();
}
Ok(Self {
client: ClusterClient::new(params)?,
client: client.build()?,
ping_number: AtomicUsize::new(0),
})
}
Expand Down
25 changes: 25 additions & 0 deletions redis/tests/redis_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,31 @@ async fn test_pipeline() {
assert_eq!(value, "42".to_string());
}

#[tokio::test]
async fn test_read_from_replicas() {
use deadpool_redis::redis::pipe;
let mut cfg = Config::from_env();
cfg.redis_cluster.read_from_replicas = true;
assert_eq!(cfg.redis_cluster.read_from_replicas, true);

let pool = cfg
.redis_cluster
.create_pool(Some(Runtime::Tokio1))
.unwrap();
let mut conn = pool.get().await.unwrap();
let (value,): (String,) = pipe()
.cmd("SET")
.arg("deadpool/pipeline_test_key")
.arg("42")
.ignore()
.cmd("GET")
.arg("deadpool/pipeline_test_key")
.query_async(&mut conn)
.await
.unwrap();
assert_eq!(value, "42".to_string());
}

#[tokio::test]
async fn test_high_level_commands() {
use deadpool_redis::redis::AsyncCommands;
Expand Down

0 comments on commit b559365

Please sign in to comment.