aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-02-09 13:25:17 +0200
committerrtkay123 <dev@kanjala.com>2026-02-09 13:25:17 +0200
commitbac76a98bf4e90610d0a7105d2ebe6872dc147d4 (patch)
tree341a1a31bd20741f781d48e01f7ded253bf34caf /lib
parent253c5631ae09fd5ad9fd6b3eff104e6099d4676c (diff)
downloadsellershut-bac76a98bf4e90610d0a7105d2ebe6872dc147d4.tar.bz2
sellershut-bac76a98bf4e90610d0a7105d2ebe6872dc147d4.zip
feat: svc crate
Diffstat (limited to 'lib')
-rw-r--r--lib/auth-service/Cargo.toml7
-rw-r--r--lib/auth-service/src/client/mod.rs5
-rw-r--r--lib/auth-service/src/lib.rs2
-rw-r--r--lib/auth-service/src/service/mod.rs51
-rw-r--r--lib/shared-svc/Cargo.toml28
-rw-r--r--lib/shared-svc/src/cache/cluster.rs58
-rw-r--r--lib/shared-svc/src/cache/config.rs18
-rw-r--r--lib/shared-svc/src/cache/mod.rs250
-rw-r--r--lib/shared-svc/src/cache/sentinel.rs67
-rw-r--r--lib/shared-svc/src/lib.rs16
10 files changed, 499 insertions, 3 deletions
diff --git a/lib/auth-service/Cargo.toml b/lib/auth-service/Cargo.toml
index 8efdc57..e972312 100644
--- a/lib/auth-service/Cargo.toml
+++ b/lib/auth-service/Cargo.toml
@@ -7,8 +7,13 @@ readme.workspace = true
documentation.workspace = true
[dependencies]
-secrecy = "0.10.3"
+async-session = "3.0.0"
+async-trait.workspace = true
oauth2 = "5.0.0"
+secrecy = "0.10.3"
+shared-svc = { workspace = true, features = ["cache"] }
+sqlx.workspace = true
thiserror.workspace = true
+time.workspace = true
tracing.workspace = true
url = { workspace = true, features = ["serde"] }
diff --git a/lib/auth-service/src/client/mod.rs b/lib/auth-service/src/client/mod.rs
index af581b0..45e7e4d 100644
--- a/lib/auth-service/src/client/mod.rs
+++ b/lib/auth-service/src/client/mod.rs
@@ -5,6 +5,7 @@ use url::Url;
use crate::AuthServiceError;
+#[derive(Debug, Clone)]
pub struct OauthClient(
oauth2::basic::BasicClient<
EndpointSet,
@@ -38,10 +39,10 @@ impl ClientConfig {
}
}
-impl TryFrom<ClientConfig> for OauthClient {
+impl TryFrom<&ClientConfig> for OauthClient {
type Error = AuthServiceError;
- fn try_from(value: ClientConfig) -> Result<Self, Self::Error> {
+ fn try_from(value: &ClientConfig) -> Result<Self, Self::Error> {
debug!("creating oauth client");
Ok(Self(
oauth2::basic::BasicClient::new(ClientId::new(value.client_id.to_string()))
diff --git a/lib/auth-service/src/lib.rs b/lib/auth-service/src/lib.rs
index f7b9e80..308ce0f 100644
--- a/lib/auth-service/src/lib.rs
+++ b/lib/auth-service/src/lib.rs
@@ -1,4 +1,6 @@
pub mod client;
+mod service;
+pub use service::*;
use thiserror::Error;
diff --git a/lib/auth-service/src/service/mod.rs b/lib/auth-service/src/service/mod.rs
new file mode 100644
index 0000000..3d45523
--- /dev/null
+++ b/lib/auth-service/src/service/mod.rs
@@ -0,0 +1,51 @@
+use async_session::{Result, Session, SessionStore};
+use async_trait::async_trait;
+use shared_svc::cache::RedisManager;
+use tracing::instrument;
+
+#[derive(Debug, Clone)]
+pub struct AuthService {
+ cache: RedisManager,
+}
+
+impl AuthService {
+ pub fn new(cache: &RedisManager) -> Self {
+ Self {
+ cache: cache.clone(),
+ }
+ }
+}
+
+#[async_trait]
+impl SessionStore for AuthService {
+ #[doc = " Get a session from the storage backend."]
+ #[doc = ""]
+ #[doc = " The input is expected to be the value of an identifying"]
+ #[doc = " cookie. This will then be parsed by the session middleware"]
+ #[doc = " into a session if possible"]
+ #[instrument(skip(self))]
+ async fn load_session(&self, cookie_value: String) -> Result<Option<Session>> {
+ todo!()
+ }
+
+ #[doc = " Store a session on the storage backend."]
+ #[doc = ""]
+ #[doc = " The return value is the value of the cookie to store for the"]
+ #[doc = " user that represents this session"]
+ #[instrument(skip(self))]
+ async fn store_session(&self, session: Session) -> Result<Option<String>> {
+ todo!()
+ }
+
+ #[doc = " Remove a session from the session store"]
+ #[instrument(skip(self))]
+ async fn destroy_session(&self, session: Session) -> Result {
+ todo!()
+ }
+
+ #[doc = " Empties the entire store, destroying all sessions"]
+ #[instrument(skip(self))]
+ async fn clear_store(&self) -> Result {
+ todo!()
+ }
+}
diff --git a/lib/shared-svc/Cargo.toml b/lib/shared-svc/Cargo.toml
new file mode 100644
index 0000000..d95d3ef
--- /dev/null
+++ b/lib/shared-svc/Cargo.toml
@@ -0,0 +1,28 @@
+[package]
+name = "shared-svc"
+version = "0.1.0"
+edition = "2024"
+license.workspace = true
+readme.workspace = true
+documentation.workspace = true
+
+[dependencies]
+bb8-redis = { version = "0.26.0", optional = true }
+log = "0.4.29"
+redis = { version = "1.0.3", optional = true }
+secrecy.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, optional = true }
+tracing.workspace = true
+url.workspace = true
+
+[features]
+default = []
+cache = [
+ "bb8-redis",
+ "redis/cluster-async",
+ "redis/connection-manager",
+ "redis/sentinel",
+ "redis/tokio-comp",
+ "tokio/sync"
+]
diff --git a/lib/shared-svc/src/cache/cluster.rs b/lib/shared-svc/src/cache/cluster.rs
new file mode 100644
index 0000000..ea71954
--- /dev/null
+++ b/lib/shared-svc/src/cache/cluster.rs
@@ -0,0 +1,58 @@
+use bb8_redis::bb8;
+use redis::{
+ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError,
+ cluster::{ClusterClient, ClusterClientBuilder},
+ cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo},
+};
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous clustered connections via `redis_cluster_async::Connection`
+#[derive(Clone)]
+pub struct RedisClusterConnectionManager {
+ client: ClusterClient,
+}
+
+impl RedisClusterConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: T,
+ ) -> Result<RedisClusterConnectionManager, RedisError> {
+ Ok(RedisClusterConnectionManager {
+ client: ClusterClientBuilder::new(vec![info]).retries(0).build()?,
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisClusterConnectionManager {
+ type Connection = redis::cluster_async::ClusterConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let cmd = redis::cmd("PING");
+ let pong = conn
+ .route_command(
+ cmd,
+ RoutingInfo::MultiNode((
+ MultipleNodeRoutingInfo::AllMasters,
+ Some(ResponsePolicy::OneSucceeded),
+ )),
+ )
+ .await
+ .and_then(|v| Ok(String::from_redis_value(v)?))?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((
+ ErrorKind::Server(redis::ServerErrorKind::ResponseError),
+ "ping request",
+ )
+ .into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
diff --git a/lib/shared-svc/src/cache/config.rs b/lib/shared-svc/src/cache/config.rs
new file mode 100644
index 0000000..79a80d7
--- /dev/null
+++ b/lib/shared-svc/src/cache/config.rs
@@ -0,0 +1,18 @@
+use url::Url;
+
+use crate::cache::sentinel::SentinelConfig;
+
+#[derive(Debug, Clone)]
+pub struct CacheConfig {
+ pub redis_dsn: Url,
+ pub pooled: bool,
+ pub kind: RedisVariant,
+ pub max_connections: u16,
+}
+
+#[derive(Debug, Clone)]
+pub enum RedisVariant {
+ Clustered,
+ NonClustered,
+ Sentinel(SentinelConfig),
+}
diff --git a/lib/shared-svc/src/cache/mod.rs b/lib/shared-svc/src/cache/mod.rs
new file mode 100644
index 0000000..cd15463
--- /dev/null
+++ b/lib/shared-svc/src/cache/mod.rs
@@ -0,0 +1,250 @@
+mod cluster;
+mod config;
+mod sentinel;
+pub use sentinel::SentinelConfig;
+
+pub use config::*;
+
+use redis::{
+ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode,
+ aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo,
+};
+use secrecy::ExposeSecret;
+use std::{fmt::Debug, sync::Arc};
+use tracing::debug;
+
+use bb8_redis::{
+ RedisConnectionManager,
+ bb8::{self, Pool, RunError},
+};
+use tokio::sync::Mutex;
+
+use crate::{
+ ServiceError,
+ cache::{cluster::RedisClusterConnectionManager, sentinel::RedisSentinelConnectionManager},
+};
+
+const REDIS_CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
+
+#[derive(Clone)]
+pub enum RedisManager {
+ Clustered(Pool<RedisClusterConnectionManager>),
+ NonClustered(Pool<RedisConnectionManager>),
+ Sentinel(Pool<RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>),
+}
+
+impl Debug for RedisManager {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Clustered(arg0) => f.debug_tuple("Clustered").field(arg0).finish(),
+ Self::NonClustered(arg0) => f.debug_tuple("NonClustered").field(arg0).finish(),
+ Self::Sentinel(arg0) => f.debug_tuple("Sentinel").field(arg0).finish(),
+ Self::ClusteredUnpooled(_arg0) => f.debug_tuple("ClusteredUnpooled").finish(),
+ Self::NonClusteredUnpooled(arg0) => {
+ f.debug_tuple("NonClusteredUnpooled").field(arg0).finish()
+ }
+ Self::SentinelUnpooled(_arg0) => f.debug_tuple("SentinelUnpooled").finish(),
+ }
+ }
+}
+
+pub enum RedisConnection<'a> {
+ Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>),
+ NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>),
+ SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(redis::aio::MultiplexedConnection),
+}
+
+impl redis::aio::ConnectionLike for RedisConnection<'_> {
+ fn req_packed_command<'a>(
+ &'a mut self,
+ cmd: &'a redis::Cmd,
+ ) -> redis::RedisFuture<'a, redis::Value> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd),
+ }
+ }
+
+ fn req_packed_commands<'a>(
+ &'a mut self,
+ cmd: &'a redis::Pipeline,
+ offset: usize,
+ count: usize,
+ ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::ClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::NonClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ }
+ }
+
+ fn get_db(&self) -> i64 {
+ match self {
+ RedisConnection::Clustered(conn) => conn.get_db(),
+ RedisConnection::NonClustered(conn) => conn.get_db(),
+ RedisConnection::ClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::SentinelPooled(conn) => conn.get_db(),
+ RedisConnection::SentinelUnpooled(conn) => conn.get_db(),
+ }
+ }
+}
+
+impl RedisManager {
+ pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> {
+ let u = config.redis_dsn.host_str();
+ debug!(url = u, "connecting to cache");
+ if config.pooled {
+ Self::new_pooled(
+ config.redis_dsn.as_ref(),
+ &config.kind,
+ config.max_connections,
+ )
+ .await
+ } else {
+ Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await
+ }
+ }
+ async fn new_pooled(
+ dsn: &str,
+ variant: &RedisVariant,
+ max_conns: u16,
+ ) -> Result<Self, ServiceError> {
+ debug!(variant = ?variant, "creating pooled cache connection");
+ match variant {
+ RedisVariant::Clustered => {
+ let mgr = RedisClusterConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Clustered(pool))
+ }
+ RedisVariant::NonClustered => {
+ let mgr = RedisConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::NonClustered(pool))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let mgr = RedisSentinelConnectionManager::new(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(create_config(cfg)),
+ )?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Sentinel(pool))
+ }
+ }
+ }
+
+ async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> {
+ match variant {
+ RedisVariant::Clustered => {
+ let cli = redis::cluster::ClusterClient::builder(vec![dsn])
+ .retries(1)
+ .connection_timeout(REDIS_CONN_TIMEOUT)
+ .build()?;
+ let con = cli.get_async_connection().await?;
+ Ok(RedisManager::ClusteredUnpooled(con))
+ }
+ RedisVariant::NonClustered => {
+ let cli = redis::Client::open(dsn)?;
+ let con = redis::aio::ConnectionManager::new_with_config(
+ cli,
+ ConnectionManagerConfig::new()
+ .set_number_of_retries(1)
+ .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await?;
+ Ok(RedisManager::NonClusteredUnpooled(con))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let cli = redis::sentinel::SentinelClient::build(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(create_config(cfg)),
+ redis::sentinel::SentinelServerType::Master,
+ )?;
+
+ Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli))))
+ }
+ }
+ }
+
+ pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
+ match self {
+ Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)),
+ Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)),
+ Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)),
+ Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())),
+ Self::NonClusteredUnpooled(conn) => {
+ Ok(RedisConnection::NonClusteredUnpooled(conn.clone()))
+ }
+ Self::SentinelUnpooled(conn) => {
+ let mut conn = conn.lock().await;
+ let con = conn
+ .get_async_connection_with_config(
+ &AsyncConnectionConfig::new()
+ .set_response_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await?;
+ Ok(RedisConnection::SentinelUnpooled(con))
+ }
+ }
+ }
+}
+
+fn create_config(cfg: &SentinelConfig) -> SentinelNodeConnectionInfo {
+ let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+ let info = RedisConnectionInfo::default();
+ let info = if let Some(pass) = &cfg.redis_password {
+ info.set_password(pass.expose_secret())
+ } else {
+ info
+ };
+
+ let info = if let Some(user) = &cfg.redis_username {
+ info.set_username(user.clone())
+ } else {
+ info
+ }
+ .set_protocol(protocol)
+ .set_db(cfg.redis_db.unwrap_or(0));
+
+ let sent_info = SentinelNodeConnectionInfo::default();
+
+ if let Some(tls) = tls_mode {
+ sent_info.set_tls_mode(tls)
+ } else {
+ sent_info
+ }
+ .set_redis_connection_info(info)
+}
diff --git a/lib/shared-svc/src/cache/sentinel.rs b/lib/shared-svc/src/cache/sentinel.rs
new file mode 100644
index 0000000..9e76423
--- /dev/null
+++ b/lib/shared-svc/src/cache/sentinel.rs
@@ -0,0 +1,67 @@
+use bb8_redis::bb8;
+use redis::{
+ ErrorKind, IntoConnectionInfo, RedisError,
+ sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
+};
+use secrecy::SecretString;
+use tokio::sync::Mutex;
+
+struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
+pub struct RedisSentinelConnectionManager {
+ client: LockedSentinelClient,
+}
+
+impl RedisSentinelConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: Vec<T>,
+ service_name: String,
+ node_connection_info: Option<SentinelNodeConnectionInfo>,
+ ) -> Result<RedisSentinelConnectionManager, RedisError> {
+ Ok(RedisSentinelConnectionManager {
+ client: LockedSentinelClient(Mutex::new(SentinelClient::build(
+ info,
+ service_name,
+ node_connection_info,
+ SentinelServerType::Master,
+ )?)),
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisSentinelConnectionManager {
+ type Connection = redis::aio::MultiplexedConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.0.lock().await.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong: String = redis::cmd("PING").query_async(conn).await?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((
+ ErrorKind::Server(redis::ServerErrorKind::ResponseError),
+ "ping request",
+ )
+ .into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct SentinelConfig {
+ pub service_name: String,
+ pub redis_tls_mode_secure: bool,
+ pub redis_db: Option<i64>,
+ pub redis_username: Option<String>,
+ pub redis_password: Option<SecretString>,
+ pub redis_use_resp3: bool,
+}
diff --git a/lib/shared-svc/src/lib.rs b/lib/shared-svc/src/lib.rs
new file mode 100644
index 0000000..92d9c32
--- /dev/null
+++ b/lib/shared-svc/src/lib.rs
@@ -0,0 +1,16 @@
+#[cfg(feature = "cache")]
+pub mod cache;
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum ServiceError {
+ #[error("data store disconnected")]
+ Cache(#[from] redis::RedisError),
+ #[error("the data for key `{0}` is not available")]
+ Redaction(String),
+ #[error("invalid header (expected {expected:?}, found {found:?})")]
+ InvalidHeader { expected: String, found: String },
+ #[error("unknown data store error")]
+ Unknown,
+}