diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-10 12:55:43 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-10 12:55:43 +0200 |
commit | bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04 (patch) | |
tree | 50b63525480da0bee2ce713d69f02617c20bee8d /lib/warden-stack/src | |
parent | 8deeab3e11f707677609047f5577a256cf28ed63 (diff) | |
download | warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.tar.bz2 warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.zip |
chore: collapse stack-up
Diffstat (limited to 'lib/warden-stack/src')
-rw-r--r-- | lib/warden-stack/src/cache.rs | 292 | ||||
-rw-r--r-- | lib/warden-stack/src/cache/cluster.rs | 52 | ||||
-rw-r--r-- | lib/warden-stack/src/cache/sentinel.rs | 65 | ||||
-rw-r--r-- | lib/warden-stack/src/config.rs | 139 | ||||
-rw-r--r-- | lib/warden-stack/src/lib.rs | 95 | ||||
-rw-r--r-- | lib/warden-stack/src/nats.rs | 61 | ||||
-rw-r--r-- | lib/warden-stack/src/postgres.rs | 137 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing.rs | 66 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing/loki.rs | 29 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing/telemetry.rs | 137 |
10 files changed, 1073 insertions, 0 deletions
diff --git a/lib/warden-stack/src/cache.rs b/lib/warden-stack/src/cache.rs new file mode 100644 index 0000000..9be3778 --- /dev/null +++ b/lib/warden-stack/src/cache.rs @@ -0,0 +1,292 @@ +// https://github.com/svix/svix-webhooks/tree/4ede01a3209658615bb8d3153965c5c3a2e1b7ff/server/svix-server/src/redis +pub mod cluster; +pub mod sentinel; + +use std::{sync::Arc, time::Duration}; + +use bb8::{Pool, RunError}; +use bb8_redis::RedisConnectionManager; +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; +use sentinel::{RedisSentinelConnectionManager, SentinelConfig}; +use serde::Deserialize; +use tokio::sync::Mutex; + +use crate::{ + ServiceError, ServicesBuilder, + services_builder::{IsUnset, SetCache, State}, +}; + +pub use self::cluster::RedisClusterConnectionManager; + +pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); + +impl<S: State> ServicesBuilder<S> { + pub async fn cache( + self, + config: &CacheConfig, + ) -> Result<ServicesBuilder<SetCache<S>>, crate::ServiceError> + where + S::Cache: IsUnset, + { + Ok(self.cache_internal(RedisManager::new(config).await?)) + } +} + +fn default_max_conns() -> u16 { + 100 +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase")] +pub struct CacheConfig { + #[serde(rename = "dsn")] + redis_dsn: Arc<str>, + #[serde(default)] + pooled: bool, + #[serde(rename = "type")] + kind: RedisVariant, + #[serde(default = "default_max_conns")] + #[serde(rename = "max-connections")] + max_connections: u16, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub enum RedisVariant { + Clustered, + NonClustered, + Sentinel(SentinelConfig), +} + +#[derive(Clone)] +pub enum RedisManager { + Clustered(Pool<RedisClusterConnectionManager>), + NonClustered(Pool<RedisConnectionManager>), + Sentinel(Pool<RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>), +} + +impl RedisManager { + pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> { + if config.pooled { + Self::new_pooled( + config.redis_dsn.as_ref(), + &config.kind, + config.max_connections, + ) + .await + } else { + Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await + } + } + async fn new_pooled( + dsn: &str, + variant: &RedisVariant, + max_conns: u16, + ) -> Result<Self, ServiceError> { + match variant { + RedisVariant::Clustered => { + let mgr = RedisClusterConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Clustered(pool)) + } + RedisVariant::NonClustered => { + let mgr = RedisConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::NonClustered(pool)) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let mgr = RedisSentinelConnectionManager::new( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + )?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Sentinel(pool)) + } + } + } + + async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build()?; + let con = cli.get_async_connection().await?; + Ok(RedisManager::ClusteredUnpooled(con)) + } + RedisVariant::NonClustered => { + let cli = redis::Client::open(dsn)?; + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(REDIS_CONN_TIMEOUT), + ) + .await?; + Ok(RedisManager::NonClusteredUnpooled(con)) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + redis::sentinel::SentinelServerType::Master, + )?; + + Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli)))) + } + } + } + + pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> { + match self { + Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)), + Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)), + Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)), + Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())), + Self::NonClusteredUnpooled(conn) => { + Ok(RedisConnection::NonClusteredUnpooled(conn.clone())) + } + Self::SentinelUnpooled(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new().set_response_timeout(REDIS_CONN_TIMEOUT), + ) + .await?; + Ok(RedisConnection::SentinelUnpooled(con)) + } + } + } +} + +pub enum RedisConnection<'a> { + Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>), + NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>), + SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection<'_> { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd), + RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec<redis::Value>> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::ClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::NonClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustered(conn) => conn.get_db(), + RedisConnection::ClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::SentinelPooled(conn) => conn.get_db(), + RedisConnection::SentinelUnpooled(conn) => conn.get_db(), + } + } +} + +#[cfg(test)] +mod tests { + use redis::AsyncCommands; + + use crate::cache::CacheConfig; + + use super::RedisManager; + + // Ensure basic set/get works -- should test sharding as well: + #[tokio::test] + // run with `cargo test -- --ignored redis` only when redis is up and configured + #[ignore] + async fn test_set_read_random_keys() { + let config = CacheConfig { + redis_dsn: "redis://localhost:6379".into(), + pooled: false, + kind: crate::cache::RedisVariant::NonClustered, + max_connections: 10, + }; + let mgr = RedisManager::new(&config).await.unwrap(); + let mut conn = mgr.get().await.unwrap(); + + for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { + let key = key.to_string(); + let _: () = conn.set(key.clone(), val).await.unwrap(); + assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val); + } + } +} diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs new file mode 100644 index 0000000..91e3b24 --- /dev/null +++ b/lib/warden-stack/src/cache/cluster.rs @@ -0,0 +1,52 @@ +use redis::{ + ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, + cluster::{ClusterClient, ClusterClientBuilder}, + cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, +}; + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous clustered connections via `redis_cluster_async::Connection` +#[derive(Clone)] +pub struct RedisClusterConnectionManager { + client: ClusterClient, +} + +impl RedisClusterConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: T, + ) -> Result<RedisClusterConnectionManager, RedisError> { + Ok(RedisClusterConnectionManager { + client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, + }) + } +} + +impl bb8::ManageConnection for RedisClusterConnectionManager { + type Connection = redis::cluster_async::ClusterConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong = conn + .route_command( + &redis::cmd("PING"), + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .and_then(|v| String::from_redis_value(&v))?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/lib/warden-stack/src/cache/sentinel.rs b/lib/warden-stack/src/cache/sentinel.rs new file mode 100644 index 0000000..c9f787a --- /dev/null +++ b/lib/warden-stack/src/cache/sentinel.rs @@ -0,0 +1,65 @@ +use redis::{ + ErrorKind, IntoConnectionInfo, RedisError, + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, +}; +use serde::Deserialize; +use tokio::sync::Mutex; + +struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>); + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: Vec<T>, + service_name: String, + node_connection_info: Option<SentinelNodeConnectionInfo>, + ) -> Result<RedisSentinelConnectionManager, RedisError> { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct SentinelConfig { + #[serde(rename = "sentinel_service_name")] + pub service_name: String, + #[serde(default)] + pub redis_tls_mode_secure: bool, + pub redis_db: Option<i64>, + pub redis_username: Option<String>, + pub redis_password: Option<String>, + #[serde(default)] + pub redis_use_resp3: bool, +} diff --git a/lib/warden-stack/src/config.rs b/lib/warden-stack/src/config.rs new file mode 100644 index 0000000..9f42f43 --- /dev/null +++ b/lib/warden-stack/src/config.rs @@ -0,0 +1,139 @@ +use serde::Deserialize; + +use std::{fmt::Display, sync::Arc}; + +#[derive(Clone, Debug, Deserialize)] +pub struct AppConfig { + #[serde(skip)] + pub name: Arc<str>, + #[serde(skip)] + pub version: Arc<str>, + #[serde(default)] + pub env: Environment, + #[cfg(feature = "api")] + #[serde(default = "default_port")] + pub port: u16, +} + +#[cfg(feature = "api")] +pub(crate) fn default_port() -> u16 { + 2210 +} + +#[cfg(feature = "tracing")] +pub(crate) fn default_log() -> String { + #[cfg(debug_assertions)] + return "debug".into(); + #[cfg(not(debug_assertions))] + "info".into() +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Configuration { + pub application: AppConfig, + #[cfg(feature = "postgres")] + pub database: crate::postgres::PostgresConfig, + #[cfg(feature = "cache")] + pub cache: crate::cache::CacheConfig, + #[serde(default)] + pub misc: serde_json::Value, + #[cfg(feature = "tracing")] + pub monitoring: Monitoring, + #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] + pub nats: crate::nats::NatsConfig, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Monitoring { + #[serde(rename = "log-level")] + #[cfg(feature = "tracing")] + #[serde(default = "default_log")] + pub log_level: String, + #[cfg(feature = "opentelemetry")] + #[serde(rename = "opentelemetry-endpoint")] + #[serde(default = "default_opentelemetry")] + pub opentelemetry_endpoint: Arc<str>, + #[cfg(feature = "tracing-loki")] + #[serde(rename = "loki-endpoint")] + #[serde(default = "default_loki")] + pub loki_endpoint: Arc<str>, +} + +#[cfg(feature = "tracing-loki")] +pub(crate) fn default_loki() -> Arc<str> { + "http://localhost:3100".into() +} + +#[cfg(feature = "opentelemetry")] +pub(crate) fn default_opentelemetry() -> Arc<str> { + "http://localhost:4317".into() +} + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(serde::Serialize))] +#[serde(rename_all = "lowercase")] +pub enum Environment { + #[default] + Development, + Production, +} + +impl Display for Environment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + Environment::Development => "development", + Environment::Production => "production", + } + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Test that the enum is correctly serialized and deserialized + #[test] + fn test_environment_serialization() { + // Test serialization for Development + let dev = Environment::Development; + let dev_json = serde_json::to_string(&dev).unwrap(); + assert_eq!(dev_json, "\"development\""); + + // Test serialization for Production + let prod = Environment::Production; + let prod_json = serde_json::to_string(&prod).unwrap(); + assert_eq!(prod_json, "\"production\""); + + // Test deserialization for Development + let dev_str = "\"development\""; + let deserialized_dev: Environment = serde_json::from_str(dev_str).unwrap(); + assert_eq!(deserialized_dev, Environment::Development); + + // Test deserialization for Production + let prod_str = "\"production\""; + let deserialized_prod: Environment = serde_json::from_str(prod_str).unwrap(); + assert_eq!(deserialized_prod, Environment::Production); + } + + // Test Display implementation + #[test] + fn test_environment_display() { + let dev = Environment::Development; + assert_eq!(format!("{}", dev), "development"); + + let prod = Environment::Production; + assert_eq!(format!("{}", prod), "production"); + } + + #[test] + #[cfg(feature = "api")] + fn test_port() { + let listen_address = + std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, default_port())); + assert_eq!(listen_address.port(), default_port()); + } +} diff --git a/lib/warden-stack/src/lib.rs b/lib/warden-stack/src/lib.rs new file mode 100644 index 0000000..efd6862 --- /dev/null +++ b/lib/warden-stack/src/lib.rs @@ -0,0 +1,95 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "tracing")] +#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] +pub mod tracing; + +#[cfg(feature = "cache")] +#[cfg_attr(docsrs, doc(cfg(feature = "cache")))] +pub mod cache; + +#[cfg(feature = "cache")] +pub use redis; + +#[cfg(feature = "postgres")] +pub use sqlx; + +#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] +pub use async_nats; + +#[cfg(feature = "opentelemetry")] +mod otel { + pub use opentelemetry; + pub use opentelemetry_http; + pub use opentelemetry_otlp; + pub use opentelemetry_sdk; + pub use opentelemetry_semantic_conventions; + pub use tracing_opentelemetry; +} + +#[cfg(feature = "opentelemetry")] +pub use otel::*; + +#[cfg(feature = "postgres")] +#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))] +pub mod postgres; + +#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] +#[cfg_attr( + docsrs, + doc(cfg(any(feature = "nats-core", feature = "nats-jetstream"))) +)] +pub mod nats; + +mod config; +pub use config::*; + +#[derive(Clone, bon::Builder)] +pub struct Services { + #[cfg(feature = "postgres")] + #[builder(setters(vis = "", name = pg_internal))] + pub postgres: Option<sqlx::PgPool>, + #[cfg(feature = "cache")] + #[builder(setters(vis = "", name = cache_internal))] + pub cache: Option<cache::RedisManager>, + #[cfg(feature = "nats-core")] + #[cfg_attr(docsrs, doc(cfg(feature = "nats-core")))] + #[builder(setters(vis = "", name = nats_internal))] + /// NATS connection handle + pub nats: Option<async_nats::Client>, + #[cfg(feature = "nats-jetstream")] + #[cfg_attr(docsrs, doc(cfg(feature = "nats-jetstream")))] + #[builder(setters(vis = "", name = jetstream_internal))] + /// NATS-Jetstream connection handle + pub jetstream: Option<async_nats::jetstream::Context>, +} + +#[derive(thiserror::Error, Debug)] +pub enum ServiceError { + #[error("service was not initialised")] + NotInitialised, + #[error("unknown data store error")] + Unknown, + #[error("invalid config `{0}`")] + Configuration(String), + #[cfg(feature = "postgres")] + #[error(transparent)] + /// Postgres error + Postgres(#[from] sqlx::Error), + #[cfg(feature = "cache")] + #[error(transparent)] + /// Redis error + Cache(#[from] redis::RedisError), + #[cfg(feature = "opentelemetry")] + #[error(transparent)] + /// When creating the tracing layer + Opentelemetry(#[from] opentelemetry_sdk::trace::TraceError), + #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] + #[error(transparent)] + /// NATS error + Nats(#[from] async_nats::error::Error<async_nats::ConnectErrorKind>), + #[cfg(feature = "tracing-loki")] + #[error(transparent)] + /// When creating the tracing layer + Loki(#[from] tracing_loki::Error), +} diff --git a/lib/warden-stack/src/nats.rs b/lib/warden-stack/src/nats.rs new file mode 100644 index 0000000..952490c --- /dev/null +++ b/lib/warden-stack/src/nats.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone, Debug)] +/// Nats configuration +pub struct NatsConfig { + /// Hosts dsn + #[serde(default = "nats")] + pub hosts: Arc<[String]>, +} + +pub(crate) fn nats() -> Arc<[String]> { + let hosts = vec!["nats://localhost:4222".to_string()]; + hosts.into() +} + +impl NatsConfig { + fn hosts(&self) -> Vec<String> { + self.hosts.iter().map(ToString::to_string).collect() + } +} + +use crate::{ + ServiceError, ServicesBuilder, + services_builder::{IsUnset, State}, +}; + +#[cfg(feature = "nats-jetstream")] +impl<S: State> ServicesBuilder<S> { + /// create a Jetstream Context using the provided [NatsConfig] + pub async fn nats_jetstream( + self, + config: &NatsConfig, + ) -> Result<ServicesBuilder<crate::services_builder::SetJetstream<S>>, ServiceError> + where + S::Jetstream: IsUnset, + { + let hosts = config.hosts(); + let client = async_nats::connect(hosts).await?; + + Ok(self.jetstream_internal(async_nats::jetstream::new(client))) + } +} + +#[cfg(feature = "nats-core")] +impl<S: State> ServicesBuilder<S> { + /// create a NATS connection using the provided [NatsConfig] + pub async fn nats( + self, + config: &NatsConfig, + ) -> Result<ServicesBuilder<crate::services_builder::SetNats<S>>, ServiceError> + where + S::Nats: IsUnset, + { + let hosts = config.hosts(); + let client = async_nats::connect(hosts).await?; + + Ok(self.nats_internal(client)) + } +} diff --git a/lib/warden-stack/src/postgres.rs b/lib/warden-stack/src/postgres.rs new file mode 100644 index 0000000..3264368 --- /dev/null +++ b/lib/warden-stack/src/postgres.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; + +use secrecy::{ExposeSecret, SecretString}; +use serde::Deserialize; +use url::Url; + +use crate::{ + ServicesBuilder, + services_builder::{IsUnset, SetPostgres, State}, +}; + +#[derive(Debug, Deserialize, Clone)] +pub struct PostgresConfig { + #[serde(default = "default_pool_size")] + pool_size: u32, + #[serde(default = "default_port")] + port: u32, + name: Arc<str>, + host: Arc<str>, + #[serde(default = "user")] + user: Arc<str>, + password: SecretString, +} + +fn default_pool_size() -> u32 { + 100 +} + +fn user() -> Arc<str> { + "postgres".into() +} + +fn default_port() -> u32 { + 5432 +} + +impl PostgresConfig { + // Getter for size + pub fn pool_size(&self) -> u32 { + self.pool_size + } + + // Getter for port + pub fn port(&self) -> u32 { + self.port + } + + // Getter for name + pub fn name(&self) -> &str { + self.name.as_ref() + } + + // Getter for host + pub fn host(&self) -> &str { + self.host.as_ref() + } + + // Getter for username + pub fn username(&self) -> &str { + self.user.as_ref() + } + + // Getter for password (you may want to return a reference or handle it differently) + pub fn password(&self) -> &SecretString { + &self.password + } + + pub(crate) fn connection_string(&self) -> Result<Url, crate::ServiceError> { + Url::parse(&format!( + "postgres://{}:{}@{}:{}/{}", + self.user, + self.password.expose_secret(), + self.host, + self.port, + self.name + )) + .map_err(|e| crate::ServiceError::Configuration(e.to_string())) + } +} + +impl<S: State> ServicesBuilder<S> { + pub async fn postgres( + self, + config: &PostgresConfig, + ) -> Result<ServicesBuilder<SetPostgres<S>>, crate::ServiceError> + where + S::Postgres: IsUnset, + { + let pg = sqlx::postgres::PgPoolOptions::new() + // The default connection limit for a Postgres server is 100 connections, with 3 reserved for superusers. + // + // If you're deploying your application with multiple replicas, then the total + // across all replicas should not exceed the Postgres connection limit + // (max_connections postgresql.conf). + .max_connections(config.pool_size) + .connect(config.connection_string()?.as_ref()) + .await?; + Ok(self.pg_internal(pg)) + } +} + +#[cfg(all(test, target_os = "linux"))] +mod test { + use super::*; + use crate::Services; + + #[tokio::test] + async fn docker_stack_db() { + let port = default_port(); + let name = ""; + let host = "localhost"; + let user = user(); + let pool_size = default_pool_size(); + let password = "postgres"; + + let config = PostgresConfig { + pool_size, + port, + name: name.into(), + host: host.into(), + user: user.clone(), + password: secrecy::SecretString::new(password.into()), + }; + + assert_eq!(config.name(), name); + assert_eq!(config.pool_size(), pool_size); + assert_eq!(config.username(), user.as_ref()); + assert_eq!(config.host(), host); + assert_eq!(config.port(), port); + + assert_eq!(config.password().expose_secret(), password); + + let service = Services::builder().postgres(&config).await; + + assert!(service.is_ok()); + } +} diff --git a/lib/warden-stack/src/tracing.rs b/lib/warden-stack/src/tracing.rs new file mode 100644 index 0000000..1a40f4b --- /dev/null +++ b/lib/warden-stack/src/tracing.rs @@ -0,0 +1,66 @@ +#[cfg(feature = "opentelemetry")] +pub mod telemetry; + +#[cfg(feature = "opentelemetry")] +pub use opentelemetry_sdk::trace::SdkTracerProvider; + +#[cfg(feature = "tracing-loki")] +mod loki; + +use tracing_subscriber::{ + EnvFilter, Layer, Registry, layer::SubscriberExt, util::SubscriberInitExt, +}; + +/// Telemetry handle +#[derive(bon::Builder)] +#[builder(finish_fn(vis = "", name = build_internal))] +pub struct Tracing { + #[builder(field = vec![tracing_subscriber::fmt::layer().boxed()])] + layers: Vec<Box<dyn Layer<Registry> + Sync + Send>>, + #[cfg(feature = "tracing-loki")] + #[builder(setters(vis = "", name = loki_internal))] + pub loki_task: tracing_loki::BackgroundTask, + #[cfg(feature = "opentelemetry")] + #[builder(setters(vis = "", name = otel_internal))] + pub otel_provider: opentelemetry_sdk::trace::SdkTracerProvider, +} + +// Define a custom finishing function as a method on the `UserBuilder`. +// The builder's state must implement the `IsComplete` trait. +// See details about it in the tip below this example. +impl<S: tracing_builder::IsComplete> TracingBuilder<S> { + pub fn build(self, config: &crate::Monitoring) -> Tracing { + // Delegate to `build_internal()` to get the instance of user. + let mut tracing = self.build_internal(); + + let layers = std::mem::take(&mut tracing.layers); + tracing_subscriber::registry() + .with(layers) + .with( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| config.log_level.to_string().into()), + ) + .try_init() + .ok(); + tracing + } +} + +// #[cfg(test)] +// mod tests { +// use super::*; +// +// #[test] +// fn build() { +// let builder = Tracing::builder().build(); +// let level = crate::Monitoring { +// log_level: "info".to_string(), +// #[cfg(feature = "opentelemetry")] +// opentelemetry_endpoint: "http://localhost:4317".into(), +// #[cfg(feature = "tracing-loki")] +// loki_endpoint: "http://localhost:3100".into(), +// }; +// builder.init(&level); +// builder.loki_task +// } +// } diff --git a/lib/warden-stack/src/tracing/loki.rs b/lib/warden-stack/src/tracing/loki.rs new file mode 100644 index 0000000..cbf4e40 --- /dev/null +++ b/lib/warden-stack/src/tracing/loki.rs @@ -0,0 +1,29 @@ +use crate::Monitoring; + +use super::TracingBuilder; +use super::tracing_builder::{IsUnset, SetLokiTask, State}; +use tracing_subscriber::Layer; + +impl<S: State> TracingBuilder<S> { + pub fn loki( + mut self, + config: &crate::AppConfig, + monitoring: &Monitoring, + ) -> Result<TracingBuilder<SetLokiTask<S>>, crate::ServiceError> + where + S::LokiTask: IsUnset, + { + use std::str::FromStr; + let url = FromStr::from_str(&monitoring.loki_endpoint.as_ref()) + .map_err(|_e| crate::ServiceError::Unknown)?; + + let (layer, task) = tracing_loki::builder() + .label("service_name", config.name.as_ref())? + .extra_field("pid", format!("{}", std::process::id()))? + .build_url(url)?; + + self.layers.push(layer.boxed()); + + Ok(self.loki_internal(task)) + } +} diff --git a/lib/warden-stack/src/tracing/telemetry.rs b/lib/warden-stack/src/tracing/telemetry.rs new file mode 100644 index 0000000..b024937 --- /dev/null +++ b/lib/warden-stack/src/tracing/telemetry.rs @@ -0,0 +1,137 @@ +#[cfg(any(feature = "nats-jetstream", feature = "nats-core"))] +pub mod nats { + pub mod extractor { + pub struct HeaderMap<'a>(pub &'a async_nats::HeaderMap); + + impl opentelemetry::propagation::Extractor for HeaderMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0 + .get(async_nats::header::IntoHeaderName::into_header_name(key)) + .map(|value| value.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.iter().map(|(n, _v)| n.as_ref()).collect() + } + } + } + + pub mod injector { + pub struct HeaderMap<'a>(pub &'a mut async_nats::HeaderMap); + + impl opentelemetry::propagation::Injector for HeaderMap<'_> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key, value); + } + } + } +} + +#[cfg(feature = "opentelemetry-tonic")] +pub mod tonic { + pub mod extractor { + pub struct MetadataMap<'a>(pub &'a tonic::metadata::MetadataMap); + impl opentelemetry::propagation::Extractor for MetadataMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::<Vec<_>>() + } + } + } + + pub mod injector { + pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); + + impl opentelemetry::propagation::Injector for MetadataMap<'_> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } + } + } +} + +use crate::Monitoring; + +use super::TracingBuilder; +use super::tracing_builder::{IsUnset, SetOtelProvider, State}; +use tracing_subscriber::Layer; + +impl<S: State> TracingBuilder<S> { + pub fn opentelemetry( + mut self, + config: &crate::AppConfig, + monitoring: &Monitoring, + ) -> Result<TracingBuilder<SetOtelProvider<S>>, crate::ServiceError> + where + S::OtelProvider: IsUnset, + { + use opentelemetry::{ + KeyValue, + global::{self}, + trace::TracerProvider, + }; + use opentelemetry_otlp::WithExportConfig; + use opentelemetry_sdk::{ + Resource, + trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, + }; + use opentelemetry_semantic_conventions::{ + SCHEMA_URL, + resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION}, + }; + use tracing_opentelemetry::OpenTelemetryLayer; + + global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + + let resource = Resource::builder() + .with_schema_url( + [ + KeyValue::new(SERVICE_NAME, config.name.to_owned()), + KeyValue::new(SERVICE_VERSION, config.version.to_owned()), + KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.env.to_string()), + ], + SCHEMA_URL, + ) + .with_service_name(config.name.to_string()) + .build(); + + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(monitoring.opentelemetry_endpoint.as_ref()) + .build() + .unwrap(); + + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_resource(resource) + .with_id_generator(RandomIdGenerator::default()) + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + .build(); + + global::set_tracer_provider(provider.clone()); + + let layer = OpenTelemetryLayer::new(provider.tracer(config.name.as_ref().to_string())); + self.layers.push(layer.boxed()); + + Ok(self.otel_internal(provider)) + } +} |