diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 12:55:43 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 12:55:43 +0200 |
commit | bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04 (patch) | |
tree | 50b63525480da0bee2ce713d69f02617c20bee8d /lib/warden-stack/src/cache/cluster.rs | |
parent | 8deeab3e11f707677609047f5577a256cf28ed63 (diff) | |
download | warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.tar.bz2 warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.zip |
chore: collapse stack-up
Diffstat (limited to 'lib/warden-stack/src/cache/cluster.rs')
-rw-r--r-- | lib/warden-stack/src/cache/cluster.rs | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs new file mode 100644 index 0000000..91e3b24 --- /dev/null +++ b/lib/warden-stack/src/cache/cluster.rs @@ -0,0 +1,52 @@ +use redis::{ + ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, + cluster::{ClusterClient, ClusterClientBuilder}, + cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, +}; + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous clustered connections via `redis_cluster_async::Connection` +#[derive(Clone)] +pub struct RedisClusterConnectionManager { + client: ClusterClient, +} + +impl RedisClusterConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: T, + ) -> Result<RedisClusterConnectionManager, RedisError> { + Ok(RedisClusterConnectionManager { + client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, + }) + } +} + +impl bb8::ManageConnection for RedisClusterConnectionManager { + type Connection = redis::cluster_async::ClusterConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong = conn + .route_command( + &redis::cmd("PING"), + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .and_then(|v| String::from_redis_value(&v))?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} |