aboutsummaryrefslogtreecommitdiffstats
path: root/lib/warden-stack/src/cache/cluster.rs
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2025-08-10 12:55:43 +0200
committerrtkay123 <dev@kanjala.com>2025-08-10 12:55:43 +0200
commitbd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04 (patch)
tree50b63525480da0bee2ce713d69f02617c20bee8d /lib/warden-stack/src/cache/cluster.rs
parent8deeab3e11f707677609047f5577a256cf28ed63 (diff)
downloadwarden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.tar.bz2
warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.zip
chore: collapse stack-up
Diffstat (limited to 'lib/warden-stack/src/cache/cluster.rs')
-rw-r--r--lib/warden-stack/src/cache/cluster.rs52
1 files changed, 52 insertions, 0 deletions
diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs
new file mode 100644
index 0000000..91e3b24
--- /dev/null
+++ b/lib/warden-stack/src/cache/cluster.rs
@@ -0,0 +1,52 @@
+use redis::{
+ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError,
+ cluster::{ClusterClient, ClusterClientBuilder},
+ cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo},
+};
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous clustered connections via `redis_cluster_async::Connection`
+#[derive(Clone)]
+pub struct RedisClusterConnectionManager {
+ client: ClusterClient,
+}
+
+impl RedisClusterConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: T,
+ ) -> Result<RedisClusterConnectionManager, RedisError> {
+ Ok(RedisClusterConnectionManager {
+ client: ClusterClientBuilder::new(vec![info]).retries(0).build()?,
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisClusterConnectionManager {
+ type Connection = redis::cluster_async::ClusterConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong = conn
+ .route_command(
+ &redis::cmd("PING"),
+ RoutingInfo::MultiNode((
+ MultipleNodeRoutingInfo::AllMasters,
+ Some(ResponsePolicy::OneSucceeded),
+ )),
+ )
+ .await
+ .and_then(|v| String::from_redis_value(&v))?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((ErrorKind::ResponseError, "ping request").into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}