diff options
| author | rtkay123 <dev@kanjala.com> | 2026-02-09 13:25:17 +0200 |
|---|---|---|
| committer | rtkay123 <dev@kanjala.com> | 2026-02-09 13:25:17 +0200 |
| commit | bac76a98bf4e90610d0a7105d2ebe6872dc147d4 (patch) | |
| tree | 341a1a31bd20741f781d48e01f7ded253bf34caf /lib | |
| parent | 253c5631ae09fd5ad9fd6b3eff104e6099d4676c (diff) | |
| download | sellershut-bac76a98bf4e90610d0a7105d2ebe6872dc147d4.tar.bz2 sellershut-bac76a98bf4e90610d0a7105d2ebe6872dc147d4.zip | |
feat: svc crate
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/auth-service/Cargo.toml | 7 | ||||
| -rw-r--r-- | lib/auth-service/src/client/mod.rs | 5 | ||||
| -rw-r--r-- | lib/auth-service/src/lib.rs | 2 | ||||
| -rw-r--r-- | lib/auth-service/src/service/mod.rs | 51 | ||||
| -rw-r--r-- | lib/shared-svc/Cargo.toml | 28 | ||||
| -rw-r--r-- | lib/shared-svc/src/cache/cluster.rs | 58 | ||||
| -rw-r--r-- | lib/shared-svc/src/cache/config.rs | 18 | ||||
| -rw-r--r-- | lib/shared-svc/src/cache/mod.rs | 250 | ||||
| -rw-r--r-- | lib/shared-svc/src/cache/sentinel.rs | 67 | ||||
| -rw-r--r-- | lib/shared-svc/src/lib.rs | 16 |
10 files changed, 499 insertions, 3 deletions
diff --git a/lib/auth-service/Cargo.toml b/lib/auth-service/Cargo.toml index 8efdc57..e972312 100644 --- a/lib/auth-service/Cargo.toml +++ b/lib/auth-service/Cargo.toml @@ -7,8 +7,13 @@ readme.workspace = true documentation.workspace = true [dependencies] -secrecy = "0.10.3" +async-session = "3.0.0" +async-trait.workspace = true oauth2 = "5.0.0" +secrecy = "0.10.3" +shared-svc = { workspace = true, features = ["cache"] } +sqlx.workspace = true thiserror.workspace = true +time.workspace = true tracing.workspace = true url = { workspace = true, features = ["serde"] } diff --git a/lib/auth-service/src/client/mod.rs b/lib/auth-service/src/client/mod.rs index af581b0..45e7e4d 100644 --- a/lib/auth-service/src/client/mod.rs +++ b/lib/auth-service/src/client/mod.rs @@ -5,6 +5,7 @@ use url::Url; use crate::AuthServiceError; +#[derive(Debug, Clone)] pub struct OauthClient( oauth2::basic::BasicClient< EndpointSet, @@ -38,10 +39,10 @@ impl ClientConfig { } } -impl TryFrom<ClientConfig> for OauthClient { +impl TryFrom<&ClientConfig> for OauthClient { type Error = AuthServiceError; - fn try_from(value: ClientConfig) -> Result<Self, Self::Error> { + fn try_from(value: &ClientConfig) -> Result<Self, Self::Error> { debug!("creating oauth client"); Ok(Self( oauth2::basic::BasicClient::new(ClientId::new(value.client_id.to_string())) diff --git a/lib/auth-service/src/lib.rs b/lib/auth-service/src/lib.rs index f7b9e80..308ce0f 100644 --- a/lib/auth-service/src/lib.rs +++ b/lib/auth-service/src/lib.rs @@ -1,4 +1,6 @@ pub mod client; +mod service; +pub use service::*; use thiserror::Error; diff --git a/lib/auth-service/src/service/mod.rs b/lib/auth-service/src/service/mod.rs new file mode 100644 index 0000000..3d45523 --- /dev/null +++ b/lib/auth-service/src/service/mod.rs @@ -0,0 +1,51 @@ +use async_session::{Result, Session, SessionStore}; +use async_trait::async_trait; +use shared_svc::cache::RedisManager; +use tracing::instrument; + +#[derive(Debug, Clone)] +pub struct AuthService { + cache: RedisManager, +} + +impl AuthService { + pub fn new(cache: &RedisManager) -> Self { + Self { + cache: cache.clone(), + } + } +} + +#[async_trait] +impl SessionStore for AuthService { + #[doc = " Get a session from the storage backend."] + #[doc = ""] + #[doc = " The input is expected to be the value of an identifying"] + #[doc = " cookie. This will then be parsed by the session middleware"] + #[doc = " into a session if possible"] + #[instrument(skip(self))] + async fn load_session(&self, cookie_value: String) -> Result<Option<Session>> { + todo!() + } + + #[doc = " Store a session on the storage backend."] + #[doc = ""] + #[doc = " The return value is the value of the cookie to store for the"] + #[doc = " user that represents this session"] + #[instrument(skip(self))] + async fn store_session(&self, session: Session) -> Result<Option<String>> { + todo!() + } + + #[doc = " Remove a session from the session store"] + #[instrument(skip(self))] + async fn destroy_session(&self, session: Session) -> Result { + todo!() + } + + #[doc = " Empties the entire store, destroying all sessions"] + #[instrument(skip(self))] + async fn clear_store(&self) -> Result { + todo!() + } +} diff --git a/lib/shared-svc/Cargo.toml b/lib/shared-svc/Cargo.toml new file mode 100644 index 0000000..d95d3ef --- /dev/null +++ b/lib/shared-svc/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "shared-svc" +version = "0.1.0" +edition = "2024" +license.workspace = true +readme.workspace = true +documentation.workspace = true + +[dependencies] +bb8-redis = { version = "0.26.0", optional = true } +log = "0.4.29" +redis = { version = "1.0.3", optional = true } +secrecy.workspace = true +thiserror.workspace = true +tokio = { workspace = true, optional = true } +tracing.workspace = true +url.workspace = true + +[features] +default = [] +cache = [ + "bb8-redis", + "redis/cluster-async", + "redis/connection-manager", + "redis/sentinel", + "redis/tokio-comp", + "tokio/sync" +] diff --git a/lib/shared-svc/src/cache/cluster.rs b/lib/shared-svc/src/cache/cluster.rs new file mode 100644 index 0000000..ea71954 --- /dev/null +++ b/lib/shared-svc/src/cache/cluster.rs @@ -0,0 +1,58 @@ +use bb8_redis::bb8; +use redis::{ + ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, + cluster::{ClusterClient, ClusterClientBuilder}, + cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, +}; + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous clustered connections via `redis_cluster_async::Connection` +#[derive(Clone)] +pub struct RedisClusterConnectionManager { + client: ClusterClient, +} + +impl RedisClusterConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: T, + ) -> Result<RedisClusterConnectionManager, RedisError> { + Ok(RedisClusterConnectionManager { + client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, + }) + } +} + +impl bb8::ManageConnection for RedisClusterConnectionManager { + type Connection = redis::cluster_async::ClusterConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let cmd = redis::cmd("PING"); + let pong = conn + .route_command( + cmd, + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .and_then(|v| Ok(String::from_redis_value(v)?))?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err(( + ErrorKind::Server(redis::ServerErrorKind::ResponseError), + "ping request", + ) + .into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/lib/shared-svc/src/cache/config.rs b/lib/shared-svc/src/cache/config.rs new file mode 100644 index 0000000..79a80d7 --- /dev/null +++ b/lib/shared-svc/src/cache/config.rs @@ -0,0 +1,18 @@ +use url::Url; + +use crate::cache::sentinel::SentinelConfig; + +#[derive(Debug, Clone)] +pub struct CacheConfig { + pub redis_dsn: Url, + pub pooled: bool, + pub kind: RedisVariant, + pub max_connections: u16, +} + +#[derive(Debug, Clone)] +pub enum RedisVariant { + Clustered, + NonClustered, + Sentinel(SentinelConfig), +} diff --git a/lib/shared-svc/src/cache/mod.rs b/lib/shared-svc/src/cache/mod.rs new file mode 100644 index 0000000..cd15463 --- /dev/null +++ b/lib/shared-svc/src/cache/mod.rs @@ -0,0 +1,250 @@ +mod cluster; +mod config; +mod sentinel; +pub use sentinel::SentinelConfig; + +pub use config::*; + +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; +use secrecy::ExposeSecret; +use std::{fmt::Debug, sync::Arc}; +use tracing::debug; + +use bb8_redis::{ + RedisConnectionManager, + bb8::{self, Pool, RunError}, +}; +use tokio::sync::Mutex; + +use crate::{ + ServiceError, + cache::{cluster::RedisClusterConnectionManager, sentinel::RedisSentinelConnectionManager}, +}; + +const REDIS_CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); + +#[derive(Clone)] +pub enum RedisManager { + Clustered(Pool<RedisClusterConnectionManager>), + NonClustered(Pool<RedisConnectionManager>), + Sentinel(Pool<RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>), +} + +impl Debug for RedisManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Clustered(arg0) => f.debug_tuple("Clustered").field(arg0).finish(), + Self::NonClustered(arg0) => f.debug_tuple("NonClustered").field(arg0).finish(), + Self::Sentinel(arg0) => f.debug_tuple("Sentinel").field(arg0).finish(), + Self::ClusteredUnpooled(_arg0) => f.debug_tuple("ClusteredUnpooled").finish(), + Self::NonClusteredUnpooled(arg0) => { + f.debug_tuple("NonClusteredUnpooled").field(arg0).finish() + } + Self::SentinelUnpooled(_arg0) => f.debug_tuple("SentinelUnpooled").finish(), + } + } +} + +pub enum RedisConnection<'a> { + Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>), + NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>), + SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection<'_> { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd), + RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec<redis::Value>> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::ClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::NonClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustered(conn) => conn.get_db(), + RedisConnection::ClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::SentinelPooled(conn) => conn.get_db(), + RedisConnection::SentinelUnpooled(conn) => conn.get_db(), + } + } +} + +impl RedisManager { + pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> { + let u = config.redis_dsn.host_str(); + debug!(url = u, "connecting to cache"); + if config.pooled { + Self::new_pooled( + config.redis_dsn.as_ref(), + &config.kind, + config.max_connections, + ) + .await + } else { + Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await + } + } + async fn new_pooled( + dsn: &str, + variant: &RedisVariant, + max_conns: u16, + ) -> Result<Self, ServiceError> { + debug!(variant = ?variant, "creating pooled cache connection"); + match variant { + RedisVariant::Clustered => { + let mgr = RedisClusterConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Clustered(pool)) + } + RedisVariant::NonClustered => { + let mgr = RedisConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::NonClustered(pool)) + } + RedisVariant::Sentinel(cfg) => { + let mgr = RedisSentinelConnectionManager::new( + vec![dsn], + cfg.service_name.clone(), + Some(create_config(cfg)), + )?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Sentinel(pool)) + } + } + } + + async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build()?; + let con = cli.get_async_connection().await?; + Ok(RedisManager::ClusteredUnpooled(con)) + } + RedisVariant::NonClustered => { + let cli = redis::Client::open(dsn)?; + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisManager::NonClusteredUnpooled(con)) + } + RedisVariant::Sentinel(cfg) => { + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(create_config(cfg)), + redis::sentinel::SentinelServerType::Master, + )?; + + Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli)))) + } + } + } + + pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> { + match self { + Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)), + Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)), + Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)), + Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())), + Self::NonClusteredUnpooled(conn) => { + Ok(RedisConnection::NonClusteredUnpooled(conn.clone())) + } + Self::SentinelUnpooled(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new() + .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisConnection::SentinelUnpooled(con)) + } + } + } +} + +fn create_config(cfg: &SentinelConfig) -> SentinelNodeConnectionInfo { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let info = RedisConnectionInfo::default(); + let info = if let Some(pass) = &cfg.redis_password { + info.set_password(pass.expose_secret()) + } else { + info + }; + + let info = if let Some(user) = &cfg.redis_username { + info.set_username(user.clone()) + } else { + info + } + .set_protocol(protocol) + .set_db(cfg.redis_db.unwrap_or(0)); + + let sent_info = SentinelNodeConnectionInfo::default(); + + if let Some(tls) = tls_mode { + sent_info.set_tls_mode(tls) + } else { + sent_info + } + .set_redis_connection_info(info) +} diff --git a/lib/shared-svc/src/cache/sentinel.rs b/lib/shared-svc/src/cache/sentinel.rs new file mode 100644 index 0000000..9e76423 --- /dev/null +++ b/lib/shared-svc/src/cache/sentinel.rs @@ -0,0 +1,67 @@ +use bb8_redis::bb8; +use redis::{ + ErrorKind, IntoConnectionInfo, RedisError, + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, +}; +use secrecy::SecretString; +use tokio::sync::Mutex; + +struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>); + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: Vec<T>, + service_name: String, + node_connection_info: Option<SentinelNodeConnectionInfo>, + ) -> Result<RedisSentinelConnectionManager, RedisError> { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err(( + ErrorKind::Server(redis::ServerErrorKind::ResponseError), + "ping request", + ) + .into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + +#[derive(Clone, Debug)] +pub struct SentinelConfig { + pub service_name: String, + pub redis_tls_mode_secure: bool, + pub redis_db: Option<i64>, + pub redis_username: Option<String>, + pub redis_password: Option<SecretString>, + pub redis_use_resp3: bool, +} diff --git a/lib/shared-svc/src/lib.rs b/lib/shared-svc/src/lib.rs new file mode 100644 index 0000000..92d9c32 --- /dev/null +++ b/lib/shared-svc/src/lib.rs @@ -0,0 +1,16 @@ +#[cfg(feature = "cache")] +pub mod cache; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ServiceError { + #[error("data store disconnected")] + Cache(#[from] redis::RedisError), + #[error("the data for key `{0}` is not available")] + Redaction(String), + #[error("invalid header (expected {expected:?}, found {found:?})")] + InvalidHeader { expected: String, found: String }, + #[error("unknown data store error")] + Unknown, +} |
