diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/warden-core/build.rs | 10 | ||||
-rw-r--r-- | lib/warden-stack/Cargo.toml | 75 | ||||
-rw-r--r-- | lib/warden-stack/LICENSE-APACHE | 201 | ||||
-rw-r--r-- | lib/warden-stack/LICENSE-MIT | 21 | ||||
-rw-r--r-- | lib/warden-stack/README.md | 22 | ||||
-rw-r--r-- | lib/warden-stack/examples/tracing.rs | 10 | ||||
-rw-r--r-- | lib/warden-stack/src/cache.rs | 292 | ||||
-rw-r--r-- | lib/warden-stack/src/cache/cluster.rs | 52 | ||||
-rw-r--r-- | lib/warden-stack/src/cache/sentinel.rs | 65 | ||||
-rw-r--r-- | lib/warden-stack/src/config.rs | 139 | ||||
-rw-r--r-- | lib/warden-stack/src/lib.rs | 95 | ||||
-rw-r--r-- | lib/warden-stack/src/nats.rs | 61 | ||||
-rw-r--r-- | lib/warden-stack/src/postgres.rs | 137 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing.rs | 66 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing/loki.rs | 29 | ||||
-rw-r--r-- | lib/warden-stack/src/tracing/telemetry.rs | 137 |
16 files changed, 1404 insertions, 8 deletions
diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 57e20e0..6d5efbb 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -9,17 +9,11 @@ enum Entity { #[cfg(any(feature = "message", feature = "pseudonyms"))] impl Entity { fn protos(&self) -> Vec<&'static str> { - let mut res: Vec<&'static str> = vec![ - // "proto/googleapis/google/type/date.proto", - // "proto/googleapis/google/type/money.proto", - // "proto/googleapis/google/type/latlng.proto", - ]; + let mut res: Vec<&'static str> = vec![]; #[cfg(feature = "message")] fn iso20022_protos() -> Vec<&'static str> { vec![ - // "proto/iso20022/pacs_008_001_12.proto", - // "proto/iso20022/pacs_002_001_12.proto", "proto/warden_message.proto", ] } @@ -60,7 +54,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { #[cfg(feature = "pseudonyms")] protos.extend(Entity::Pseudonyms.protos()); -#[cfg(any(feature = "message", feature = "pseudonyms"))] + #[cfg(any(feature = "message", feature = "pseudonyms"))] build_proto(&protos)?; Ok(()) diff --git a/lib/warden-stack/Cargo.toml b/lib/warden-stack/Cargo.toml new file mode 100644 index 0000000..d7c1eb8 --- /dev/null +++ b/lib/warden-stack/Cargo.toml @@ -0,0 +1,75 @@ +[package] +name = "warden-stack" +version = "0.1.0" +edition = "2024" +license = "MIT OR Apache-2.0" + +[dependencies] +async-nats = { workspace = true, optional = true } +bb8 = { version = "0.9.0", optional = true } +bb8-redis = { version = "0.24.0", optional = true } +bon.workspace = true +opentelemetry = { workspace = true, optional = true } +opentelemetry-http = { workspace = true, optional = true } +opentelemetry-otlp = { workspace = true, optional = true } +opentelemetry-semantic-conventions = { workspace = true, optional = true } +opentelemetry_sdk = { workspace = true, optional = true } +redis = { workspace = true, optional = true } +secrecy = { workspace = true, optional = true } +serde = { workspace = true, features = ["derive", "rc"] } +serde_json.workspace = true +sqlx = { workspace = true, optional = true } +thiserror.workspace = true +tokio = { workspace = true, optional = true } +tonic = { workspace = true, optional = true } +tracing = { workspace = true, optional = true } +tracing-loki = { version = "0.2.6", optional = true, default-features = false, features = ["compat-0-2-1", "rustls"] } +tracing-opentelemetry = { workspace = true, optional = true } +tracing-subscriber = { version = "0.3.19", optional = true } +url = { workspace = true, optional = true } + +[features] +default = [] +api = [] +cache = [ + "dep:redis", + "redis/cluster-async", + "redis/connection-manager", + "redis/tokio-comp", + "redis/sentinel", + "tokio/sync", + "dep:bb8", + "dep:bb8-redis", + "url/serde", +] +nats-core = ["dep:async-nats"] +nats-jetstream = ["dep:async-nats"] +opentelemetry = [ + "dep:opentelemetry", + "dep:tracing-opentelemetry", + "tracing", + "opentelemetry_sdk/rt-tokio", + "opentelemetry_sdk/trace", + "opentelemetry/trace", + "opentelemetry-http", + "opentelemetry-otlp/grpc-tonic", + "opentelemetry-otlp/http-proto", + "opentelemetry-semantic-conventions/semconv_experimental", +] +postgres = ["sqlx/postgres", "url/serde", "secrecy/serde"] +tracing = ["dep:tracing", "tracing-subscriber/env-filter"] +opentelemetry-tonic = ["dep:tonic"] +tracing-loki = ["dep:tracing-loki", "tracing"] + +[[example]] +name = "tracing" +path = "examples/tracing.rs" +required-features = ["tracing"] + +[dev-dependencies] +tokio = { version = "*", features = ["macros", "rt"] } +sqlx = { version = "*", features = ["runtime-tokio"] } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] diff --git a/lib/warden-stack/LICENSE-APACHE b/lib/warden-stack/LICENSE-APACHE new file mode 100644 index 0000000..4c986ff --- /dev/null +++ b/lib/warden-stack/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2025 rtkay + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/lib/warden-stack/LICENSE-MIT b/lib/warden-stack/LICENSE-MIT new file mode 100644 index 0000000..7eed760 --- /dev/null +++ b/lib/warden-stack/LICENSE-MIT @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 rtkay + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/lib/warden-stack/README.md b/lib/warden-stack/README.md new file mode 100644 index 0000000..12f9f3a --- /dev/null +++ b/lib/warden-stack/README.md @@ -0,0 +1,22 @@ +warden-stack! + +A crate centralising the configuration and initialisation of various services I find myself using in a lot of projects. + +# TL:DR +Find a feature, activate it, initialise it in your builder, then you're good to go + +## Features + +Each feature unlocks configuration methods on the `ServicesBuilder`, allowing you to selectively wire up what you need. + +| Feature | Description | +|----------------|-----------------------------------------------------------| +| `api` | Enables `port` configuration | +| `cache` | Enables redis/valkey caching support | +| `nats-core` | Enables core NATS messaging via `async-nats` | +| `nats-jetstream`| Enables NATS JetStream support via `async-nats` | +| `opentelemetry`| Enables distributed tracing with OpenTelemetry | +| `postgres` | Enables PostgreSQL support using `sqlx` | +| `tracing` | Enables tracing setup via `tracing` and `tracing-subscriber` | +| `opentelemetry-tonic` | Enables opentelemetry injector and extractor utilities for `tonic` | +| `tracing-loki` | Enables tracing output to Loki. | diff --git a/lib/warden-stack/examples/tracing.rs b/lib/warden-stack/examples/tracing.rs new file mode 100644 index 0000000..7ebe41e --- /dev/null +++ b/lib/warden-stack/examples/tracing.rs @@ -0,0 +1,10 @@ +use warden_stack::{Monitoring, tracing::TracingBuilder}; + +fn main() { + let config = Monitoring { + log_level: "info".to_string(), + }; + let _tracing = TracingBuilder::default().build(&config); + + tracing::info!("hello from tracing"); +} diff --git a/lib/warden-stack/src/cache.rs b/lib/warden-stack/src/cache.rs new file mode 100644 index 0000000..9be3778 --- /dev/null +++ b/lib/warden-stack/src/cache.rs @@ -0,0 +1,292 @@ +// https://github.com/svix/svix-webhooks/tree/4ede01a3209658615bb8d3153965c5c3a2e1b7ff/server/svix-server/src/redis +pub mod cluster; +pub mod sentinel; + +use std::{sync::Arc, time::Duration}; + +use bb8::{Pool, RunError}; +use bb8_redis::RedisConnectionManager; +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; +use sentinel::{RedisSentinelConnectionManager, SentinelConfig}; +use serde::Deserialize; +use tokio::sync::Mutex; + +use crate::{ + ServiceError, ServicesBuilder, + services_builder::{IsUnset, SetCache, State}, +}; + +pub use self::cluster::RedisClusterConnectionManager; + +pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2); + +impl<S: State> ServicesBuilder<S> { + pub async fn cache( + self, + config: &CacheConfig, + ) -> Result<ServicesBuilder<SetCache<S>>, crate::ServiceError> + where + S::Cache: IsUnset, + { + Ok(self.cache_internal(RedisManager::new(config).await?)) + } +} + +fn default_max_conns() -> u16 { + 100 +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase")] +pub struct CacheConfig { + #[serde(rename = "dsn")] + redis_dsn: Arc<str>, + #[serde(default)] + pooled: bool, + #[serde(rename = "type")] + kind: RedisVariant, + #[serde(default = "default_max_conns")] + #[serde(rename = "max-connections")] + max_connections: u16, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub enum RedisVariant { + Clustered, + NonClustered, + Sentinel(SentinelConfig), +} + +#[derive(Clone)] +pub enum RedisManager { + Clustered(Pool<RedisClusterConnectionManager>), + NonClustered(Pool<RedisConnectionManager>), + Sentinel(Pool<RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>), +} + +impl RedisManager { + pub async fn new(config: &CacheConfig) -> Result<Self, ServiceError> { + if config.pooled { + Self::new_pooled( + config.redis_dsn.as_ref(), + &config.kind, + config.max_connections, + ) + .await + } else { + Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await + } + } + async fn new_pooled( + dsn: &str, + variant: &RedisVariant, + max_conns: u16, + ) -> Result<Self, ServiceError> { + match variant { + RedisVariant::Clustered => { + let mgr = RedisClusterConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Clustered(pool)) + } + RedisVariant::NonClustered => { + let mgr = RedisConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::NonClustered(pool)) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let mgr = RedisSentinelConnectionManager::new( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + )?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Sentinel(pool)) + } + } + } + + async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self, ServiceError> { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build()?; + let con = cli.get_async_connection().await?; + Ok(RedisManager::ClusteredUnpooled(con)) + } + RedisVariant::NonClustered => { + let cli = redis::Client::open(dsn)?; + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(REDIS_CONN_TIMEOUT), + ) + .await?; + Ok(RedisManager::NonClusteredUnpooled(con)) + } + RedisVariant::Sentinel(cfg) => { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(SentinelNodeConnectionInfo { + tls_mode, + redis_connection_info: Some(RedisConnectionInfo { + db: cfg.redis_db.unwrap_or(0), + username: cfg.redis_username.clone(), + password: cfg.redis_password.clone(), + protocol, + }), + }), + redis::sentinel::SentinelServerType::Master, + )?; + + Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli)))) + } + } + } + + pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> { + match self { + Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)), + Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)), + Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)), + Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())), + Self::NonClusteredUnpooled(conn) => { + Ok(RedisConnection::NonClusteredUnpooled(conn.clone())) + } + Self::SentinelUnpooled(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new().set_response_timeout(REDIS_CONN_TIMEOUT), + ) + .await?; + Ok(RedisConnection::SentinelUnpooled(con)) + } + } + } +} + +pub enum RedisConnection<'a> { + Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>), + NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>), + SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection<'_> { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd), + RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec<redis::Value>> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::ClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::NonClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustered(conn) => conn.get_db(), + RedisConnection::ClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::SentinelPooled(conn) => conn.get_db(), + RedisConnection::SentinelUnpooled(conn) => conn.get_db(), + } + } +} + +#[cfg(test)] +mod tests { + use redis::AsyncCommands; + + use crate::cache::CacheConfig; + + use super::RedisManager; + + // Ensure basic set/get works -- should test sharding as well: + #[tokio::test] + // run with `cargo test -- --ignored redis` only when redis is up and configured + #[ignore] + async fn test_set_read_random_keys() { + let config = CacheConfig { + redis_dsn: "redis://localhost:6379".into(), + pooled: false, + kind: crate::cache::RedisVariant::NonClustered, + max_connections: 10, + }; + let mgr = RedisManager::new(&config).await.unwrap(); + let mut conn = mgr.get().await.unwrap(); + + for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() { + let key = key.to_string(); + let _: () = conn.set(key.clone(), val).await.unwrap(); + assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val); + } + } +} diff --git a/lib/warden-stack/src/cache/cluster.rs b/lib/warden-stack/src/cache/cluster.rs new file mode 100644 index 0000000..91e3b24 --- /dev/null +++ b/lib/warden-stack/src/cache/cluster.rs @@ -0,0 +1,52 @@ +use redis::{ + ErrorKind, FromRedisValue, IntoConnectionInfo, RedisError, + cluster::{ClusterClient, ClusterClientBuilder}, + cluster_routing::{MultipleNodeRoutingInfo, ResponsePolicy, RoutingInfo}, +}; + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous clustered connections via `redis_cluster_async::Connection` +#[derive(Clone)] +pub struct RedisClusterConnectionManager { + client: ClusterClient, +} + +impl RedisClusterConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: T, + ) -> Result<RedisClusterConnectionManager, RedisError> { + Ok(RedisClusterConnectionManager { + client: ClusterClientBuilder::new(vec![info]).retries(0).build()?, + }) + } +} + +impl bb8::ManageConnection for RedisClusterConnectionManager { + type Connection = redis::cluster_async::ClusterConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong = conn + .route_command( + &redis::cmd("PING"), + RoutingInfo::MultiNode(( + MultipleNodeRoutingInfo::AllMasters, + Some(ResponsePolicy::OneSucceeded), + )), + ) + .await + .and_then(|v| String::from_redis_value(&v))?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} diff --git a/lib/warden-stack/src/cache/sentinel.rs b/lib/warden-stack/src/cache/sentinel.rs new file mode 100644 index 0000000..c9f787a --- /dev/null +++ b/lib/warden-stack/src/cache/sentinel.rs @@ -0,0 +1,65 @@ +use redis::{ + ErrorKind, IntoConnectionInfo, RedisError, + sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, +}; +use serde::Deserialize; +use tokio::sync::Mutex; + +struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>); + +/// ConnectionManager that implements `bb8::ManageConnection` and supports +/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` +pub struct RedisSentinelConnectionManager { + client: LockedSentinelClient, +} + +impl RedisSentinelConnectionManager { + pub fn new<T: IntoConnectionInfo>( + info: Vec<T>, + service_name: String, + node_connection_info: Option<SentinelNodeConnectionInfo>, + ) -> Result<RedisSentinelConnectionManager, RedisError> { + Ok(RedisSentinelConnectionManager { + client: LockedSentinelClient(Mutex::new(SentinelClient::build( + info, + service_name, + node_connection_info, + SentinelServerType::Master, + )?)), + }) + } +} + +impl bb8::ManageConnection for RedisSentinelConnectionManager { + type Connection = redis::aio::MultiplexedConnection; + type Error = RedisError; + + async fn connect(&self) -> Result<Self::Connection, Self::Error> { + self.client.0.lock().await.get_async_connection().await + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + let pong: String = redis::cmd("PING").query_async(conn).await?; + match pong.as_str() { + "PONG" => Ok(()), + _ => Err((ErrorKind::ResponseError, "ping request").into()), + } + } + + fn has_broken(&self, _: &mut Self::Connection) -> bool { + false + } +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct SentinelConfig { + #[serde(rename = "sentinel_service_name")] + pub service_name: String, + #[serde(default)] + pub redis_tls_mode_secure: bool, + pub redis_db: Option<i64>, + pub redis_username: Option<String>, + pub redis_password: Option<String>, + #[serde(default)] + pub redis_use_resp3: bool, +} diff --git a/lib/warden-stack/src/config.rs b/lib/warden-stack/src/config.rs new file mode 100644 index 0000000..9f42f43 --- /dev/null +++ b/lib/warden-stack/src/config.rs @@ -0,0 +1,139 @@ +use serde::Deserialize; + +use std::{fmt::Display, sync::Arc}; + +#[derive(Clone, Debug, Deserialize)] +pub struct AppConfig { + #[serde(skip)] + pub name: Arc<str>, + #[serde(skip)] + pub version: Arc<str>, + #[serde(default)] + pub env: Environment, + #[cfg(feature = "api")] + #[serde(default = "default_port")] + pub port: u16, +} + +#[cfg(feature = "api")] +pub(crate) fn default_port() -> u16 { + 2210 +} + +#[cfg(feature = "tracing")] +pub(crate) fn default_log() -> String { + #[cfg(debug_assertions)] + return "debug".into(); + #[cfg(not(debug_assertions))] + "info".into() +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Configuration { + pub application: AppConfig, + #[cfg(feature = "postgres")] + pub database: crate::postgres::PostgresConfig, + #[cfg(feature = "cache")] + pub cache: crate::cache::CacheConfig, + #[serde(default)] + pub misc: serde_json::Value, + #[cfg(feature = "tracing")] + pub monitoring: Monitoring, + #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] + pub nats: crate::nats::NatsConfig, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Monitoring { + #[serde(rename = "log-level")] + #[cfg(feature = "tracing")] + #[serde(default = "default_log")] + pub log_level: String, + #[cfg(feature = "opentelemetry")] + #[serde(rename = "opentelemetry-endpoint")] + #[serde(default = "default_opentelemetry")] + pub opentelemetry_endpoint: Arc<str>, + #[cfg(feature = "tracing-loki")] + #[serde(rename = "loki-endpoint")] + #[serde(default = "default_loki")] + pub loki_endpoint: Arc<str>, +} + +#[cfg(feature = "tracing-loki")] +pub(crate) fn default_loki() -> Arc<str> { + "http://localhost:3100".into() +} + +#[cfg(feature = "opentelemetry")] +pub(crate) fn default_opentelemetry() -> Arc<str> { + "http://localhost:4317".into() +} + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Default)] +#[cfg_attr(test, derive(serde::Serialize))] +#[serde(rename_all = "lowercase")] +pub enum Environment { + #[default] + Development, + Production, +} + +impl Display for Environment { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + match self { + Environment::Development => "development", + Environment::Production => "production", + } + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Test that the enum is correctly serialized and deserialized + #[test] + fn test_environment_serialization() { + // Test serialization for Development + let dev = Environment::Development; + let dev_json = serde_json::to_string(&dev).unwrap(); + assert_eq!(dev_json, "\"development\""); + + // Test serialization for Production + let prod = Environment::Production; + let prod_json = serde_json::to_string(&prod).unwrap(); + assert_eq!(prod_json, "\"production\""); + + // Test deserialization for Development + let dev_str = "\"development\""; + let deserialized_dev: Environment = serde_json::from_str(dev_str).unwrap(); + assert_eq!(deserialized_dev, Environment::Development); + + // Test deserialization for Production + let prod_str = "\"production\""; + let deserialized_prod: Environment = serde_json::from_str(prod_str).unwrap(); + assert_eq!(deserialized_prod, Environment::Production); + } + + // Test Display implementation + #[test] + fn test_environment_display() { + let dev = Environment::Development; + assert_eq!(format!("{}", dev), "development"); + + let prod = Environment::Production; + assert_eq!(format!("{}", prod), "production"); + } + + #[test] + #[cfg(feature = "api")] + fn test_port() { + let listen_address = + std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, default_port())); + assert_eq!(listen_address.port(), default_port()); + } +} diff --git a/lib/warden-stack/src/lib.rs b/lib/warden-stack/src/lib.rs new file mode 100644 index 0000000..efd6862 --- /dev/null +++ b/lib/warden-stack/src/lib.rs @@ -0,0 +1,95 @@ +#![cfg_attr(docsrs, feature(doc_cfg))] + +#[cfg(feature = "tracing")] +#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))] +pub mod tracing; + +#[cfg(feature = "cache")] +#[cfg_attr(docsrs, doc(cfg(feature = "cache")))] +pub mod cache; + +#[cfg(feature = "cache")] +pub use redis; + +#[cfg(feature = "postgres")] +pub use sqlx; + +#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] +pub use async_nats; + +#[cfg(feature = "opentelemetry")] +mod otel { + pub use opentelemetry; + pub use opentelemetry_http; + pub use opentelemetry_otlp; + pub use opentelemetry_sdk; + pub use opentelemetry_semantic_conventions; + pub use tracing_opentelemetry; +} + +#[cfg(feature = "opentelemetry")] +pub use otel::*; + +#[cfg(feature = "postgres")] +#[cfg_attr(docsrs, doc(cfg(feature = "postgres")))] +pub mod postgres; + +#[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] +#[cfg_attr( + docsrs, + doc(cfg(any(feature = "nats-core", feature = "nats-jetstream"))) +)] +pub mod nats; + +mod config; +pub use config::*; + +#[derive(Clone, bon::Builder)] +pub struct Services { + #[cfg(feature = "postgres")] + #[builder(setters(vis = "", name = pg_internal))] + pub postgres: Option<sqlx::PgPool>, + #[cfg(feature = "cache")] + #[builder(setters(vis = "", name = cache_internal))] + pub cache: Option<cache::RedisManager>, + #[cfg(feature = "nats-core")] + #[cfg_attr(docsrs, doc(cfg(feature = "nats-core")))] + #[builder(setters(vis = "", name = nats_internal))] + /// NATS connection handle + pub nats: Option<async_nats::Client>, + #[cfg(feature = "nats-jetstream")] + #[cfg_attr(docsrs, doc(cfg(feature = "nats-jetstream")))] + #[builder(setters(vis = "", name = jetstream_internal))] + /// NATS-Jetstream connection handle + pub jetstream: Option<async_nats::jetstream::Context>, +} + +#[derive(thiserror::Error, Debug)] +pub enum ServiceError { + #[error("service was not initialised")] + NotInitialised, + #[error("unknown data store error")] + Unknown, + #[error("invalid config `{0}`")] + Configuration(String), + #[cfg(feature = "postgres")] + #[error(transparent)] + /// Postgres error + Postgres(#[from] sqlx::Error), + #[cfg(feature = "cache")] + #[error(transparent)] + /// Redis error + Cache(#[from] redis::RedisError), + #[cfg(feature = "opentelemetry")] + #[error(transparent)] + /// When creating the tracing layer + Opentelemetry(#[from] opentelemetry_sdk::trace::TraceError), + #[cfg(any(feature = "nats-core", feature = "nats-jetstream"))] + #[error(transparent)] + /// NATS error + Nats(#[from] async_nats::error::Error<async_nats::ConnectErrorKind>), + #[cfg(feature = "tracing-loki")] + #[error(transparent)] + /// When creating the tracing layer + Loki(#[from] tracing_loki::Error), +} diff --git a/lib/warden-stack/src/nats.rs b/lib/warden-stack/src/nats.rs new file mode 100644 index 0000000..952490c --- /dev/null +++ b/lib/warden-stack/src/nats.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone, Debug)] +/// Nats configuration +pub struct NatsConfig { + /// Hosts dsn + #[serde(default = "nats")] + pub hosts: Arc<[String]>, +} + +pub(crate) fn nats() -> Arc<[String]> { + let hosts = vec!["nats://localhost:4222".to_string()]; + hosts.into() +} + +impl NatsConfig { + fn hosts(&self) -> Vec<String> { + self.hosts.iter().map(ToString::to_string).collect() + } +} + +use crate::{ + ServiceError, ServicesBuilder, + services_builder::{IsUnset, State}, +}; + +#[cfg(feature = "nats-jetstream")] +impl<S: State> ServicesBuilder<S> { + /// create a Jetstream Context using the provided [NatsConfig] + pub async fn nats_jetstream( + self, + config: &NatsConfig, + ) -> Result<ServicesBuilder<crate::services_builder::SetJetstream<S>>, ServiceError> + where + S::Jetstream: IsUnset, + { + let hosts = config.hosts(); + let client = async_nats::connect(hosts).await?; + + Ok(self.jetstream_internal(async_nats::jetstream::new(client))) + } +} + +#[cfg(feature = "nats-core")] +impl<S: State> ServicesBuilder<S> { + /// create a NATS connection using the provided [NatsConfig] + pub async fn nats( + self, + config: &NatsConfig, + ) -> Result<ServicesBuilder<crate::services_builder::SetNats<S>>, ServiceError> + where + S::Nats: IsUnset, + { + let hosts = config.hosts(); + let client = async_nats::connect(hosts).await?; + + Ok(self.nats_internal(client)) + } +} diff --git a/lib/warden-stack/src/postgres.rs b/lib/warden-stack/src/postgres.rs new file mode 100644 index 0000000..3264368 --- /dev/null +++ b/lib/warden-stack/src/postgres.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; + +use secrecy::{ExposeSecret, SecretString}; +use serde::Deserialize; +use url::Url; + +use crate::{ + ServicesBuilder, + services_builder::{IsUnset, SetPostgres, State}, +}; + +#[derive(Debug, Deserialize, Clone)] +pub struct PostgresConfig { + #[serde(default = "default_pool_size")] + pool_size: u32, + #[serde(default = "default_port")] + port: u32, + name: Arc<str>, + host: Arc<str>, + #[serde(default = "user")] + user: Arc<str>, + password: SecretString, +} + +fn default_pool_size() -> u32 { + 100 +} + +fn user() -> Arc<str> { + "postgres".into() +} + +fn default_port() -> u32 { + 5432 +} + +impl PostgresConfig { + // Getter for size + pub fn pool_size(&self) -> u32 { + self.pool_size + } + + // Getter for port + pub fn port(&self) -> u32 { + self.port + } + + // Getter for name + pub fn name(&self) -> &str { + self.name.as_ref() + } + + // Getter for host + pub fn host(&self) -> &str { + self.host.as_ref() + } + + // Getter for username + pub fn username(&self) -> &str { + self.user.as_ref() + } + + // Getter for password (you may want to return a reference or handle it differently) + pub fn password(&self) -> &SecretString { + &self.password + } + + pub(crate) fn connection_string(&self) -> Result<Url, crate::ServiceError> { + Url::parse(&format!( + "postgres://{}:{}@{}:{}/{}", + self.user, + self.password.expose_secret(), + self.host, + self.port, + self.name + )) + .map_err(|e| crate::ServiceError::Configuration(e.to_string())) + } +} + +impl<S: State> ServicesBuilder<S> { + pub async fn postgres( + self, + config: &PostgresConfig, + ) -> Result<ServicesBuilder<SetPostgres<S>>, crate::ServiceError> + where + S::Postgres: IsUnset, + { + let pg = sqlx::postgres::PgPoolOptions::new() + // The default connection limit for a Postgres server is 100 connections, with 3 reserved for superusers. + // + // If you're deploying your application with multiple replicas, then the total + // across all replicas should not exceed the Postgres connection limit + // (max_connections postgresql.conf). + .max_connections(config.pool_size) + .connect(config.connection_string()?.as_ref()) + .await?; + Ok(self.pg_internal(pg)) + } +} + +#[cfg(all(test, target_os = "linux"))] +mod test { + use super::*; + use crate::Services; + + #[tokio::test] + async fn docker_stack_db() { + let port = default_port(); + let name = ""; + let host = "localhost"; + let user = user(); + let pool_size = default_pool_size(); + let password = "postgres"; + + let config = PostgresConfig { + pool_size, + port, + name: name.into(), + host: host.into(), + user: user.clone(), + password: secrecy::SecretString::new(password.into()), + }; + + assert_eq!(config.name(), name); + assert_eq!(config.pool_size(), pool_size); + assert_eq!(config.username(), user.as_ref()); + assert_eq!(config.host(), host); + assert_eq!(config.port(), port); + + assert_eq!(config.password().expose_secret(), password); + + let service = Services::builder().postgres(&config).await; + + assert!(service.is_ok()); + } +} diff --git a/lib/warden-stack/src/tracing.rs b/lib/warden-stack/src/tracing.rs new file mode 100644 index 0000000..1a40f4b --- /dev/null +++ b/lib/warden-stack/src/tracing.rs @@ -0,0 +1,66 @@ +#[cfg(feature = "opentelemetry")] +pub mod telemetry; + +#[cfg(feature = "opentelemetry")] +pub use opentelemetry_sdk::trace::SdkTracerProvider; + +#[cfg(feature = "tracing-loki")] +mod loki; + +use tracing_subscriber::{ + EnvFilter, Layer, Registry, layer::SubscriberExt, util::SubscriberInitExt, +}; + +/// Telemetry handle +#[derive(bon::Builder)] +#[builder(finish_fn(vis = "", name = build_internal))] +pub struct Tracing { + #[builder(field = vec![tracing_subscriber::fmt::layer().boxed()])] + layers: Vec<Box<dyn Layer<Registry> + Sync + Send>>, + #[cfg(feature = "tracing-loki")] + #[builder(setters(vis = "", name = loki_internal))] + pub loki_task: tracing_loki::BackgroundTask, + #[cfg(feature = "opentelemetry")] + #[builder(setters(vis = "", name = otel_internal))] + pub otel_provider: opentelemetry_sdk::trace::SdkTracerProvider, +} + +// Define a custom finishing function as a method on the `UserBuilder`. +// The builder's state must implement the `IsComplete` trait. +// See details about it in the tip below this example. +impl<S: tracing_builder::IsComplete> TracingBuilder<S> { + pub fn build(self, config: &crate::Monitoring) -> Tracing { + // Delegate to `build_internal()` to get the instance of user. + let mut tracing = self.build_internal(); + + let layers = std::mem::take(&mut tracing.layers); + tracing_subscriber::registry() + .with(layers) + .with( + EnvFilter::try_from_default_env() + .unwrap_or_else(|_| config.log_level.to_string().into()), + ) + .try_init() + .ok(); + tracing + } +} + +// #[cfg(test)] +// mod tests { +// use super::*; +// +// #[test] +// fn build() { +// let builder = Tracing::builder().build(); +// let level = crate::Monitoring { +// log_level: "info".to_string(), +// #[cfg(feature = "opentelemetry")] +// opentelemetry_endpoint: "http://localhost:4317".into(), +// #[cfg(feature = "tracing-loki")] +// loki_endpoint: "http://localhost:3100".into(), +// }; +// builder.init(&level); +// builder.loki_task +// } +// } diff --git a/lib/warden-stack/src/tracing/loki.rs b/lib/warden-stack/src/tracing/loki.rs new file mode 100644 index 0000000..cbf4e40 --- /dev/null +++ b/lib/warden-stack/src/tracing/loki.rs @@ -0,0 +1,29 @@ +use crate::Monitoring; + +use super::TracingBuilder; +use super::tracing_builder::{IsUnset, SetLokiTask, State}; +use tracing_subscriber::Layer; + +impl<S: State> TracingBuilder<S> { + pub fn loki( + mut self, + config: &crate::AppConfig, + monitoring: &Monitoring, + ) -> Result<TracingBuilder<SetLokiTask<S>>, crate::ServiceError> + where + S::LokiTask: IsUnset, + { + use std::str::FromStr; + let url = FromStr::from_str(&monitoring.loki_endpoint.as_ref()) + .map_err(|_e| crate::ServiceError::Unknown)?; + + let (layer, task) = tracing_loki::builder() + .label("service_name", config.name.as_ref())? + .extra_field("pid", format!("{}", std::process::id()))? + .build_url(url)?; + + self.layers.push(layer.boxed()); + + Ok(self.loki_internal(task)) + } +} diff --git a/lib/warden-stack/src/tracing/telemetry.rs b/lib/warden-stack/src/tracing/telemetry.rs new file mode 100644 index 0000000..b024937 --- /dev/null +++ b/lib/warden-stack/src/tracing/telemetry.rs @@ -0,0 +1,137 @@ +#[cfg(any(feature = "nats-jetstream", feature = "nats-core"))] +pub mod nats { + pub mod extractor { + pub struct HeaderMap<'a>(pub &'a async_nats::HeaderMap); + + impl opentelemetry::propagation::Extractor for HeaderMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0 + .get(async_nats::header::IntoHeaderName::into_header_name(key)) + .map(|value| value.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.iter().map(|(n, _v)| n.as_ref()).collect() + } + } + } + + pub mod injector { + pub struct HeaderMap<'a>(pub &'a mut async_nats::HeaderMap); + + impl opentelemetry::propagation::Injector for HeaderMap<'_> { + fn set(&mut self, key: &str, value: String) { + self.0.insert(key, value); + } + } + } +} + +#[cfg(feature = "opentelemetry-tonic")] +pub mod tonic { + pub mod extractor { + pub struct MetadataMap<'a>(pub &'a tonic::metadata::MetadataMap); + impl opentelemetry::propagation::Extractor for MetadataMap<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).and_then(|metadata| metadata.to_str().ok()) + } + + /// Collect all the keys from the MetadataMap. + fn keys(&self) -> Vec<&str> { + self.0 + .keys() + .map(|key| match key { + tonic::metadata::KeyRef::Ascii(v) => v.as_str(), + tonic::metadata::KeyRef::Binary(v) => v.as_str(), + }) + .collect::<Vec<_>>() + } + } + } + + pub mod injector { + pub struct MetadataMap<'a>(pub &'a mut tonic::metadata::MetadataMap); + + impl opentelemetry::propagation::Injector for MetadataMap<'_> { + /// Set a key and value in the MetadataMap. Does nothing if the key or value are not valid inputs + fn set(&mut self, key: &str, value: String) { + if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) { + if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) { + self.0.insert(key, val); + } + } + } + } + } +} + +use crate::Monitoring; + +use super::TracingBuilder; +use super::tracing_builder::{IsUnset, SetOtelProvider, State}; +use tracing_subscriber::Layer; + +impl<S: State> TracingBuilder<S> { + pub fn opentelemetry( + mut self, + config: &crate::AppConfig, + monitoring: &Monitoring, + ) -> Result<TracingBuilder<SetOtelProvider<S>>, crate::ServiceError> + where + S::OtelProvider: IsUnset, + { + use opentelemetry::{ + KeyValue, + global::{self}, + trace::TracerProvider, + }; + use opentelemetry_otlp::WithExportConfig; + use opentelemetry_sdk::{ + Resource, + trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, + }; + use opentelemetry_semantic_conventions::{ + SCHEMA_URL, + resource::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION}, + }; + use tracing_opentelemetry::OpenTelemetryLayer; + + global::set_text_map_propagator( + opentelemetry_sdk::propagation::TraceContextPropagator::new(), + ); + + let resource = Resource::builder() + .with_schema_url( + [ + KeyValue::new(SERVICE_NAME, config.name.to_owned()), + KeyValue::new(SERVICE_VERSION, config.version.to_owned()), + KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, config.env.to_string()), + ], + SCHEMA_URL, + ) + .with_service_name(config.name.to_string()) + .build(); + + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(monitoring.opentelemetry_endpoint.as_ref()) + .build() + .unwrap(); + + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .with_resource(resource) + .with_id_generator(RandomIdGenerator::default()) + .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( + 1.0, + )))) + .build(); + + global::set_tracer_provider(provider.clone()); + + let layer = OpenTelemetryLayer::new(provider.tracer(config.name.as_ref().to_string())); + self.layers.push(layer.boxed()); + + Ok(self.otel_internal(provider)) + } +} |