mod key; pub use key::*; mod cluster; mod sentinel; pub use sentinel::SentinelConfig; use std::{sync::Arc, time::Duration}; use bb8::RunError; // use bb8_redis::RedisConnectionManager; use futures_util::lock::Mutex; use redis::{ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, }; pub use self::cluster::RedisClusterConnectionManager; pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); pub enum RedisVariant { Clustered, NonClustered, Sentinel(sentinel::SentinelConfig), } #[derive(Clone)] pub enum RedisManager { Clustered(redis::cluster_async::ClusterConnection), NonClustered(redis::aio::ConnectionManager), Sentinel(Arc>), } impl RedisManager { pub async fn new(dsn: &str, variant: RedisVariant) -> Self { match variant { RedisVariant::Clustered => { let cli = redis::cluster::ClusterClient::builder(vec![dsn]) .retries(1) .connection_timeout(REDIS_CONN_TIMEOUT) .build() .expect("Error initializing redis-unpooled cluster client"); let con = cli .get_async_connection() .await .expect("Failed to get redis-cluster-unpooled connection"); RedisManager::Clustered(con) } RedisVariant::NonClustered => { let cli = redis::Client::open(dsn).expect("Error initializing redis unpooled client"); let con = redis::aio::ConnectionManager::new_with_config( cli, ConnectionManagerConfig::new() .set_number_of_retries(1) .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), ) .await .expect("Failed to get redis-unpooled connection manager"); RedisManager::NonClustered(con) } RedisVariant::Sentinel(cfg) => { let tls_mode = if cfg.redis_tls_mode_secure { TlsMode::Secure } else { TlsMode::Insecure }; let protocol = if cfg.redis_use_resp3 { ProtocolVersion::RESP3 } else { ProtocolVersion::default() }; let redis_connection_info = RedisConnectionInfo::default() .set_db(cfg.redis_db.unwrap_or(0)) .set_protocol(protocol) .set_username(cfg.redis_username.clone()) .set_password(cfg.redis_password.clone()); let sentinel = SentinelNodeConnectionInfo::default() .set_redis_connection_info(redis_connection_info) .set_tls_mode(tls_mode); let cli = redis::sentinel::SentinelClient::build( vec![dsn], cfg.service_name.clone(), Some(sentinel), redis::sentinel::SentinelServerType::Master, ) .expect("Failed to build sentinel client"); RedisManager::Sentinel(Arc::new(Mutex::new(cli))) } } } pub async fn get(&self) -> Result> { match self { Self::Clustered(conn) => Ok(RedisConnection::Clustered(conn.clone())), Self::NonClustered(conn) => Ok(RedisConnection::NonClustere(conn.clone())), Self::Sentinel(conn) => { let mut conn = conn.lock().await; let con = conn .get_async_connection_with_config( &AsyncConnectionConfig::new() .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), ) .await?; Ok(RedisConnection::Sentinel(con)) } } } } pub enum RedisConnection { Clustered(redis::cluster_async::ClusterConnection), NonClustere(redis::aio::ConnectionManager), Sentinel(redis::aio::MultiplexedConnection), } impl redis::aio::ConnectionLike for RedisConnection { fn req_packed_command<'a>( &'a mut self, cmd: &'a redis::Cmd, ) -> redis::RedisFuture<'a, redis::Value> { match self { RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), RedisConnection::NonClustere(conn) => conn.req_packed_command(cmd), RedisConnection::Sentinel(conn) => conn.req_packed_command(cmd), } } fn req_packed_commands<'a>( &'a mut self, cmd: &'a redis::Pipeline, offset: usize, count: usize, ) -> redis::RedisFuture<'a, Vec> { match self { RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), RedisConnection::NonClustere(conn) => conn.req_packed_commands(cmd, offset, count), RedisConnection::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count), } } fn get_db(&self) -> i64 { match self { RedisConnection::Clustered(conn) => conn.get_db(), RedisConnection::NonClustere(conn) => conn.get_db(), RedisConnection::Sentinel(conn) => conn.get_db(), } } } #[cfg(test)] mod tests { use redis::AsyncCommands; use super::RedisManager; // Ensure basic set/get works -- should test sharding as well: #[tokio::test] // run with `cargo test -- --ignored redis` only when redis is up and configured #[ignore] async fn test_set_read_random_keys() { let mgr = RedisManager::new( "redis://127.0.0.1:6379/0", super::RedisVariant::NonClustered, ) .await; let mut conn = mgr.get().await.unwrap(); for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { let key = key.to_string(); let _: () = conn.set(key.clone(), val).await.unwrap(); assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val); } } }