From 3f708c5fffed105b27965f8e844a26de6bdf9662 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 5 Apr 2026 15:17:55 +0200 Subject: feat(cli): cache --- crates/sh-util/src/cache/cluster.rs | 56 +++++++++++ crates/sh-util/src/cache/mod.rs | 176 +++++++++++++++++++++++++++++++++++ crates/sh-util/src/cache/sentinel.rs | 66 +++++++++++++ crates/sh-util/src/lib.rs | 2 + 4 files changed, 300 insertions(+) create mode 100644 crates/sh-util/src/cache/cluster.rs create mode 100644 crates/sh-util/src/cache/mod.rs create mode 100644 crates/sh-util/src/cache/sentinel.rs create mode 100644 crates/sh-util/src/lib.rs (limited to 'crates/sh-util/src') diff --git a/crates/sh-util/src/cache/cluster.rs b/crates/sh-util/src/cache/cluster.rs new file mode 100644 index 0000000..de13629 --- /dev/null +++ b/crates/sh-util/src/cache/cluster.rs @@ -0,0 +1,56 @@ +use redis::{ + ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, + cluster::{ClusterClient, ClusterClientBuilder}, + cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, +}; + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous clustered connections via `redis_cluster_async::Connection` +#[derive(Clone)] +pub struct RedisClusterConnectionManager { + client: ClusterClient, +} + +impl RedisClusterConnectionManager { + pub fn new( + info: T, + ) -> Result { + Ok(RedisClusterConnectionManager { + client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, + }) + } +} + +impl bb8::ManageConnection for RedisClusterConnectionManager { + type Connection = redis::cluster_async::ClusterConnection; + type Error = RedisError; + + async fn connect(&self) -> Result { + self.client.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong = conn + .route_command( + redis::cmd("PING"), + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .and_then(|v| Ok(String::from_redis_value(v)?))?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err(( + ErrorKind::Server(redis::ServerErrorKind::ResponseError), + "ping request", + ) + .into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/crates/sh-util/src/cache/mod.rs b/crates/sh-util/src/cache/mod.rs new file mode 100644 index 0000000..67a5121 --- /dev/null +++ b/crates/sh-util/src/cache/mod.rs @@ -0,0 +1,176 @@ +mod cluster; +mod sentinel; +pub use sentinel::SentinelConfig; + +use std::{sync::Arc, time::Duration}; + +use bb8::RunError; +// use bb8_redis::RedisConnectionManager; +use futures_util::lock::Mutex; +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; + +pub use self::cluster::RedisClusterConnectionManager; + +pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); + +pub enum RedisVariant { + Clustered, + NonClustered, + Sentinel(sentinel::SentinelConfig), +} + +#[derive(Clone)] +pub enum RedisManager { + Clustered(redis::cluster_async::ClusterConnection), + NonClustered(redis::aio::ConnectionManager), + Sentinel(Arc>), +} + +impl RedisManager { + pub async fn new(dsn: &str, variant: RedisVariant) -> Self { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build() + .expect("Error initializing redis-unpooled cluster client"); + let con = cli + .get_async_connection() + .await + .expect("Failed to get redis-cluster-unpooled connection"); + RedisManager::Clustered(con) + } + RedisVariant::NonClustered => { + let cli = + redis::Client::open(dsn).expect("Error initializing redis unpooled client"); + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await + .expect("Failed to get redis-unpooled connection manager"); + RedisManager::NonClustered(con) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = if cfg.redis_tls_mode_secure { + TlsMode::Secure + } else { + TlsMode::Insecure + }; + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + + let redis_connection_info = RedisConnectionInfo::default() + .set_db(cfg.redis_db.unwrap_or(0)) + .set_protocol(protocol) + .set_username(cfg.redis_username.clone()) + .set_password(cfg.redis_password.clone()); + let sentinel = SentinelNodeConnectionInfo::default() + .set_redis_connection_info(redis_connection_info) + .set_tls_mode(tls_mode); + + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(sentinel), + redis::sentinel::SentinelServerType::Master, + ) + .expect("Failed to build sentinel client"); + + RedisManager::Sentinel(Arc::new(Mutex::new(cli))) + } + } + } + + pub async fn get(&self) -> Result> { + match self { + Self::Clustered(conn) => Ok(RedisConnection::Clustered(conn.clone())), + Self::NonClustered(conn) => Ok(RedisConnection::NonClustere(conn.clone())), + Self::Sentinel(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new() + .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisConnection::Sentinel(con)) + } + } + } +} + +pub enum RedisConnection { + Clustered(redis::cluster_async::ClusterConnection), + NonClustere(redis::aio::ConnectionManager), + Sentinel(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustere(conn) => conn.req_packed_command(cmd), + RedisConnection::Sentinel(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustere(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustere(conn) => conn.get_db(), + RedisConnection::Sentinel(conn) => conn.get_db(), + } + } +} + +#[cfg(test)] +mod tests { + use redis::AsyncCommands; + + use super::RedisManager; + + // Ensure basic set/get works -- should test sharding as well: + #[tokio::test] + // run with `cargo test -- --ignored redis` only when redis is up and configured + #[ignore] + async fn test_set_read_random_keys() { + let mgr = RedisManager::new( + "redis://127.0.0.1:6379/0", + super::RedisVariant::NonClustered, + ) + .await; + let mut conn = mgr.get().await.unwrap(); + + for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { + let key = key.to_string(); + let _: () = conn.set(key.clone(), val).await.unwrap(); + assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val); + } + } +} diff --git a/crates/sh-util/src/cache/sentinel.rs b/crates/sh-util/src/cache/sentinel.rs new file mode 100644 index 0000000..e52b043 --- /dev/null +++ b/crates/sh-util/src/cache/sentinel.rs @@ -0,0 +1,66 @@ +use futures_util::lock::Mutex; +use redis::{ + ErrorKind, IntoConnectionInfo, RedisError, + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, +}; +use serde::Deserialize; + +struct LockedSentinelClient(pub(crate) Mutex); + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SentinelConfig { + pub service_name: String, + pub redis_tls_mode_secure: bool, + pub redis_db: Option, + pub redis_username: String, + pub redis_password: String, + pub redis_use_resp3: bool, +} + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new( + info: Vec, + service_name: String, + node_connection_info: Option, + ) -> Result { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err(( + ErrorKind::Server(redis::ServerErrorKind::ResponseError), + "ping request", + ) + .into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/crates/sh-util/src/lib.rs b/crates/sh-util/src/lib.rs new file mode 100644 index 0000000..5501a81 --- /dev/null +++ b/crates/sh-util/src/lib.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "cache")] +pub mod cache; -- cgit v1.2.3