From 3f708c5fffed105b27965f8e844a26de6bdf9662 Mon Sep 17 00:00:00 2001 From: rtkay123 Date: Sun, 5 Apr 2026 15:17:55 +0200 Subject: feat(cli): cache --- crates/sh-util/src/cache/mod.rs | 176 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 crates/sh-util/src/cache/mod.rs (limited to 'crates/sh-util/src/cache/mod.rs') diff --git a/crates/sh-util/src/cache/mod.rs b/crates/sh-util/src/cache/mod.rs new file mode 100644 index 0000000..67a5121 --- /dev/null +++ b/crates/sh-util/src/cache/mod.rs @@ -0,0 +1,176 @@ +mod cluster; +mod sentinel; +pub use sentinel::SentinelConfig; + +use std::{sync::Arc, time::Duration}; + +use bb8::RunError; +// use bb8_redis::RedisConnectionManager; +use futures_util::lock::Mutex; +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; + +pub use self::cluster::RedisClusterConnectionManager; + +pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); + +pub enum RedisVariant { + Clustered, + NonClustered, + Sentinel(sentinel::SentinelConfig), +} + +#[derive(Clone)] +pub enum RedisManager { + Clustered(redis::cluster_async::ClusterConnection), + NonClustered(redis::aio::ConnectionManager), + Sentinel(Arc>), +} + +impl RedisManager { + pub async fn new(dsn: &str, variant: RedisVariant) -> Self { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build() + .expect("Error initializing redis-unpooled cluster client"); + let con = cli + .get_async_connection() + .await + .expect("Failed to get redis-cluster-unpooled connection"); + RedisManager::Clustered(con) + } + RedisVariant::NonClustered => { + let cli = + redis::Client::open(dsn).expect("Error initializing redis unpooled client"); + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await + .expect("Failed to get redis-unpooled connection manager"); + RedisManager::NonClustered(con) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = if cfg.redis_tls_mode_secure { + TlsMode::Secure + } else { + TlsMode::Insecure + }; + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + + let redis_connection_info = RedisConnectionInfo::default() + .set_db(cfg.redis_db.unwrap_or(0)) + .set_protocol(protocol) + .set_username(cfg.redis_username.clone()) + .set_password(cfg.redis_password.clone()); + let sentinel = SentinelNodeConnectionInfo::default() + .set_redis_connection_info(redis_connection_info) + .set_tls_mode(tls_mode); + + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(sentinel), + redis::sentinel::SentinelServerType::Master, + ) + .expect("Failed to build sentinel client"); + + RedisManager::Sentinel(Arc::new(Mutex::new(cli))) + } + } + } + + pub async fn get(&self) -> Result> { + match self { + Self::Clustered(conn) => Ok(RedisConnection::Clustered(conn.clone())), + Self::NonClustered(conn) => Ok(RedisConnection::NonClustere(conn.clone())), + Self::Sentinel(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new() + .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisConnection::Sentinel(con)) + } + } + } +} + +pub enum RedisConnection { + Clustered(redis::cluster_async::ClusterConnection), + NonClustere(redis::aio::ConnectionManager), + Sentinel(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustere(conn) => conn.req_packed_command(cmd), + RedisConnection::Sentinel(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustere(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustere(conn) => conn.get_db(), + RedisConnection::Sentinel(conn) => conn.get_db(), + } + } +} + +#[cfg(test)] +mod tests { + use redis::AsyncCommands; + + use super::RedisManager; + + // Ensure basic set/get works -- should test sharding as well: + #[tokio::test] + // run with `cargo test -- --ignored redis` only when redis is up and configured + #[ignore] + async fn test_set_read_random_keys() { + let mgr = RedisManager::new( + "redis://127.0.0.1:6379/0", + super::RedisVariant::NonClustered, + ) + .await; + let mut conn = mgr.get().await.unwrap(); + + for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { + let key = key.to_string(); + let _: () = conn.set(key.clone(), val).await.unwrap(); + assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val); + } + } +} -- cgit v1.2.3