Diffstat (limited to 'lib/warden-stack/src')
-rw-r--r--  lib/warden-stack/src/cache.rs               292
-rw-r--r--  lib/warden-stack/src/cache/cluster.rs        52
-rw-r--r--  lib/warden-stack/src/cache/sentinel.rs       65
-rw-r--r--  lib/warden-stack/src/config.rs              139
-rw-r--r--  lib/warden-stack/src/lib.rs                  95
-rw-r--r--  lib/warden-stack/src/nats.rs                 61
-rw-r--r--  lib/warden-stack/src/postgres.rs            137
-rw-r--r--  lib/warden-stack/src/tracing.rs              66
-rw-r--r--  lib/warden-stack/src/tracing/loki.rs         29
-rw-r--r--  lib/warden-stack/src/tracing/telemetry.rs   137
10 files changed, 1073 insertions, 0 deletions
diff --git a/lib/warden-stack/src/cache.rs b/lib/warden-stack/src/cache.rs
new file mode 100644
index 0000000..9be3778
--- /dev/null
+++ b/lib/warden-stack/src/cache.rs
@@ -0,0 +1,292 @@
+// https://github.com/svix/svix-webhooks/tree/4ede01a3209658615bb8d3153965c5c3a2e1b7ff/server/svix-server/src/redis
+pub mod cluster;
+pub mod sentinel;
+
+use std::{sync::Arc, time::Duration};
+
+use bb8::{Pool, RunError};
+use bb8_redis::RedisConnectionManager;
+use redis::{
+ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode,
+ aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo,
+};
+use sentinel::{RedisSentinelConnectionManager, SentinelConfig};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+use crate::{
+ ServiceError, ServicesBuilder,
+ services_builder::{IsUnset, SetCache, State},
+};
+
+pub use self::cluster::RedisClusterConnectionManager;
+
+pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);
+
+impl<S: State> ServicesBuilder<S> {
+ pub async fn cache(
+ self,
+ config: &CacheConfig,
+ ) -> Result<ServicesBuilder<SetCache<S>>, crate::ServiceError>
+ where
+ S::Cache: IsUnset,
+ {
+ Ok(self.cache_internal(RedisManager::new(config).await?))
+ }
+}
+
+fn default_max_conns() -> u16 {
+ 100
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub struct CacheConfig {
+ #[serde(rename = "dsn")]
+ redis_dsn: Arc<str>,
+ #[serde(default)]
+ pooled: bool,
+ #[serde(rename = "type")]
+ kind: RedisVariant,
+ #[serde(default = "default_max_conns")]
+ #[serde(rename = "max-connections")]
+ max_connections: u16,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub enum RedisVariant {
+ Clustered,
+ NonClustered,
+ Sentinel(SentinelConfig),
+}
+
+#[derive(Clone)]
+pub enum RedisManager {
+ Clustered(Pool<RedisClusterConnectionManager>),
+ NonClustered(Pool<RedisConnectionManager>),
+ Sentinel(Pool<RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>),
+}
+
+impl RedisManager {
+ pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> {
+ if config.pooled {
+ Self::new_pooled(
+ config.redis_dsn.as_ref(),
+ &config.kind,
+ config.max_connections,
+ )
+ .await
+ } else {
+ Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await
+ }
+ }
+ async fn new_pooled(
+ dsn: &str,
+ variant: &RedisVariant,
+ max_conns: u16,
+ ) -> Result<Self, ServiceError> {
+ match variant {
+ RedisVariant::Clustered => {
+ let mgr = RedisClusterConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Clustered(pool))
+ }
+ RedisVariant::NonClustered => {
+ let mgr = RedisConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::NonClustered(pool))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+ let mgr = RedisSentinelConnectionManager::new(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(SentinelNodeConnectionInfo {
+ tls_mode,
+ redis_connection_info: Some(RedisConnectionInfo {
+ db: cfg.redis_db.unwrap_or(0),
+ username: cfg.redis_username.clone(),
+ password: cfg.redis_password.clone(),
+ protocol,
+ }),
+ }),
+ )?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Sentinel(pool))
+ }
+ }
+ }
+
+ async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> {
+ match variant {
+ RedisVariant::Clustered => {
+ let cli = redis::cluster::ClusterClient::builder(vec![dsn])
+ .retries(1)
+ .connection_timeout(REDIS_CONN_TIMEOUT)
+ .build()?;
+ let con = cli.get_async_connection().await?;
+ Ok(RedisManager::ClusteredUnpooled(con))
+ }
+ RedisVariant::NonClustered => {
+ let cli = redis::Client::open(dsn)?;
+ let con = redis::aio::ConnectionManager::new_with_config(
+ cli,
+ ConnectionManagerConfig::new()
+ .set_number_of_retries(1)
+ .set_connection_timeout(REDIS_CONN_TIMEOUT),
+ )
+ .await?;
+ Ok(RedisManager::NonClusteredUnpooled(con))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+ let cli = redis::sentinel::SentinelClient::build(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(SentinelNodeConnectionInfo {
+ tls_mode,
+ redis_connection_info: Some(RedisConnectionInfo {
+ db: cfg.redis_db.unwrap_or(0),
+ username: cfg.redis_username.clone(),
+ password: cfg.redis_password.clone(),
+ protocol,
+ }),
+ }),
+ redis::sentinel::SentinelServerType::Master,
+ )?;
+
+ Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli))))
+ }
+ }
+ }
+
+ pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
+ match self {
+ Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)),
+ Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)),
+ Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)),
+ Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())),
+ Self::NonClusteredUnpooled(conn) => {
+ Ok(RedisConnection::NonClusteredUnpooled(conn.clone()))
+ }
+ Self::SentinelUnpooled(conn) => {
+ let mut conn = conn.lock().await;
+ let con = conn
+ .get_async_connection_with_config(
+ &AsyncConnectionConfig::new().set_response_timeout(REDIS_CONN_TIMEOUT),
+ )
+ .await?;
+ Ok(RedisConnection::SentinelUnpooled(con))
+ }
+ }
+ }
+}
+
+pub enum RedisConnection<'a> {
+ Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>),
+ NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>),
+ SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(redis::aio::MultiplexedConnection),
+}
+
+impl redis::aio::ConnectionLike for RedisConnection<'_> {
+ fn req_packed_command<'a>(
+ &'a mut self,
+ cmd: &'a redis::Cmd,
+ ) -> redis::RedisFuture<'a, redis::Value> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd),
+ }
+ }
+
+ fn req_packed_commands<'a>(
+ &'a mut self,
+ cmd: &'a redis::Pipeline,
+ offset: usize,
+ count: usize,
+ ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::ClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::NonClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ }
+ }
+
+ fn get_db(&self) -> i64 {
+ match self {
+ RedisConnection::Clustered(conn) => conn.get_db(),
+ RedisConnection::NonClustered(conn) => conn.get_db(),
+ RedisConnection::ClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::SentinelPooled(conn) => conn.get_db(),
+ RedisConnection::SentinelUnpooled(conn) => conn.get_db(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use redis::AsyncCommands;
+
+ use crate::cache::CacheConfig;
+
+ use super::RedisManager;
+
+    // Ensure basic set/get works; sharding behaviour deserves its own test.
+    #[tokio::test]
+    // Run with `cargo test -- --ignored` only when Redis is up and configured.
+    #[ignore]
+ async fn test_set_read_random_keys() {
+ let config = CacheConfig {
+ redis_dsn: "redis://localhost:6379".into(),
+ pooled: false,
+ kind: crate::cache::RedisVariant::NonClustered,
+ max_connections: 10,
+ };
+ let mgr = RedisManager::new(&config).await.unwrap();
+ let mut conn = mgr.get().await.unwrap();
+
+ for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() {
+ let key = key.to_string();
+ let _: () = conn.set(key.clone(), val).await.unwrap();
+ assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val);
+ }
+ }
+}
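
Every `RedisConnection` variant implements `redis::aio::ConnectionLike`, so the
high-level `AsyncCommands` API works the same across topologies. A minimal usage
sketch, assuming the crate is consumed as `warden_stack` and a reachable Redis
(all values illustrative):

    use redis::AsyncCommands;
    use warden_stack::cache::{CacheConfig, RedisManager};

    async fn demo(config: &CacheConfig) -> Result<(), Box<dyn std::error::Error>> {
        let manager = RedisManager::new(config).await?;
        // Pooled vs. unpooled is decided by `config.pooled`; callers don't care.
        let mut conn = manager.get().await?;
        let _: () = conn.set("greeting", "hello").await?;
        let greeting: String = conn.get("greeting").await?;
        assert_eq!(greeting, "hello");
        Ok(())
    }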
diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs
new file mode 100644
index 0000000..91e3b24
--- /dev/null
+++ b/lib/warden-stack/src/cache/cluster.rs
@@ -0,0 +1,52 @@
+use redis::{
+ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError,
+ cluster::{ClusterClient, ClusterClientBuilder},
+ cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo},
+};
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous clustered connections via `redis::cluster_async::ClusterConnection`
+#[derive(Clone)]
+pub struct RedisClusterConnectionManager {
+ client: ClusterClient,
+}
+
+impl RedisClusterConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: T,
+ ) -> Result<RedisClusterConnectionManager, RedisError> {
+ Ok(RedisClusterConnectionManager {
+ client: ClusterClientBuilder::new(vec![info]).retries(0).build()?,
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisClusterConnectionManager {
+ type Connection = redis::cluster_async::ClusterConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong = conn
+ .route_command(
+ &redis::cmd("PING"),
+ RoutingInfo::MultiNode((
+ MultipleNodeRoutingInfo::AllMasters,
+ Some(ResponsePolicy::OneSucceeded),
+ )),
+ )
+ .await
+ .and_then(|v| String::from_redis_value(&v))?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((ErrorKind::ResponseError, "ping request").into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
diff --git a/lib/warden-stack/src/cache/sentinel.rs b/lib/warden-stack/src/cache/sentinel.rs
new file mode 100644
index 0000000..c9f787a
--- /dev/null
+++ b/lib/warden-stack/src/cache/sentinel.rs
@@ -0,0 +1,65 @@
+use redis::{
+ ErrorKind, IntoConnectionInfo, RedisError,
+ sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
+};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
+pub struct RedisSentinelConnectionManager {
+ client: LockedSentinelClient,
+}
+
+impl RedisSentinelConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: Vec<T>,
+ service_name: String,
+ node_connection_info: Option<SentinelNodeConnectionInfo>,
+ ) -> Result<RedisSentinelConnectionManager, RedisError> {
+ Ok(RedisSentinelConnectionManager {
+ client: LockedSentinelClient(Mutex::new(SentinelClient::build(
+ info,
+ service_name,
+ node_connection_info,
+ SentinelServerType::Master,
+ )?)),
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisSentinelConnectionManager {
+ type Connection = redis::aio::MultiplexedConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.0.lock().await.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong: String = redis::cmd("PING").query_async(conn).await?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((ErrorKind::ResponseError, "ping request").into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
+pub struct SentinelConfig {
+ #[serde(rename = "sentinel_service_name")]
+ pub service_name: String,
+ #[serde(default)]
+ pub redis_tls_mode_secure: bool,
+ pub redis_db: Option<i64>,
+ pub redis_username: Option<String>,
+ pub redis_password: Option<String>,
+ #[serde(default)]
+ pub redis_use_resp3: bool,
+}
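
Because `RedisVariant` is an externally tagged, kebab-cased enum while
`SentinelConfig` carries explicit snake_case renames, the wire shape is easy to
get wrong. A deserialization sketch (host and service names are made up):

    fn sentinel_config_shape() {
        let raw = serde_json::json!({
            "dsn": "redis://sentinel-host:26379",
            "pooled": true,
            "type": { "sentinel": { "sentinel_service_name": "mymaster" } }
        });
        let _config: warden_stack::cache::CacheConfig =
            serde_json::from_value(raw).expect("sentinel variant should deserialize");
    }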
diff --git a/lib/warden-stack/src/config.rs b/lib/warden-stack/src/config.rs
new file mode 100644
index 0000000..9f42f43
--- /dev/null
+++ b/lib/warden-stack/src/config.rs
@@ -0,0 +1,139 @@
+use serde::Deserialize;
+
+use std::{fmt::Display, sync::Arc};
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct AppConfig {
+ #[serde(skip)]
+ pub name: Arc<str>,
+ #[serde(skip)]
+ pub version: Arc<str>,
+ #[serde(default)]
+ pub env: Environment,
+ #[cfg(feature = "api")]
+ #[serde(default = "default_port")]
+ pub port: u16,
+}
+
+#[cfg(feature = "api")]
+pub(crate) fn default_port() -> u16 {
+ 2210
+}
+
+#[cfg(feature = "tracing")]
+pub(crate) fn default_log() -> String {
+ #[cfg(debug_assertions)]
+ return "debug".into();
+ #[cfg(not(debug_assertions))]
+ "info".into()
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Configuration {
+ pub application: AppConfig,
+ #[cfg(feature = "postgres")]
+ pub database: crate::postgres::PostgresConfig,
+ #[cfg(feature = "cache")]
+ pub cache: crate::cache::CacheConfig,
+ #[serde(default)]
+ pub misc: serde_json::Value,
+ #[cfg(feature = "tracing")]
+ pub monitoring: Monitoring,
+ #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+ pub nats: crate::nats::NatsConfig,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Monitoring {
+ #[serde(rename = "log-level")]
+ #[cfg(feature = "tracing")]
+ #[serde(default = "default_log")]
+ pub log_level: String,
+ #[cfg(feature = "opentelemetry")]
+ #[serde(rename = "opentelemetry-endpoint")]
+ #[serde(default = "default_opentelemetry")]
+ pub opentelemetry_endpoint: Arc<str>,
+ #[cfg(feature = "tracing-loki")]
+ #[serde(rename = "loki-endpoint")]
+ #[serde(default = "default_loki")]
+ pub loki_endpoint: Arc<str>,
+}
+
+#[cfg(feature = "tracing-loki")]
+pub(crate) fn default_loki() -> Arc<str> {
+ "http://localhost:3100".into()
+}
+
+#[cfg(feature = "opentelemetry")]
+pub(crate) fn default_opentelemetry() -> Arc<str> {
+ "http://localhost:4317".into()
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)]
+#[cfg_attr(test, derive(serde::Serialize))]
+#[serde(rename_all = "lowercase")]
+pub enum Environment {
+ #[default]
+ Development,
+ Production,
+}
+
+impl Display for Environment {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ Environment::Development => "development",
+ Environment::Production => "production",
+ }
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Test that the enum is correctly serialized and deserialized
+ #[test]
+ fn test_environment_serialization() {
+ // Test serialization for Development
+ let dev = Environment::Development;
+ let dev_json = serde_json::to_string(&dev).unwrap();
+ assert_eq!(dev_json, "\"development\"");
+
+ // Test serialization for Production
+ let prod = Environment::Production;
+ let prod_json = serde_json::to_string(&prod).unwrap();
+ assert_eq!(prod_json, "\"production\"");
+
+ // Test deserialization for Development
+ let dev_str = "\"development\"";
+ let deserialized_dev: Environment = serde_json::from_str(dev_str).unwrap();
+ assert_eq!(deserialized_dev, Environment::Development);
+
+ // Test deserialization for Production
+ let prod_str = "\"production\"";
+ let deserialized_prod: Environment = serde_json::from_str(prod_str).unwrap();
+ assert_eq!(deserialized_prod, Environment::Production);
+ }
+
+ // Test Display implementation
+ #[test]
+ fn test_environment_display() {
+ let dev = Environment::Development;
+ assert_eq!(format!("{}", dev), "development");
+
+ let prod = Environment::Production;
+ assert_eq!(format!("{}", prod), "production");
+ }
+
+ #[test]
+ #[cfg(feature = "api")]
+ fn test_port() {
+ let listen_address =
+ std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, default_port()));
+ assert_eq!(listen_address.port(), default_port());
+ }
+}
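
`Configuration` gathers each feature's section under a fixed key, so which keys
must be present depends on the compiled feature set. A sketch assuming only the
`cache` and `tracing` features; `dsn` and `type` are the only required cache
fields, everything else shown falls back to a default:

    fn minimal_configuration() {
        let raw = serde_json::json!({
            "application": { "env": "production" },
            "cache": { "dsn": "redis://localhost:6379", "type": "non-clustered" },
            "monitoring": {}
        });
        let config: warden_stack::Configuration = serde_json::from_value(raw).unwrap();
        assert_eq!(config.application.env.to_string(), "production");
    }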
diff --git a/lib/warden-stack/src/lib.rs b/lib/warden-stack/src/lib.rs
new file mode 100644
index 0000000..efd6862
--- /dev/null
+++ b/lib/warden-stack/src/lib.rs
@@ -0,0 +1,95 @@
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+#[cfg(feature = "tracing")]
+#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
+pub mod tracing;
+
+#[cfg(feature = "cache")]
+#[cfg_attr(docsrs, doc(cfg(feature = "cache")))]
+pub mod cache;
+
+#[cfg(feature = "cache")]
+pub use redis;
+
+#[cfg(feature = "postgres")]
+pub use sqlx;
+
+#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+pub use async_nats;
+
+#[cfg(feature = "opentelemetry")]
+mod otel {
+ pub use opentelemetry;
+ pub use opentelemetry_http;
+ pub use opentelemetry_otlp;
+ pub use opentelemetry_sdk;
+ pub use opentelemetry_semantic_conventions;
+ pub use tracing_opentelemetry;
+}
+
+#[cfg(feature = "opentelemetry")]
+pub use otel::*;
+
+#[cfg(feature = "postgres")]
+#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
+pub mod postgres;
+
+#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+#[cfg_attr(
+ docsrs,
+ doc(cfg(any(feature = "nats-core", feature = "nats-jetstream")))
+)]
+pub mod nats;
+
+mod config;
+pub use config::*;
+
+#[derive(Clone, bon::Builder)]
+pub struct Services {
+ #[cfg(feature = "postgres")]
+ #[builder(setters(vis = "", name = pg_internal))]
+ pub postgres: Option<sqlx::PgPool>,
+ #[cfg(feature = "cache")]
+ #[builder(setters(vis = "", name = cache_internal))]
+ pub cache: Option<cache::RedisManager>,
+ #[cfg(feature = "nats-core")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "nats-core")))]
+ #[builder(setters(vis = "", name = nats_internal))]
+ /// NATS connection handle
+ pub nats: Option<async_nats::Client>,
+ #[cfg(feature = "nats-jetstream")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "nats-jetstream")))]
+ #[builder(setters(vis = "", name = jetstream_internal))]
+ /// NATS-Jetstream connection handle
+ pub jetstream: Option<async_nats::jetstream::Context>,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum ServiceError {
+ #[error("service was not initialised")]
+ NotInitialised,
+ #[error("unknown data store error")]
+ Unknown,
+ #[error("invalid config `{0}`")]
+ Configuration(String),
+ #[cfg(feature = "postgres")]
+ #[error(transparent)]
+ /// Postgres error
+ Postgres(#[from] sqlx::Error),
+ #[cfg(feature = "cache")]
+ #[error(transparent)]
+ /// Redis error
+ Cache(#[from] redis::RedisError),
+ #[cfg(feature = "opentelemetry")]
+ #[error(transparent)]
+ /// When creating the tracing layer
+ Opentelemetry(#[from] opentelemetry_sdk::trace::TraceError),
+ #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+ #[error(transparent)]
+ /// NATS error
+ Nats(#[from] async_nats::error::Error<async_nats::ConnectErrorKind>),
+ #[cfg(feature = "tracing-loki")]
+ #[error(transparent)]
+ /// When creating the tracing layer
+ Loki(#[from] tracing_loki::Error),
+}
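
The `bon`-derived builder threads type-state through each service method, so a
service can only be attached once and `build()` finalises the struct. A wiring
sketch, assuming the `postgres`, `cache` and `nats-jetstream` features are all
enabled (each method only exists under its feature):

    async fn wire(config: &warden_stack::Configuration)
    -> Result<warden_stack::Services, warden_stack::ServiceError> {
        Ok(warden_stack::Services::builder()
            .postgres(&config.database).await?
            .cache(&config.cache).await?
            .nats_jetstream(&config.nats).await?
            .build())
    }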
diff --git a/lib/warden-stack/src/nats.rs b/lib/warden-stack/src/nats.rs
new file mode 100644
index 0000000..952490c
--- /dev/null
+++ b/lib/warden-stack/src/nats.rs
@@ -0,0 +1,61 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone, Debug)]
+/// NATS configuration
+pub struct NatsConfig {
+    /// Host DSNs
+ #[serde(default = "nats")]
+ pub hosts: Arc<[String]>,
+}
+
+pub(crate) fn nats() -> Arc<[String]> {
+ let hosts = vec!["nats://localhost:4222".to_string()];
+ hosts.into()
+}
+
+impl NatsConfig {
+ fn hosts(&self) -> Vec<String> {
+ self.hosts.iter().map(ToString::to_string).collect()
+ }
+}
+
+use crate::{
+ ServiceError, ServicesBuilder,
+ services_builder::{IsUnset, State},
+};
+
+#[cfg(feature = "nats-jetstream")]
+impl<S: State> ServicesBuilder<S> {
+ /// create a Jetstream Context using the provided [NatsConfig]
+ pub async fn nats_jetstream(
+ self,
+ config: &NatsConfig,
+ ) -> Result<ServicesBuilder<crate::services_builder::SetJetstream<S>>, ServiceError>
+ where
+ S::Jetstream: IsUnset,
+ {
+ let hosts = config.hosts();
+ let client = async_nats::connect(hosts).await?;
+
+ Ok(self.jetstream_internal(async_nats::jetstream::new(client)))
+ }
+}
+
+#[cfg(feature = "nats-core")]
+impl<S: State> ServicesBuilder<S> {
+ /// create a NATS connection using the provided [NatsConfig]
+ pub async fn nats(
+ self,
+ config: &NatsConfig,
+ ) -> Result<ServicesBuilder<crate::services_builder::SetNats<S>>, ServiceError>
+ where
+ S::Nats: IsUnset,
+ {
+ let hosts = config.hosts();
+ let client = async_nats::connect(hosts).await?;
+
+ Ok(self.nats_internal(client))
+ }
+}
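
Once attached, the core handle is an ordinary `async_nats::Client`. A sketch
(subject and payload are illustrative):

    async fn publish_ping(config: &warden_stack::nats::NatsConfig)
    -> Result<(), Box<dyn std::error::Error>> {
        let services = warden_stack::Services::builder().nats(config).await?.build();
        if let Some(client) = &services.nats {
            client.publish("warden.events", "ping".into()).await?;
        }
        Ok(())
    }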
diff --git a/lib/warden-stack/src/postgres.rs b/lib/warden-stack/src/postgres.rs
new file mode 100644
index 0000000..3264368
--- /dev/null
+++ b/lib/warden-stack/src/postgres.rs
@@ -0,0 +1,137 @@
+use std::sync::Arc;
+
+use secrecy::{ExposeSecret, SecretString};
+use serde::Deserialize;
+use url::Url;
+
+use crate::{
+ ServicesBuilder,
+ services_builder::{IsUnset, SetPostgres, State},
+};
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct PostgresConfig {
+ #[serde(default = "default_pool_size")]
+ pool_size: u32,
+ #[serde(default = "default_port")]
+ port: u32,
+ name: Arc<str>,
+ host: Arc<str>,
+ #[serde(default = "user")]
+ user: Arc<str>,
+ password: SecretString,
+}
+
+fn default_pool_size() -> u32 {
+ 100
+}
+
+fn user() -> Arc<str> {
+ "postgres".into()
+}
+
+fn default_port() -> u32 {
+ 5432
+}
+
+impl PostgresConfig {
+ // Getter for size
+ pub fn pool_size(&self) -> u32 {
+ self.pool_size
+ }
+
+ // Getter for port
+ pub fn port(&self) -> u32 {
+ self.port
+ }
+
+ // Getter for name
+ pub fn name(&self) -> &str {
+ self.name.as_ref()
+ }
+
+ // Getter for host
+ pub fn host(&self) -> &str {
+ self.host.as_ref()
+ }
+
+ // Getter for username
+ pub fn username(&self) -> &str {
+ self.user.as_ref()
+ }
+
+    // Getter for password; returns the secret wrapper so callers must expose it explicitly
+ pub fn password(&self) -> &SecretString {
+ &self.password
+ }
+
+ pub(crate) fn connection_string(&self) -> Result<Url, crate::ServiceError> {
+ Url::parse(&format!(
+ "postgres://{}:{}@{}:{}/{}",
+ self.user,
+ self.password.expose_secret(),
+ self.host,
+ self.port,
+ self.name
+ ))
+ .map_err(|e| crate::ServiceError::Configuration(e.to_string()))
+ }
+}
+
+impl<S: State> ServicesBuilder<S> {
+ pub async fn postgres(
+ self,
+ config: &PostgresConfig,
+ ) -> Result<ServicesBuilder<SetPostgres<S>>, crate::ServiceError>
+ where
+ S::Postgres: IsUnset,
+ {
+ let pg = sqlx::postgres::PgPoolOptions::new()
+ // The default connection limit for a Postgres server is 100 connections, with 3 reserved for superusers.
+ //
+ // If you're deploying your application with multiple replicas, then the total
+ // across all replicas should not exceed the Postgres connection limit
+ // (max_connections postgresql.conf).
+ .max_connections(config.pool_size)
+ .connect(config.connection_string()?.as_ref())
+ .await?;
+ Ok(self.pg_internal(pg))
+ }
+}
+
+#[cfg(all(test, target_os = "linux"))]
+mod test {
+ use super::*;
+ use crate::Services;
+
+ #[tokio::test]
+ async fn docker_stack_db() {
+ let port = default_port();
+ let name = "";
+ let host = "localhost";
+ let user = user();
+ let pool_size = default_pool_size();
+ let password = "postgres";
+
+ let config = PostgresConfig {
+ pool_size,
+ port,
+ name: name.into(),
+ host: host.into(),
+ user: user.clone(),
+ password: secrecy::SecretString::new(password.into()),
+ };
+
+ assert_eq!(config.name(), name);
+ assert_eq!(config.pool_size(), pool_size);
+ assert_eq!(config.username(), user.as_ref());
+ assert_eq!(config.host(), host);
+ assert_eq!(config.port(), port);
+
+ assert_eq!(config.password().expose_secret(), password);
+
+ let service = Services::builder().postgres(&config).await;
+
+ assert!(service.is_ok());
+ }
+}
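
The attached pool is a plain `sqlx::PgPool`, so anything sqlx offers works on it
directly. A smoke-test sketch, assuming a reachable database:

    async fn smoke_test(config: &warden_stack::postgres::PostgresConfig)
    -> Result<(), Box<dyn std::error::Error>> {
        let services = warden_stack::Services::builder().postgres(config).await?.build();
        if let Some(pool) = &services.postgres {
            let one: i32 = sqlx::query_scalar("SELECT 1").fetch_one(pool).await?;
            assert_eq!(one, 1);
        }
        Ok(())
    }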
diff --git a/lib/warden-stack/src/tracing.rs b/lib/warden-stack/src/tracing.rs
new file mode 100644
index 0000000..1a40f4b
--- /dev/null
+++ b/lib/warden-stack/src/tracing.rs
@@ -0,0 +1,66 @@
+#[cfg(feature = "opentelemetry")]
+pub mod telemetry;
+
+#[cfg(feature = "opentelemetry")]
+pub use opentelemetry_sdk::trace::SdkTracerProvider;
+
+#[cfg(feature = "tracing-loki")]
+mod loki;
+
+use tracing_subscriber::{
+ EnvFilter, Layer, Registry, layer::SubscriberExt, util::SubscriberInitExt,
+};
+
+/// Telemetry handle
+#[derive(bon::Builder)]
+#[builder(finish_fn(vis = "", name = build_internal))]
+pub struct Tracing {
+ #[builder(field = vec![tracing_subscriber::fmt::layer().boxed()])]
+ layers: Vec<Box<dyn Layer<Registry> + Sync + Send>>,
+ #[cfg(feature = "tracing-loki")]
+ #[builder(setters(vis = "", name = loki_internal))]
+ pub loki_task: tracing_loki::BackgroundTask,
+ #[cfg(feature = "opentelemetry")]
+ #[builder(setters(vis = "", name = otel_internal))]
+ pub otel_provider: opentelemetry_sdk::trace::SdkTracerProvider,
+}
+
+// Custom finishing function defined as a method on `TracingBuilder`.
+// The builder's state must implement the `IsComplete` trait before
+// `build` becomes callable.
+impl<S: tracing_builder::IsComplete> TracingBuilder<S> {
+ pub fn build(self, config: &crate::Monitoring) -> Tracing {
+        // Delegate to `build_internal()` to get the `Tracing` instance.
+ let mut tracing = self.build_internal();
+
+ let layers = std::mem::take(&mut tracing.layers);
+ tracing_subscriber::registry()
+ .with(layers)
+ .with(
+ EnvFilter::try_from_default_env()
+ .unwrap_or_else(|_| config.log_level.to_string().into()),
+ )
+ .try_init()
+ .ok();
+ tracing
+ }
+}
+
+// #[cfg(test)]
+// mod tests {
+//     use super::*;
+//
+//     #[test]
+//     fn build() {
+//         let config = crate::Monitoring {
+//             log_level: "info".to_string(),
+//             #[cfg(feature = "opentelemetry")]
+//             opentelemetry_endpoint: "http://localhost:4317".into(),
+//             #[cfg(feature = "tracing-loki")]
+//             loki_endpoint: "http://localhost:3100".into(),
+//         };
+//         // `build` needs a complete builder state, so this stays disabled
+//         // while the `tracing-loki`/`opentelemetry` features add required fields.
+//         let _tracing = Tracing::builder().build(&config);
+//     }
+// }
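
An initialisation sketch, assuming both the `tracing-loki` and `opentelemetry`
features (with them enabled, `build` is only reachable once both layer sources
are set). The Loki layer only ships logs while its background task is polled:

    async fn init_tracing(config: &warden_stack::Configuration)
    -> Result<(), warden_stack::ServiceError> {
        let tracing = warden_stack::tracing::Tracing::builder()
            .loki(&config.application, &config.monitoring)?
            .opentelemetry(&config.application, &config.monitoring)?
            .build(&config.monitoring);
        tokio::spawn(tracing.loki_task);
        Ok(())
    }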
diff --git a/lib/warden-stack/src/tracing/loki.rs b/lib/warden-stack/src/tracing/loki.rs
new file mode 100644
index 0000000..cbf4e40
--- /dev/null
+++ b/lib/warden-stack/src/tracing/loki.rs
@@ -0,0 +1,29 @@
+use crate::Monitoring;
+
+use super::TracingBuilder;
+use super::tracing_builder::{IsUnset, SetLokiTask, State};
+use tracing_subscriber::Layer;
+
+impl<S: State> TracingBuilder<S> {
+ pub fn loki(
+ mut self,
+ config: &crate::AppConfig,
+ monitoring: &Monitoring,
+ ) -> Result<TracingBuilder<SetLokiTask<S>>, crate::ServiceError>
+ where
+ S::LokiTask: IsUnset,
+ {
+ use std::str::FromStr;
+        let url = FromStr::from_str(monitoring.loki_endpoint.as_ref())
+ .map_err(|_e| crate::ServiceError::Unknown)?;
+
+ let (layer, task) = tracing_loki::builder()
+ .label("service_name", config.name.as_ref())?
+ .extra_field("pid", format!("{}", std::process::id()))?
+ .build_url(url)?;
+
+ self.layers.push(layer.boxed());
+
+ Ok(self.loki_internal(task))
+ }
+}
diff --git a/lib/warden-stack/src/tracing/telemetry.rs b/lib/warden-stack/src/tracing/telemetry.rs
new file mode 100644
index 0000000..b024937
--- /dev/null
+++ b/lib/warden-stack/src/tracing/telemetry.rs
@@ -0,0 +1,137 @@
+#[cfg(any(feature = "nats-jetstream", feature = "nats-core"))]
+pub mod nats {
+ pub mod extractor {
+ pub struct HeaderMap<'a>(pub &'a async_nats::HeaderMap);
+
+ impl opentelemetry::propagation::Extractor for HeaderMap<'_> {
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0
+ .get(async_nats::header::IntoHeaderName::into_header_name(key))
+ .map(|value| value.as_str())
+ }
+
+ fn keys(&self) -> Vec<&str> {
+ self.0.iter().map(|(n, _v)| n.as_ref()).collect()
+ }
+ }
+ }
+
+ pub mod injector {
+ pub struct HeaderMap<'a>(pub &'a mut async_nats::HeaderMap);
+
+ impl opentelemetry::propagation::Injector for HeaderMap<'_> {
+ fn set(&mut self, key: &str, value: String) {
+ self.0.insert(key, value);
+ }
+ }
+ }
+}
+
+#[cfg(feature = "opentelemetry-tonic")]
+pub mod tonic {
+ pub mod extractor {
+ pub struct MetadataMap<'a>(pub &'a tonic::metadata::MetadataMap);
+ impl opentelemetry::propagation::Extractor for MetadataMap<'_> {
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0.get(key).and_then(|metadata| metadata.to_str().ok())
+ }
+
+ /// Collect all the keys from the MetadataMap.
+ fn keys(&self) -> Vec<&str> {
+ self.0
+ .keys()
+ .map(|key| match key {
+ tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
+ tonic::metadata::KeyRef::Binary(v) => v.as_str(),
+ })
+ .collect::<Vec<_>>()
+ }
+ }
+ }
+
+ pub mod injector {
+ pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
+
+ impl opentelemetry::propagation::Injector for MetadataMap<'_> {
+ /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
+ fn set(&mut self, key: &str, value: String) {
+ if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
+ if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
+ self.0.insert(key, val);
+ }
+ }
+ }
+ }
+ }
+}
+
+use crate::Monitoring;
+
+use super::TracingBuilder;
+use super::tracing_builder::{IsUnset, SetOtelProvider, State};
+use tracing_subscriber::Layer;
+
+impl<S: State> TracingBuilder<S> {
+ pub fn opentelemetry(
+ mut self,
+ config: &crate::AppConfig,
+ monitoring: &Monitoring,
+ ) -> Result<TracingBuilder<SetOtelProvider<S>>, crate::ServiceError>
+ where
+ S::OtelProvider: IsUnset,
+ {
+ use opentelemetry::{
+ KeyValue,
+ global::{self},
+ trace::TracerProvider,
+ };
+ use opentelemetry_otlp::WithExportConfig;
+ use opentelemetry_sdk::{
+ Resource,
+ trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
+ };
+ use opentelemetry_semantic_conventions::{
+ SCHEMA_URL,
+ resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
+ };
+ use tracing_opentelemetry::OpenTelemetryLayer;
+
+ global::set_text_map_propagator(
+ opentelemetry_sdk::propagation::TraceContextPropagator::new(),
+ );
+
+ let resource = Resource::builder()
+ .with_schema_url(
+ [
+ KeyValue::new(SERVICE_NAME, config.name.to_owned()),
+ KeyValue::new(SERVICE_VERSION, config.version.to_owned()),
+ KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.env.to_string()),
+ ],
+ SCHEMA_URL,
+ )
+ .with_service_name(config.name.to_string())
+ .build();
+
+ let exporter = opentelemetry_otlp::SpanExporter::builder()
+ .with_tonic()
+ .with_endpoint(monitoring.opentelemetry_endpoint.as_ref())
+ .build()
+ .unwrap();
+
+ let provider = SdkTracerProvider::builder()
+ .with_batch_exporter(exporter)
+ .with_resource(resource)
+ .with_id_generator(RandomIdGenerator::default())
+ .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
+ 1.0,
+ ))))
+ .build();
+
+ global::set_tracer_provider(provider.clone());
+
+ let layer = OpenTelemetryLayer::new(provider.tracer(config.name.as_ref().to_string()));
+ self.layers.push(layer.boxed());
+
+ Ok(self.otel_internal(provider))
+ }
+}
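
A propagation sketch using the injector above: attach the active span's trace
context to outgoing NATS headers so a consumer can restore it with the matching
extractor (paths assume the crate is consumed as `warden_stack`):

    fn inject_current_context(headers: &mut async_nats::HeaderMap) {
        use opentelemetry::global;
        use tracing_opentelemetry::OpenTelemetrySpanExt;
        use warden_stack::tracing::telemetry::nats;

        let cx = tracing::Span::current().context();
        global::get_text_map_propagator(|propagator| {
            propagator.inject_context(&cx, &mut nats::injector::HeaderMap(headers));
        });
        // Receiving side (sketch): let parent = global::get_text_map_propagator(
        //     |p| p.extract(&nats::extractor::HeaderMap(&headers)));
    }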