path: root/lib
author     rtkay123 <dev@kanjala.com>    2025-08-10 12:55:43 +0200
committer  rtkay123 <dev@kanjala.com>    2025-08-10 12:55:43 +0200
commit     bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04 (patch)
tree       50b63525480da0bee2ce713d69f02617c20bee8d /lib
parent     8deeab3e11f707677609047f5577a256cf28ed63 (diff)
download   warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.tar.bz2
           warden-bd31dc85f8e9cb01c1e1a4e49fd4735d24a6da04.zip
chore: collapse stack-up
Diffstat (limited to 'lib')
-rw-r--r--  lib/warden-core/build.rs                   |  10
-rw-r--r--  lib/warden-stack/Cargo.toml                |  75
-rw-r--r--  lib/warden-stack/LICENSE-APACHE            | 201
-rw-r--r--  lib/warden-stack/LICENSE-MIT               |  21
-rw-r--r--  lib/warden-stack/README.md                 |  22
-rw-r--r--  lib/warden-stack/examples/tracing.rs       |  10
-rw-r--r--  lib/warden-stack/src/cache.rs              | 292
-rw-r--r--  lib/warden-stack/src/cache/cluster.rs      |  52
-rw-r--r--  lib/warden-stack/src/cache/sentinel.rs     |  65
-rw-r--r--  lib/warden-stack/src/config.rs             | 139
-rw-r--r--  lib/warden-stack/src/lib.rs                |  95
-rw-r--r--  lib/warden-stack/src/nats.rs               |  61
-rw-r--r--  lib/warden-stack/src/postgres.rs           | 137
-rw-r--r--  lib/warden-stack/src/tracing.rs            |  66
-rw-r--r--  lib/warden-stack/src/tracing/loki.rs       |  29
-rw-r--r--  lib/warden-stack/src/tracing/telemetry.rs  | 137
16 files changed, 1404 insertions(+), 8 deletions(-)
diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs
index 57e20e0..6d5efbb 100644
--- a/lib/warden-core/build.rs
+++ b/lib/warden-core/build.rs
@@ -9,17 +9,11 @@ enum Entity {
#[cfg(any(feature = "message", feature = "pseudonyms"))]
impl Entity {
fn protos(&self) -> Vec<&'static str> {
- let mut res: Vec<&'static str> = vec![
- // "proto/googleapis/google/type/date.proto",
- // "proto/googleapis/google/type/money.proto",
- // "proto/googleapis/google/type/latlng.proto",
- ];
+ let mut res: Vec<&'static str> = vec![];
#[cfg(feature = "message")]
fn iso20022_protos() -> Vec<&'static str> {
vec![
- // "proto/iso20022/pacs_008_001_12.proto",
- // "proto/iso20022/pacs_002_001_12.proto",
"proto/warden_message.proto",
]
}
@@ -60,7 +54,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "pseudonyms")]
protos.extend(Entity::Pseudonyms.protos());
-#[cfg(any(feature = "message", feature = "pseudonyms"))]
+ #[cfg(any(feature = "message", feature = "pseudonyms"))]
build_proto(&protos)?;
Ok(())
diff --git a/lib/warden-stack/Cargo.toml b/lib/warden-stack/Cargo.toml
new file mode 100644
index 0000000..d7c1eb8
--- /dev/null
+++ b/lib/warden-stack/Cargo.toml
@@ -0,0 +1,75 @@
+[package]
+name = "warden-stack"
+version = "0.1.0"
+edition = "2024"
+license = "MIT OR Apache-2.0"
+
+[dependencies]
+async-nats = { workspace = true, optional = true }
+bb8 = { version = "0.9.0", optional = true }
+bb8-redis = { version = "0.24.0", optional = true }
+bon.workspace = true
+opentelemetry = { workspace = true, optional = true }
+opentelemetry-http = { workspace = true, optional = true }
+opentelemetry-otlp = { workspace = true, optional = true }
+opentelemetry-semantic-conventions = { workspace = true, optional = true }
+opentelemetry_sdk = { workspace = true, optional = true }
+redis = { workspace = true, optional = true }
+secrecy = { workspace = true, optional = true }
+serde = { workspace = true, features = ["derive", "rc"] }
+serde_json.workspace = true
+sqlx = { workspace = true, optional = true }
+thiserror.workspace = true
+tokio = { workspace = true, optional = true }
+tonic = { workspace = true, optional = true }
+tracing = { workspace = true, optional = true }
+tracing-loki = { version = "0.2.6", optional = true, default-features = false, features = ["compat-0-2-1", "rustls"] }
+tracing-opentelemetry = { workspace = true, optional = true }
+tracing-subscriber = { version = "0.3.19", optional = true }
+url = { workspace = true, optional = true }
+
+[features]
+default = []
+api = []
+cache = [
+ "dep:redis",
+ "redis/cluster-async",
+ "redis/connection-manager",
+ "redis/tokio-comp",
+ "redis/sentinel",
+ "tokio/sync",
+ "dep:bb8",
+ "dep:bb8-redis",
+ "url/serde",
+]
+nats-core = ["dep:async-nats"]
+nats-jetstream = ["dep:async-nats"]
+opentelemetry = [
+ "dep:opentelemetry",
+ "dep:tracing-opentelemetry",
+ "tracing",
+ "opentelemetry_sdk/rt-tokio",
+ "opentelemetry_sdk/trace",
+ "opentelemetry/trace",
+ "opentelemetry-http",
+ "opentelemetry-otlp/grpc-tonic",
+ "opentelemetry-otlp/http-proto",
+ "opentelemetry-semantic-conventions/semconv_experimental",
+]
+postgres = ["sqlx/postgres", "url/serde", "secrecy/serde"]
+tracing = ["dep:tracing", "tracing-subscriber/env-filter"]
+opentelemetry-tonic = ["dep:tonic"]
+tracing-loki = ["dep:tracing-loki", "tracing"]
+
+[[example]]
+name = "tracing"
+path = "examples/tracing.rs"
+required-features = ["tracing"]
+
+[dev-dependencies]
+tokio = { version = "*", features = ["macros", "rt"] }
+sqlx = { version = "*", features = ["runtime-tokio"] }
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
diff --git a/lib/warden-stack/LICENSE-APACHE b/lib/warden-stack/LICENSE-APACHE
new file mode 100644
index 0000000..4c986ff
--- /dev/null
+++ b/lib/warden-stack/LICENSE-APACHE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2025 rtkay
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/lib/warden-stack/LICENSE-MIT b/lib/warden-stack/LICENSE-MIT
new file mode 100644
index 0000000..7eed760
--- /dev/null
+++ b/lib/warden-stack/LICENSE-MIT
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2025 rtkay
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/lib/warden-stack/README.md b/lib/warden-stack/README.md
new file mode 100644
index 0000000..12f9f3a
--- /dev/null
+++ b/lib/warden-stack/README.md
@@ -0,0 +1,22 @@
+warden-stack!
+
+A crate centralising the configuration and initialisation of various services I find myself using in a lot of projects.
+
+# TL;DR
+Find a feature, activate it, initialise it in your builder, and you're good to go.
+
+## Features
+
+Each feature unlocks configuration methods on the `ServicesBuilder`, allowing you to selectively wire up what you need.
+
+| Feature               | Description                                                        |
+|-----------------------|--------------------------------------------------------------------|
+| `api`                 | Enables `port` configuration                                       |
+| `cache`               | Enables Redis/Valkey caching support                               |
+| `nats-core`           | Enables core NATS messaging via `async-nats`                       |
+| `nats-jetstream`      | Enables NATS JetStream support via `async-nats`                    |
+| `opentelemetry`       | Enables distributed tracing with OpenTelemetry                     |
+| `postgres`            | Enables PostgreSQL support using `sqlx`                            |
+| `tracing`             | Enables tracing setup via `tracing` and `tracing-subscriber`       |
+| `opentelemetry-tonic` | Enables OpenTelemetry injector and extractor utilities for `tonic` |
+| `tracing-loki`        | Enables tracing output to Loki                                     |
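
For reference, a minimal sketch of the builder flow the README describes (not part of this commit; it assumes the `postgres`, `cache`, and `nats-jetstream` features and a `tokio` runtime):

```rust
// Sketch only: the feature selection and surrounding runtime are assumptions.
use warden_stack::{Configuration, ServiceError, Services};

async fn init(config: &Configuration) -> Result<Services, ServiceError> {
    let services = Services::builder()
        .postgres(&config.database) // `postgres` feature
        .await?
        .cache(&config.cache) // `cache` feature
        .await?
        .nats_jetstream(&config.nats) // `nats-jetstream` feature
        .await?
        .build();

    // Each handle is an `Option`, populated by the corresponding builder call.
    debug_assert!(services.postgres.is_some());
    Ok(services)
}
```

Each feature-gated method consumes the builder and returns it with the corresponding slot marked as set, so a given service can only be configured once.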
diff --git a/lib/warden-stack/examples/tracing.rs b/lib/warden-stack/examples/tracing.rs
new file mode 100644
index 0000000..7ebe41e
--- /dev/null
+++ b/lib/warden-stack/examples/tracing.rs
@@ -0,0 +1,10 @@
+use warden_stack::{Monitoring, tracing::TracingBuilder};
+
+fn main() {
+ let config = Monitoring {
+ log_level: "info".to_string(),
+ };
+ let _tracing = TracingBuilder::default().build(&config);
+
+ tracing::info!("hello from tracing");
+}
diff --git a/lib/warden-stack/src/cache.rs b/lib/warden-stack/src/cache.rs
new file mode 100644
index 0000000..9be3778
--- /dev/null
+++ b/lib/warden-stack/src/cache.rs
@@ -0,0 +1,292 @@
+// https://github.com/svix/svix-webhooks/tree/4ede01a3209658615bb8d3153965c5c3a2e1b7ff/server/svix-server/src/redis
+pub mod cluster;
+pub mod sentinel;
+
+use std::{sync::Arc, time::Duration};
+
+use bb8::{Pool, RunError};
+use bb8_redis::RedisConnectionManager;
+use redis::{
+ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode,
+ aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo,
+};
+use sentinel::{RedisSentinelConnectionManager, SentinelConfig};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+use crate::{
+ ServiceError, ServicesBuilder,
+ services_builder::{IsUnset, SetCache, State},
+};
+
+pub use self::cluster::RedisClusterConnectionManager;
+
+pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);
+
+impl<S: State> ServicesBuilder<S> {
+ pub async fn cache(
+ self,
+ config: &CacheConfig,
+ ) -> Result<ServicesBuilder<SetCache<S>>, crate::ServiceError>
+ where
+ S::Cache: IsUnset,
+ {
+ Ok(self.cache_internal(RedisManager::new(config).await?))
+ }
+}
+
+fn default_max_conns() -> u16 {
+ 100
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub struct CacheConfig {
+ #[serde(rename = "dsn")]
+ redis_dsn: Arc<str>,
+ #[serde(default)]
+ pooled: bool,
+ #[serde(rename = "type")]
+ kind: RedisVariant,
+ #[serde(default = "default_max_conns")]
+ #[serde(rename = "max-connections")]
+ max_connections: u16,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+#[serde(rename_all = "kebab-case")]
+pub enum RedisVariant {
+ Clustered,
+ NonClustered,
+ Sentinel(SentinelConfig),
+}
+
+#[derive(Clone)]
+pub enum RedisManager {
+ Clustered(Pool<RedisClusterConnectionManager>),
+ NonClustered(Pool<RedisConnectionManager>),
+ Sentinel(Pool<RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>),
+}
+
+impl RedisManager {
+ pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> {
+ if config.pooled {
+ Self::new_pooled(
+ config.redis_dsn.as_ref(),
+ &config.kind,
+ config.max_connections,
+ )
+ .await
+ } else {
+ Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await
+ }
+ }
+ async fn new_pooled(
+ dsn: &str,
+ variant: &RedisVariant,
+ max_conns: u16,
+ ) -> Result<Self, ServiceError> {
+ match variant {
+ RedisVariant::Clustered => {
+ let mgr = RedisClusterConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Clustered(pool))
+ }
+ RedisVariant::NonClustered => {
+ let mgr = RedisConnectionManager::new(dsn)?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::NonClustered(pool))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+ let mgr = RedisSentinelConnectionManager::new(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(SentinelNodeConnectionInfo {
+ tls_mode,
+ redis_connection_info: Some(RedisConnectionInfo {
+ db: cfg.redis_db.unwrap_or(0),
+ username: cfg.redis_username.clone(),
+ password: cfg.redis_password.clone(),
+ protocol,
+ }),
+ }),
+ )?;
+ let pool = bb8::Pool::builder()
+ .max_size(max_conns.into())
+ .build(mgr)
+ .await?;
+ Ok(RedisManager::Sentinel(pool))
+ }
+ }
+ }
+
+ async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> {
+ match variant {
+ RedisVariant::Clustered => {
+ let cli = redis::cluster::ClusterClient::builder(vec![dsn])
+ .retries(1)
+ .connection_timeout(REDIS_CONN_TIMEOUT)
+ .build()?;
+ let con = cli.get_async_connection().await?;
+ Ok(RedisManager::ClusteredUnpooled(con))
+ }
+ RedisVariant::NonClustered => {
+ let cli = redis::Client::open(dsn)?;
+ let con = redis::aio::ConnectionManager::new_with_config(
+ cli,
+ ConnectionManagerConfig::new()
+ .set_number_of_retries(1)
+ .set_connection_timeout(REDIS_CONN_TIMEOUT),
+ )
+ .await?;
+ Ok(RedisManager::NonClusteredUnpooled(con))
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+ let cli = redis::sentinel::SentinelClient::build(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(SentinelNodeConnectionInfo {
+ tls_mode,
+ redis_connection_info: Some(RedisConnectionInfo {
+ db: cfg.redis_db.unwrap_or(0),
+ username: cfg.redis_username.clone(),
+ password: cfg.redis_password.clone(),
+ protocol,
+ }),
+ }),
+ redis::sentinel::SentinelServerType::Master,
+ )?;
+
+ Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli))))
+ }
+ }
+ }
+
+ pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> {
+ match self {
+ Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)),
+ Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)),
+ Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)),
+ Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())),
+ Self::NonClusteredUnpooled(conn) => {
+ Ok(RedisConnection::NonClusteredUnpooled(conn.clone()))
+ }
+ Self::SentinelUnpooled(conn) => {
+ let mut conn = conn.lock().await;
+ let con = conn
+ .get_async_connection_with_config(
+ &AsyncConnectionConfig::new().set_response_timeout(REDIS_CONN_TIMEOUT),
+ )
+ .await?;
+ Ok(RedisConnection::SentinelUnpooled(con))
+ }
+ }
+ }
+}
+
+pub enum RedisConnection<'a> {
+ Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>),
+ NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>),
+ SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>),
+ ClusteredUnpooled(redis::cluster_async::ClusterConnection),
+ NonClusteredUnpooled(redis::aio::ConnectionManager),
+ SentinelUnpooled(redis::aio::MultiplexedConnection),
+}
+
+impl redis::aio::ConnectionLike for RedisConnection<'_> {
+ fn req_packed_command<'a>(
+ &'a mut self,
+ cmd: &'a redis::Cmd,
+ ) -> redis::RedisFuture<'a, redis::Value> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd),
+ }
+ }
+
+ fn req_packed_commands<'a>(
+ &'a mut self,
+ cmd: &'a redis::Pipeline,
+ offset: usize,
+ count: usize,
+ ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::ClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::NonClusteredUnpooled(conn) => {
+ conn.req_packed_commands(cmd, offset, count)
+ }
+ RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count),
+ }
+ }
+
+ fn get_db(&self) -> i64 {
+ match self {
+ RedisConnection::Clustered(conn) => conn.get_db(),
+ RedisConnection::NonClustered(conn) => conn.get_db(),
+ RedisConnection::ClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(),
+ RedisConnection::SentinelPooled(conn) => conn.get_db(),
+ RedisConnection::SentinelUnpooled(conn) => conn.get_db(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use redis::AsyncCommands;
+
+ use crate::cache::CacheConfig;
+
+ use super::RedisManager;
+
+ // Ensure basic set/get works -- should test sharding as well:
+ #[tokio::test]
+ // run with `cargo test -- --ignored redis` only when redis is up and configured
+ #[ignore]
+ async fn test_set_read_random_keys() {
+ let config = CacheConfig {
+ redis_dsn: "redis://localhost:6379".into(),
+ pooled: false,
+ kind: crate::cache::RedisVariant::NonClustered,
+ max_connections: 10,
+ };
+ let mgr = RedisManager::new(&config).await.unwrap();
+ let mut conn = mgr.get().await.unwrap();
+
+ for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() {
+ let key = key.to_string();
+ let _: () = conn.set(key.clone(), val).await.unwrap();
+ assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val);
+ }
+ }
+}
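
Because `RedisVariant` is an externally tagged enum, the sentinel variant nests its `SentinelConfig` under a `sentinel` key on the wire. A hedged sketch of both shapes (values are illustrative; `serde_json` is used here only for demonstration):

```rust
use warden_stack::cache::CacheConfig;

fn main() -> Result<(), serde_json::Error> {
    // Plain single-node configuration; `pooled` and `max-connections`
    // fall back to their serde defaults when omitted.
    let single: CacheConfig = serde_json::from_str(
        r#"{ "dsn": "redis://localhost:6379", "type": "non-clustered" }"#,
    )?;

    // Sentinel configuration: the variant carries a `SentinelConfig` payload.
    let sentinel: CacheConfig = serde_json::from_str(
        r#"{
            "dsn": "redis://sentinel-1:26379",
            "pooled": true,
            "type": { "sentinel": {
                "sentinel_service_name": "mymaster",
                "redis_db": null,
                "redis_username": null,
                "redis_password": null
            } }
        }"#,
    )?;

    println!("{single:?} {sentinel:?}");
    Ok(())
}
```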
diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs
new file mode 100644
index 0000000..91e3b24
--- /dev/null
+++ b/lib/warden-stack/src/cache/cluster.rs
@@ -0,0 +1,52 @@
+use redis::{
+ ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError,
+ cluster::{ClusterClient, ClusterClientBuilder},
+ cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo},
+};
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous clustered connections via `redis::cluster_async::ClusterConnection`
+#[derive(Clone)]
+pub struct RedisClusterConnectionManager {
+ client: ClusterClient,
+}
+
+impl RedisClusterConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: T,
+ ) -> Result<RedisClusterConnectionManager, RedisError> {
+ Ok(RedisClusterConnectionManager {
+ client: ClusterClientBuilder::new(vec![info]).retries(0).build()?,
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisClusterConnectionManager {
+ type Connection = redis::cluster_async::ClusterConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong = conn
+ .route_command(
+ &redis::cmd("PING"),
+ RoutingInfo::MultiNode((
+ MultipleNodeRoutingInfo::AllMasters,
+ Some(ResponsePolicy::OneSucceeded),
+ )),
+ )
+ .await
+ .and_then(|v| String::from_redis_value(&v))?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((ErrorKind::ResponseError, "ping request").into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
diff --git a/lib/warden-stack/src/cache/sentinel.rs b/lib/warden-stack/src/cache/sentinel.rs
new file mode 100644
index 0000000..c9f787a
--- /dev/null
+++ b/lib/warden-stack/src/cache/sentinel.rs
@@ -0,0 +1,65 @@
+use redis::{
+ ErrorKind, IntoConnectionInfo, RedisError,
+ sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
+};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
+
+/// ConnectionManager that implements `bb8::ManageConnection` and supports
+/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
+pub struct RedisSentinelConnectionManager {
+ client: LockedSentinelClient,
+}
+
+impl RedisSentinelConnectionManager {
+ pub fn new<T: IntoConnectionInfo>(
+ info: Vec<T>,
+ service_name: String,
+ node_connection_info: Option<SentinelNodeConnectionInfo>,
+ ) -> Result<RedisSentinelConnectionManager, RedisError> {
+ Ok(RedisSentinelConnectionManager {
+ client: LockedSentinelClient(Mutex::new(SentinelClient::build(
+ info,
+ service_name,
+ node_connection_info,
+ SentinelServerType::Master,
+ )?)),
+ })
+ }
+}
+
+impl bb8::ManageConnection for RedisSentinelConnectionManager {
+ type Connection = redis::aio::MultiplexedConnection;
+ type Error = RedisError;
+
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ self.client.0.lock().await.get_async_connection().await
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
+ let pong: String = redis::cmd("PING").query_async(conn).await?;
+ match pong.as_str() {
+ "PONG" => Ok(()),
+ _ => Err((ErrorKind::ResponseError, "ping request").into()),
+ }
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
+pub struct SentinelConfig {
+ #[serde(rename = "sentinel_service_name")]
+ pub service_name: String,
+ #[serde(default)]
+ pub redis_tls_mode_secure: bool,
+ pub redis_db: Option<i64>,
+ pub redis_username: Option<String>,
+ pub redis_password: Option<String>,
+ #[serde(default)]
+ pub redis_use_resp3: bool,
+}
diff --git a/lib/warden-stack/src/config.rs b/lib/warden-stack/src/config.rs
new file mode 100644
index 0000000..9f42f43
--- /dev/null
+++ b/lib/warden-stack/src/config.rs
@@ -0,0 +1,139 @@
+use serde::Deserialize;
+
+use std::{fmt::Display, sync::Arc};
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct AppConfig {
+ #[serde(skip)]
+ pub name: Arc<str>,
+ #[serde(skip)]
+ pub version: Arc<str>,
+ #[serde(default)]
+ pub env: Environment,
+ #[cfg(feature = "api")]
+ #[serde(default = "default_port")]
+ pub port: u16,
+}
+
+#[cfg(feature = "api")]
+pub(crate) fn default_port() -> u16 {
+ 2210
+}
+
+#[cfg(feature = "tracing")]
+pub(crate) fn default_log() -> String {
+ #[cfg(debug_assertions)]
+ return "debug".into();
+ #[cfg(not(debug_assertions))]
+ "info".into()
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Configuration {
+ pub application: AppConfig,
+ #[cfg(feature = "postgres")]
+ pub database: crate::postgres::PostgresConfig,
+ #[cfg(feature = "cache")]
+ pub cache: crate::cache::CacheConfig,
+ #[serde(default)]
+ pub misc: serde_json::Value,
+ #[cfg(feature = "tracing")]
+ pub monitoring: Monitoring,
+ #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+ pub nats: crate::nats::NatsConfig,
+}
+
+#[derive(Clone, Debug, Deserialize)]
+pub struct Monitoring {
+ #[serde(rename = "log-level")]
+ #[cfg(feature = "tracing")]
+ #[serde(default = "default_log")]
+ pub log_level: String,
+ #[cfg(feature = "opentelemetry")]
+ #[serde(rename = "opentelemetry-endpoint")]
+ #[serde(default = "default_opentelemetry")]
+ pub opentelemetry_endpoint: Arc<str>,
+ #[cfg(feature = "tracing-loki")]
+ #[serde(rename = "loki-endpoint")]
+ #[serde(default = "default_loki")]
+ pub loki_endpoint: Arc<str>,
+}
+
+#[cfg(feature = "tracing-loki")]
+pub(crate) fn default_loki() -> Arc<str> {
+ "http://localhost:3100".into()
+}
+
+#[cfg(feature = "opentelemetry")]
+pub(crate) fn default_opentelemetry() -> Arc<str> {
+ "http://localhost:4317".into()
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)]
+#[cfg_attr(test, derive(serde::Serialize))]
+#[serde(rename_all = "lowercase")]
+pub enum Environment {
+ #[default]
+ Development,
+ Production,
+}
+
+impl Display for Environment {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{}",
+ match self {
+ Environment::Development => "development",
+ Environment::Production => "production",
+ }
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ // Test that the enum is correctly serialized and deserialized
+ #[test]
+ fn test_environment_serialization() {
+ // Test serialization for Development
+ let dev = Environment::Development;
+ let dev_json = serde_json::to_string(&dev).unwrap();
+ assert_eq!(dev_json, "\"development\"");
+
+ // Test serialization for Production
+ let prod = Environment::Production;
+ let prod_json = serde_json::to_string(&prod).unwrap();
+ assert_eq!(prod_json, "\"production\"");
+
+ // Test deserialization for Development
+ let dev_str = "\"development\"";
+ let deserialized_dev: Environment = serde_json::from_str(dev_str).unwrap();
+ assert_eq!(deserialized_dev, Environment::Development);
+
+ // Test deserialization for Production
+ let prod_str = "\"production\"";
+ let deserialized_prod: Environment = serde_json::from_str(prod_str).unwrap();
+ assert_eq!(deserialized_prod, Environment::Production);
+ }
+
+ // Test Display implementation
+ #[test]
+ fn test_environment_display() {
+ let dev = Environment::Development;
+ assert_eq!(format!("{}", dev), "development");
+
+ let prod = Environment::Production;
+ assert_eq!(format!("{}", prod), "production");
+ }
+
+ #[test]
+ #[cfg(feature = "api")]
+ fn test_port() {
+ let listen_address =
+ std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, default_port()));
+ assert_eq!(listen_address.port(), default_port());
+ }
+}
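
For orientation, a hedged sketch of a complete `Configuration` document (JSON is used here for brevity; any serde-compatible source works). It assumes the `postgres`, `cache`, `tracing`, and `nats-core` features, and all values are illustrative:

```rust
use warden_stack::Configuration;

fn main() -> Result<(), serde_json::Error> {
    let raw = r#"{
        "application": { "env": "development" },
        "database": {
            "host": "localhost",
            "name": "warden",
            "password": "postgres"
        },
        "cache": { "dsn": "redis://localhost:6379", "type": "non-clustered" },
        "monitoring": { "log-level": "info" },
        "nats": { "hosts": ["nats://localhost:4222"] }
    }"#;

    // `application.name`/`version` are `#[serde(skip)]` and default to empty;
    // omitted fields such as `port`, `pool_size`, or `max-connections` take
    // their `#[serde(default)]` values.
    let config: Configuration = serde_json::from_str(raw)?;
    assert_eq!(config.application.env.to_string(), "development");
    Ok(())
}
```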
diff --git a/lib/warden-stack/src/lib.rs b/lib/warden-stack/src/lib.rs
new file mode 100644
index 0000000..efd6862
--- /dev/null
+++ b/lib/warden-stack/src/lib.rs
@@ -0,0 +1,95 @@
+#![cfg_attr(docsrs, feature(doc_cfg))]
+
+#[cfg(feature = "tracing")]
+#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
+pub mod tracing;
+
+#[cfg(feature = "cache")]
+#[cfg_attr(docsrs, doc(cfg(feature = "cache")))]
+pub mod cache;
+
+#[cfg(feature = "cache")]
+pub use redis;
+
+#[cfg(feature = "postgres")]
+pub use sqlx;
+
+#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+pub use async_nats;
+
+#[cfg(feature = "opentelemetry")]
+mod otel {
+ pub use opentelemetry;
+ pub use opentelemetry_http;
+ pub use opentelemetry_otlp;
+ pub use opentelemetry_sdk;
+ pub use opentelemetry_semantic_conventions;
+ pub use tracing_opentelemetry;
+}
+
+#[cfg(feature = "opentelemetry")]
+pub use otel::*;
+
+#[cfg(feature = "postgres")]
+#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
+pub mod postgres;
+
+#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+#[cfg_attr(
+ docsrs,
+ doc(cfg(any(feature = "nats-core", feature = "nats-jetstream")))
+)]
+pub mod nats;
+
+mod config;
+pub use config::*;
+
+#[derive(Clone, bon::Builder)]
+pub struct Services {
+ #[cfg(feature = "postgres")]
+ #[builder(setters(vis = "", name = pg_internal))]
+ pub postgres: Option<sqlx::PgPool>,
+ #[cfg(feature = "cache")]
+ #[builder(setters(vis = "", name = cache_internal))]
+ pub cache: Option<cache::RedisManager>,
+ #[cfg(feature = "nats-core")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "nats-core")))]
+ #[builder(setters(vis = "", name = nats_internal))]
+ /// NATS connection handle
+ pub nats: Option<async_nats::Client>,
+ #[cfg(feature = "nats-jetstream")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "nats-jetstream")))]
+ #[builder(setters(vis = "", name = jetstream_internal))]
+ /// NATS-Jetstream connection handle
+ pub jetstream: Option<async_nats::jetstream::Context>,
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum ServiceError {
+ #[error("service was not initialised")]
+ NotInitialised,
+ #[error("unknown data store error")]
+ Unknown,
+ #[error("invalid config `{0}`")]
+ Configuration(String),
+ #[cfg(feature = "postgres")]
+ #[error(transparent)]
+ /// Postgres error
+ Postgres(#[from] sqlx::Error),
+ #[cfg(feature = "cache")]
+ #[error(transparent)]
+ /// Redis error
+ Cache(#[from] redis::RedisError),
+ #[cfg(feature = "opentelemetry")]
+ #[error(transparent)]
+ /// When creating the tracing layer
+ Opentelemetry(#[from] opentelemetry_sdk::trace::TraceError),
+ #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))]
+ #[error(transparent)]
+ /// NATS error
+ Nats(#[from] async_nats::error::Error<async_nats::ConnectErrorKind>),
+ #[cfg(feature = "tracing-loki")]
+ #[error(transparent)]
+ /// When creating the tracing layer
+ Loki(#[from] tracing_loki::Error),
+}
diff --git a/lib/warden-stack/src/nats.rs b/lib/warden-stack/src/nats.rs
new file mode 100644
index 0000000..952490c
--- /dev/null
+++ b/lib/warden-stack/src/nats.rs
@@ -0,0 +1,61 @@
+use std::sync::Arc;
+
+use serde::Deserialize;
+
+#[derive(Deserialize, Clone, Debug)]
+/// Nats configuration
+pub struct NatsConfig {
+ /// Hosts dsn
+ #[serde(default = "nats")]
+ pub hosts: Arc<[String]>,
+}
+
+pub(crate) fn nats() -> Arc<[String]> {
+ let hosts = vec!["nats://localhost:4222".to_string()];
+ hosts.into()
+}
+
+impl NatsConfig {
+ fn hosts(&self) -> Vec<String> {
+ self.hosts.iter().map(ToString::to_string).collect()
+ }
+}
+
+use crate::{
+ ServiceError, ServicesBuilder,
+ services_builder::{IsUnset, State},
+};
+
+#[cfg(feature = "nats-jetstream")]
+impl<S: State> ServicesBuilder<S> {
+ /// create a Jetstream Context using the provided [NatsConfig]
+ pub async fn nats_jetstream(
+ self,
+ config: &NatsConfig,
+ ) -> Result<ServicesBuilder<crate::services_builder::SetJetstream<S>>, ServiceError>
+ where
+ S::Jetstream: IsUnset,
+ {
+ let hosts = config.hosts();
+ let client = async_nats::connect(hosts).await?;
+
+ Ok(self.jetstream_internal(async_nats::jetstream::new(client)))
+ }
+}
+
+#[cfg(feature = "nats-core")]
+impl<S: State> ServicesBuilder<S> {
+ /// create a NATS connection using the provided [NatsConfig]
+ pub async fn nats(
+ self,
+ config: &NatsConfig,
+ ) -> Result<ServicesBuilder<crate::services_builder::SetNats<S>>, ServiceError>
+ where
+ S::Nats: IsUnset,
+ {
+ let hosts = config.hosts();
+ let client = async_nats::connect(hosts).await?;
+
+ Ok(self.nats_internal(client))
+ }
+}
diff --git a/lib/warden-stack/src/postgres.rs b/lib/warden-stack/src/postgres.rs
new file mode 100644
index 0000000..3264368
--- /dev/null
+++ b/lib/warden-stack/src/postgres.rs
@@ -0,0 +1,137 @@
+use std::sync::Arc;
+
+use secrecy::{ExposeSecret, SecretString};
+use serde::Deserialize;
+use url::Url;
+
+use crate::{
+ ServicesBuilder,
+ services_builder::{IsUnset, SetPostgres, State},
+};
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct PostgresConfig {
+ #[serde(default = "default_pool_size")]
+ pool_size: u32,
+ #[serde(default = "default_port")]
+ port: u32,
+ name: Arc<str>,
+ host: Arc<str>,
+ #[serde(default = "user")]
+ user: Arc<str>,
+ password: SecretString,
+}
+
+fn default_pool_size() -> u32 {
+ 100
+}
+
+fn user() -> Arc<str> {
+ "postgres".into()
+}
+
+fn default_port() -> u32 {
+ 5432
+}
+
+impl PostgresConfig {
+ // Getter for size
+ pub fn pool_size(&self) -> u32 {
+ self.pool_size
+ }
+
+ // Getter for port
+ pub fn port(&self) -> u32 {
+ self.port
+ }
+
+ // Getter for name
+ pub fn name(&self) -> &str {
+ self.name.as_ref()
+ }
+
+ // Getter for host
+ pub fn host(&self) -> &str {
+ self.host.as_ref()
+ }
+
+ // Getter for username
+ pub fn username(&self) -> &str {
+ self.user.as_ref()
+ }
+
+ // Getter for password, returned as a `SecretString`
+ pub fn password(&self) -> &SecretString {
+ &self.password
+ }
+
+ pub(crate) fn connection_string(&self) -> Result<Url, crate::ServiceError> {
+ Url::parse(&format!(
+ "postgres://{}:{}@{}:{}/{}",
+ self.user,
+ self.password.expose_secret(),
+ self.host,
+ self.port,
+ self.name
+ ))
+ .map_err(|e| crate::ServiceError::Configuration(e.to_string()))
+ }
+}
+
+impl<S: State> ServicesBuilder<S> {
+ pub async fn postgres(
+ self,
+ config: &PostgresConfig,
+ ) -> Result<ServicesBuilder<SetPostgres<S>>, crate::ServiceError>
+ where
+ S::Postgres: IsUnset,
+ {
+ let pg = sqlx::postgres::PgPoolOptions::new()
+ // The default connection limit for a Postgres server is 100 connections, with 3 reserved for superusers.
+ //
+ // If you're deploying your application with multiple replicas, then the total
+ // across all replicas should not exceed the Postgres connection limit
+ // (max_connections postgresql.conf).
+ .max_connections(config.pool_size)
+ .connect(config.connection_string()?.as_ref())
+ .await?;
+ Ok(self.pg_internal(pg))
+ }
+}
+
+#[cfg(all(test, target_os = "linux"))]
+mod test {
+ use super::*;
+ use crate::Services;
+
+ #[tokio::test]
+ async fn docker_stack_db() {
+ let port = default_port();
+ let name = "";
+ let host = "localhost";
+ let user = user();
+ let pool_size = default_pool_size();
+ let password = "postgres";
+
+ let config = PostgresConfig {
+ pool_size,
+ port,
+ name: name.into(),
+ host: host.into(),
+ user: user.clone(),
+ password: secrecy::SecretString::new(password.into()),
+ };
+
+ assert_eq!(config.name(), name);
+ assert_eq!(config.pool_size(), pool_size);
+ assert_eq!(config.username(), user.as_ref());
+ assert_eq!(config.host(), host);
+ assert_eq!(config.port(), port);
+
+ assert_eq!(config.password().expose_secret(), password);
+
+ let service = Services::builder().postgres(&config).await;
+
+ assert!(service.is_ok());
+ }
+}
diff --git a/lib/warden-stack/src/tracing.rs b/lib/warden-stack/src/tracing.rs
new file mode 100644
index 0000000..1a40f4b
--- /dev/null
+++ b/lib/warden-stack/src/tracing.rs
@@ -0,0 +1,66 @@
+#[cfg(feature = "opentelemetry")]
+pub mod telemetry;
+
+#[cfg(feature = "opentelemetry")]
+pub use opentelemetry_sdk::trace::SdkTracerProvider;
+
+#[cfg(feature = "tracing-loki")]
+mod loki;
+
+use tracing_subscriber::{
+ EnvFilter, Layer, Registry, layer::SubscriberExt, util::SubscriberInitExt,
+};
+
+/// Telemetry handle
+#[derive(bon::Builder)]
+#[builder(finish_fn(vis = "", name = build_internal))]
+pub struct Tracing {
+ #[builder(field = vec![tracing_subscriber::fmt::layer().boxed()])]
+ layers: Vec<Box<dyn Layer<Registry> + Sync + Send>>,
+ #[cfg(feature = "tracing-loki")]
+ #[builder(setters(vis = "", name = loki_internal))]
+ pub loki_task: tracing_loki::BackgroundTask,
+ #[cfg(feature = "opentelemetry")]
+ #[builder(setters(vis = "", name = otel_internal))]
+ pub otel_provider: opentelemetry_sdk::trace::SdkTracerProvider,
+}
+
+// Define a custom finishing function as a method on the `TracingBuilder`.
+// The builder's state must implement the `IsComplete` trait, which holds
+// once every required (feature-gated) member below has been set.
+impl<S: tracing_builder::IsComplete> TracingBuilder<S> {
+ pub fn build(self, config: &crate::Monitoring) -> Tracing {
+ // Delegate to `build_internal()` to get the `Tracing` instance.
+ let mut tracing = self.build_internal();
+
+ let layers = std::mem::take(&mut tracing.layers);
+ tracing_subscriber::registry()
+ .with(layers)
+ .with(
+ EnvFilter::try_from_default_env()
+ .unwrap_or_else(|_| config.log_level.to_string().into()),
+ )
+ .try_init()
+ .ok();
+ tracing
+ }
+}
+
+// #[cfg(test)]
+// mod tests {
+// use super::*;
+//
+// #[test]
+// fn build() {
+// let builder = Tracing::builder().build();
+// let level = crate::Monitoring {
+// log_level: "info".to_string(),
+// #[cfg(feature = "opentelemetry")]
+// opentelemetry_endpoint: "http://localhost:4317".into(),
+// #[cfg(feature = "tracing-loki")]
+// loki_endpoint: "http://localhost:3100".into(),
+// };
+// builder.init(&level);
+// builder.loki_task
+// }
+// }
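
A minimal sketch (not part of this commit) of composing the full tracing stack when the `tracing`, `tracing-loki`, and `opentelemetry` features are all enabled; the defaulted configs and the `serde_json`/`tokio` dependencies are assumptions:

```rust
use warden_stack::{AppConfig, Monitoring, tracing::TracingBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Both structs are fully defaulted or skipped, so empty documents parse.
    let mut app: AppConfig = serde_json::from_str("{}")?;
    app.name = "demo".into();
    app.version = "0.1.0".into();
    let monitoring: Monitoring = serde_json::from_str("{}")?;

    let tracing = TracingBuilder::default()
        .loki(&app, &monitoring)? // pushes the Loki layer
        .opentelemetry(&app, &monitoring)? // pushes the OTLP layer
        .build(&monitoring); // installs the subscriber

    // The Loki layer only ships logs while its background task is polled.
    tokio::spawn(tracing.loki_task);
    tracing::info!("telemetry initialised");

    // Flush pending spans before exit.
    tracing.otel_provider.shutdown().ok();
    Ok(())
}
```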
diff --git a/lib/warden-stack/src/tracing/loki.rs b/lib/warden-stack/src/tracing/loki.rs
new file mode 100644
index 0000000..cbf4e40
--- /dev/null
+++ b/lib/warden-stack/src/tracing/loki.rs
@@ -0,0 +1,29 @@
+use crate::Monitoring;
+
+use super::TracingBuilder;
+use super::tracing_builder::{IsUnset, SetLokiTask, State};
+use tracing_subscriber::Layer;
+
+impl<S: State> TracingBuilder<S> {
+ pub fn loki(
+ mut self,
+ config: &crate::AppConfig,
+ monitoring: &Monitoring,
+ ) -> Result<TracingBuilder<SetLokiTask<S>>, crate::ServiceError>
+ where
+ S::LokiTask: IsUnset,
+ {
+ use std::str::FromStr;
+ let url = FromStr::from_str(&monitoring.loki_endpoint.as_ref())
+ .map_err(|_e| crate::ServiceError::Unknown)?;
+
+ let (layer, task) = tracing_loki::builder()
+ .label("service_name", config.name.as_ref())?
+ .extra_field("pid", format!("{}", std::process::id()))?
+ .build_url(url)?;
+
+ self.layers.push(layer.boxed());
+
+ Ok(self.loki_internal(task))
+ }
+}
diff --git a/lib/warden-stack/src/tracing/telemetry.rs b/lib/warden-stack/src/tracing/telemetry.rs
new file mode 100644
index 0000000..b024937
--- /dev/null
+++ b/lib/warden-stack/src/tracing/telemetry.rs
@@ -0,0 +1,137 @@
+#[cfg(any(feature = "nats-jetstream", feature = "nats-core"))]
+pub mod nats {
+ pub mod extractor {
+ pub struct HeaderMap<'a>(pub &'a async_nats::HeaderMap);
+
+ impl opentelemetry::propagation::Extractor for HeaderMap<'_> {
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0
+ .get(async_nats::header::IntoHeaderName::into_header_name(key))
+ .map(|value| value.as_str())
+ }
+
+ fn keys(&self) -> Vec<&str> {
+ self.0.iter().map(|(n, _v)| n.as_ref()).collect()
+ }
+ }
+ }
+
+ pub mod injector {
+ pub struct HeaderMap<'a>(pub &'a mut async_nats::HeaderMap);
+
+ impl opentelemetry::propagation::Injector for HeaderMap<'_> {
+ fn set(&mut self, key: &str, value: String) {
+ self.0.insert(key, value);
+ }
+ }
+ }
+}
+
+#[cfg(feature = "opentelemetry-tonic")]
+pub mod tonic {
+ pub mod extractor {
+ pub struct MetadataMap<'a>(pub &'a tonic::metadata::MetadataMap);
+ impl opentelemetry::propagation::Extractor for MetadataMap<'_> {
+ fn get(&self, key: &str) -> Option<&str> {
+ self.0.get(key).and_then(|metadata| metadata.to_str().ok())
+ }
+
+ /// Collect all the keys from the MetadataMap.
+ fn keys(&self) -> Vec<&str> {
+ self.0
+ .keys()
+ .map(|key| match key {
+ tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
+ tonic::metadata::KeyRef::Binary(v) => v.as_str(),
+ })
+ .collect::<Vec<_>>()
+ }
+ }
+ }
+
+ pub mod injector {
+ pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap);
+
+ impl opentelemetry::propagation::Injector for MetadataMap<'_> {
+ /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs
+ fn set(&mut self, key: &str, value: String) {
+ if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
+ if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
+ self.0.insert(key, val);
+ }
+ }
+ }
+ }
+ }
+}
+
+use crate::Monitoring;
+
+use super::TracingBuilder;
+use super::tracing_builder::{IsUnset, SetOtelProvider, State};
+use tracing_subscriber::Layer;
+
+impl<S: State> TracingBuilder<S> {
+ pub fn opentelemetry(
+ mut self,
+ config: &crate::AppConfig,
+ monitoring: &Monitoring,
+ ) -> Result<TracingBuilder<SetOtelProvider<S>>, crate::ServiceError>
+ where
+ S::OtelProvider: IsUnset,
+ {
+ use opentelemetry::{
+ KeyValue,
+ global::{self},
+ trace::TracerProvider,
+ };
+ use opentelemetry_otlp::WithExportConfig;
+ use opentelemetry_sdk::{
+ Resource,
+ trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
+ };
+ use opentelemetry_semantic_conventions::{
+ SCHEMA_URL,
+ resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
+ };
+ use tracing_opentelemetry::OpenTelemetryLayer;
+
+ global::set_text_map_propagator(
+ opentelemetry_sdk::propagation::TraceContextPropagator::new(),
+ );
+
+ let resource = Resource::builder()
+ .with_schema_url(
+ [
+ KeyValue::new(SERVICE_NAME, config.name.to_owned()),
+ KeyValue::new(SERVICE_VERSION, config.version.to_owned()),
+ KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.env.to_string()),
+ ],
+ SCHEMA_URL,
+ )
+ .with_service_name(config.name.to_string())
+ .build();
+
+ let exporter = opentelemetry_otlp::SpanExporter::builder()
+ .with_tonic()
+ .with_endpoint(monitoring.opentelemetry_endpoint.as_ref())
+ .build()
+ .unwrap();
+
+ let provider = SdkTracerProvider::builder()
+ .with_batch_exporter(exporter)
+ .with_resource(resource)
+ .with_id_generator(RandomIdGenerator::default())
+ .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
+ 1.0,
+ ))))
+ .build();
+
+ global::set_tracer_provider(provider.clone());
+
+ let layer = OpenTelemetryLayer::new(provider.tracer(config.name.as_ref().to_string()));
+ self.layers.push(layer.boxed());
+
+ Ok(self.otel_internal(provider))
+ }
+}
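
The tonic helpers above are plain `Injector`/`Extractor` wrappers, so they slot into the usual propagator calls. A hedged sketch of injecting the current span's context into outgoing request metadata (assumes the `opentelemetry` and `opentelemetry-tonic` features plus direct `tonic` and `tracing` dependencies; the helper name is illustrative):

```rust
use warden_stack::opentelemetry::global;
use warden_stack::tracing::telemetry::tonic::injector;
use warden_stack::tracing_opentelemetry::OpenTelemetrySpanExt;

/// Copy the current trace context into the request's metadata so the callee
/// can continue the same trace.
fn inject_trace_context<T>(request: &mut tonic::Request<T>) {
    let context = tracing::Span::current().context();
    global::get_text_map_propagator(|propagator| {
        propagator.inject_context(&context, &mut injector::MetadataMap(request.metadata_mut()));
    });
}
```

This relies on the text-map propagator installed by `TracingBuilder::opentelemetry`; the extractor wrappers are used the same way on the receiving side via `propagator.extract`.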