use bb8_redis::bb8; use redis::{ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, cluster::{ClusterClient, ClusterClientBuilder}, cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, }; /// ConnectionManager that implements `bb8::ManageConnection` and supports /// asynchronous clustered connections via `redis_cluster_async::Connection` #[derive(Clone)] pub struct RedisClusterConnectionManager { client: ClusterClient, } impl RedisClusterConnectionManager { pub fn new( info: T, ) -> Result { Ok(RedisClusterConnectionManager { client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, }) } } impl bb8::ManageConnection for RedisClusterConnectionManager { type Connection = redis::cluster_async::ClusterConnection; type Error = RedisError; async fn connect(&self) -> Result { self.client.get_async_connection().await } async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { let cmd = redis::cmd("PING"); let pong = conn .route_command( cmd, RoutingInfo::MultiNode(( MultipleNodeRoutingInfo::AllMasters, Some(ResponsePolicy::OneSucceeded), )), ) .await .and_then(|v| Ok(String::from_redis_value(v)?))?; match pong.as_str() { "PONG" => Ok(()), _ => Err(( ErrorKind::Server(redis::ServerErrorKind::ResponseError), "ping request", ) .into()), } } fn has_broken(&self, _: &mut Self::Connection) -> bool { false } }