mod cluster; mod sentinel; use anyhow::Result; use redis::{ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, }; use std::{fmt::Debug, sync::Arc}; use bb8_redis::{ RedisConnectionManager, bb8::{self, Pool, RunError}, }; use tokio::sync::Mutex; use crate::{ config::cache::{CacheConfig, RedisVariant}, server::state::cache::{ cluster::RedisClusterConnectionManager, sentinel::RedisSentinelConnectionManager, }, }; const REDIS_CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); #[derive(Clone)] pub enum RedisManager { Clustered(Pool), NonClustered(Pool), Sentinel(Pool), ClusteredUnpooled(redis::cluster_async::ClusterConnection), NonClusteredUnpooled(redis::aio::ConnectionManager), SentinelUnpooled(Arc>), } impl Debug for RedisManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Clustered(arg0) => f.debug_tuple("Clustered").field(arg0).finish(), Self::NonClustered(arg0) => f.debug_tuple("NonClustered").field(arg0).finish(), Self::Sentinel(arg0) => f.debug_tuple("Sentinel").field(arg0).finish(), Self::ClusteredUnpooled(_arg0) => f.debug_tuple("ClusteredUnpooled").finish(), Self::NonClusteredUnpooled(arg0) => { f.debug_tuple("NonClusteredUnpooled").field(arg0).finish() } Self::SentinelUnpooled(_arg0) => f.debug_tuple("SentinelUnpooled").finish(), } } } pub enum RedisConnection<'a> { Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>), NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>), SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>), ClusteredUnpooled(redis::cluster_async::ClusterConnection), NonClusteredUnpooled(redis::aio::ConnectionManager), SentinelUnpooled(redis::aio::MultiplexedConnection), } impl redis::aio::ConnectionLike for RedisConnection<'_> { fn req_packed_command<'a>( &'a mut self, cmd: &'a redis::Cmd, ) -> redis::RedisFuture<'a, redis::Value> { match self { RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd), RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd), RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd), RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd), RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd), } } fn req_packed_commands<'a>( &'a mut self, cmd: &'a redis::Pipeline, offset: usize, count: usize, ) -> redis::RedisFuture<'a, Vec> { match self { RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count), RedisConnection::ClusteredUnpooled(conn) => { conn.req_packed_commands(cmd, offset, count) } RedisConnection::NonClusteredUnpooled(conn) => { conn.req_packed_commands(cmd, offset, count) } RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count), RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count), } } fn get_db(&self) -> i64 { match self { RedisConnection::Clustered(conn) => conn.get_db(), RedisConnection::NonClustered(conn) => conn.get_db(), RedisConnection::ClusteredUnpooled(conn) => conn.get_db(), RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(), RedisConnection::SentinelPooled(conn) => conn.get_db(), RedisConnection::SentinelUnpooled(conn) => conn.get_db(), } } } impl RedisManager { pub async fn new(config: &CacheConfig) -> Result { if config.pooled { Self::new_pooled( config.redis_dsn.as_ref(), &config.kind, config.max_connections, ) .await } else { Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await } } async fn new_pooled(dsn: &str, variant: &RedisVariant, max_conns: u16) -> Result { match variant { RedisVariant::Clustered => { let mgr = RedisClusterConnectionManager::new(dsn)?; let pool = bb8::Pool::builder() .max_size(max_conns.into()) .build(mgr) .await?; Ok(RedisManager::Clustered(pool)) } RedisVariant::NonClustered => { let mgr = RedisConnectionManager::new(dsn)?; let pool = bb8::Pool::builder() .max_size(max_conns.into()) .build(mgr) .await?; Ok(RedisManager::NonClustered(pool)) } RedisVariant::Sentinel(cfg) => { let mgr = RedisSentinelConnectionManager::new( vec![dsn], cfg.service_name.clone(), Some(create_config(cfg)), )?; let pool = bb8::Pool::builder() .max_size(max_conns.into()) .build(mgr) .await?; Ok(RedisManager::Sentinel(pool)) } } } async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result { match variant { RedisVariant::Clustered => { let cli = redis::cluster::ClusterClient::builder(vec![dsn]) .retries(1) .connection_timeout(REDIS_CONN_TIMEOUT) .build()?; let con = cli.get_async_connection().await?; Ok(RedisManager::ClusteredUnpooled(con)) } RedisVariant::NonClustered => { let cli = redis::Client::open(dsn)?; let con = redis::aio::ConnectionManager::new_with_config( cli, ConnectionManagerConfig::new() .set_number_of_retries(1) .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), ) .await?; Ok(RedisManager::NonClusteredUnpooled(con)) } RedisVariant::Sentinel(cfg) => { let cli = redis::sentinel::SentinelClient::build( vec![dsn], cfg.service_name.clone(), Some(create_config(cfg)), redis::sentinel::SentinelServerType::Master, )?; Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli)))) } } } pub async fn get(&self) -> Result, RunError> { match self { Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)), Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)), Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)), Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())), Self::NonClusteredUnpooled(conn) => { Ok(RedisConnection::NonClusteredUnpooled(conn.clone())) } Self::SentinelUnpooled(conn) => { let mut conn = conn.lock().await; let con = conn .get_async_connection_with_config( &AsyncConnectionConfig::new() .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), ) .await?; Ok(RedisConnection::SentinelUnpooled(con)) } } } } fn create_config(cfg: &crate::config::cache::SentinelConfig) -> SentinelNodeConnectionInfo { let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); let protocol = if cfg.redis_use_resp3 { ProtocolVersion::RESP3 } else { ProtocolVersion::default() }; let info = RedisConnectionInfo::default(); let info = if let Some(pass) = &cfg.redis_password { info.set_password(pass.clone()) } else { info }; let info = if let Some(user) = &cfg.redis_username { info.set_username(user.clone()) } else { info } .set_protocol(protocol.clone()) .set_db(cfg.redis_db.unwrap_or(0)); let sent_info = SentinelNodeConnectionInfo::default(); if let Some(tls) = tls_mode { sent_info.set_tls_mode(tls) } else { sent_info } .set_redis_connection_info(info) }