diff options
author | rtkay123 <dev@kanjala.com> | 2025-08-12 05:13:32 +0200 |
---|---|---|
committer | rtkay123 <dev@kanjala.com> | 2025-08-12 05:13:32 +0200 |
commit | c5ea875f544824b0c042bf7c0a58b3134f9c0373 (patch) | |
tree | 4913d4ff2b408c7157e33894e40deec570ecce9e | |
parent | 9c850d6c4d0ed468709c2eb5340d7b64bbb9aa68 (diff) | |
download | warden-c5ea875f544824b0c042bf7c0a58b3134f9c0373.tar.bz2 warden-c5ea875f544824b0c042bf7c0a58b3134f9c0373.zip |
feat(config): get active routing
29 files changed, 673 insertions, 21 deletions
@@ -3379,6 +3379,20 @@ dependencies = [ ] [[package]] +name = "tonic-reflection" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0267a0073385cd94996197d12acb1856a3a0a2367482c726a48a769f6fed8a3a" +dependencies = [ + "prost 0.14.1", + "prost-types 0.14.1", + "tokio", + "tokio-stream", + "tonic 0.14.0", + "tonic-prost", +] + +[[package]] name = "tonic-types" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3815,6 +3829,7 @@ dependencies = [ "utoipa-swagger-ui", "uuid", "warden-core", + "warden-middleware", "warden-stack", ] @@ -3823,18 +3838,33 @@ name = "warden-config" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", + "axum", "clap", "config", "metrics", "metrics-exporter-prometheus", + "opentelemetry-semantic-conventions", + "prost 0.14.1", "serde", "serde_json", "sqlx", "time", "tokio", "tonic 0.14.0", + "tonic-reflection", + "tower", "tracing", + "tracing-opentelemetry", + "utoipa", + "utoipa-axum", + "utoipa-rapidoc", + "utoipa-redoc", + "utoipa-scalar", + "utoipa-swagger-ui", + "uuid", "warden-core", + "warden-middleware", "warden-stack", ] @@ -3854,6 +3884,17 @@ dependencies = [ ] [[package]] +name = "warden-middleware" +version = "0.1.0" +dependencies = [ + "axum", + "metrics", + "metrics-exporter-prometheus", + "tower-http", + "tracing", +] + +[[package]] name = "warden-pseudonyms" version = "0.1.0" dependencies = [ @@ -34,6 +34,7 @@ url = "2.5.4" time = "0.3.41" tokio = "1.47.1" tonic = "0.14.0" +tonic-reflection = "0.14.1" tower = "0.5.2" tower-http = "0.6.6" tracing = "0.1.41" @@ -45,6 +46,7 @@ utoipa-scalar = "0.3.0" utoipa-swagger-ui = "9.0.2" uuid = "1.17.0" warden-core = { path = "lib/warden-core" } +warden-middleware = { path = "lib/warden-middleware" } warden-stack = { path = "lib/warden-stack" } [profile.release] diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 86f6923..f9b722f 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -9,10 +9,14 @@ description.workspace = true [dependencies] anyhow.workspace = true +async-nats.workspace = true +axum.workspace = true clap = { workspace = true, features = ["derive"] } config = { workspace = true, features = ["convert-case", "toml"] } metrics.workspace = true metrics-exporter-prometheus.workspace = true +opentelemetry-semantic-conventions.workspace = true +prost.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true sqlx = { workspace = true, features = [ @@ -26,9 +30,27 @@ sqlx = { workspace = true, features = [ time.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tonic.workspace = true +tonic-reflection.workspace = true +tower = { workspace = true, features = ["steer"] } tracing.workspace = true +tracing-opentelemetry.workspace = true +utoipa = { workspace = true, features = ["axum_extras"] } +utoipa-axum.workspace = true +utoipa-rapidoc = { workspace = true, optional = true } +utoipa-redoc = { workspace = true, optional = true } +utoipa-scalar = { workspace = true, optional = true } +utoipa-swagger-ui = { workspace = true, optional = true } +uuid = { workspace = true, features = ["serde"] } warden-core = { workspace = true, features = ["configuration", "serde-time"] } +warden-middleware.workspace = true + +[features] +default = [] +swagger = ["dep:utoipa-swagger-ui", "utoipa-swagger-ui/axum"] +redoc = ["dep:utoipa-redoc", "utoipa-redoc/axum"] +rapidoc = ["dep:utoipa-rapidoc", "utoipa-rapidoc/axum"] +scalar = ["dep:utoipa-scalar", "utoipa-scalar/axum"] [dependencies.warden-stack] workspace = true -features = ["api", "cache", "postgres", "opentelemetry-tonic", "tracing-loki"] +features = ["api", "cache", "nats-jetstream", "postgres", "opentelemetry-tonic", "tracing-loki"] diff --git a/crates/configuration/migrations/20250812031141_routing.sql b/crates/configuration/migrations/20250812031141_routing.sql new file mode 100644 index 0000000..f9bacbc --- /dev/null +++ b/crates/configuration/migrations/20250812031141_routing.sql @@ -0,0 +1,6 @@ +create table routing ( + id uuid primary key, + configuration jsonb not null +); + +create index idx_active_routing on routing using gin ((configuration->'active')); diff --git a/crates/configuration/src/cnfg.rs b/crates/configuration/src/cnfg.rs new file mode 100644 index 0000000..a68b740 --- /dev/null +++ b/crates/configuration/src/cnfg.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use serde::Deserialize; + +#[derive(Deserialize, Clone)] +pub struct LocalConfig { + pub nats: JetstreamConfig, +} + +#[derive(Deserialize, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct JetstreamConfig { + pub stream: Arc<str>, + pub max_messages: i64, + pub subject: Arc<str>, +} diff --git a/crates/configuration/src/main.rs b/crates/configuration/src/main.rs index e7a11a9..7eb5e3b 100644 --- a/crates/configuration/src/main.rs +++ b/crates/configuration/src/main.rs @@ -1,3 +1,76 @@ -fn main() { - println!("Hello, world!"); +mod cnfg; +mod server; +mod state; + +use clap::Parser; +use tracing::error; +use warden_config::state::AppState; +use warden_stack::{Configuration, Services, tracing::Tracing}; + +/// warden-config +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to config file + #[arg(short, long)] + config_file: Option<std::path::PathBuf>, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let config = include_str!("../warden-config.toml"); + + let mut config = config::Config::builder() + .add_source(config::File::from_str(config, config::FileFormat::Toml)); + + if let Some(cf) = args.config_file.as_ref().and_then(|v| v.to_str()) { + config = config.add_source(config::File::new(cf, config::FileFormat::Toml)); + }; + + let mut config: Configuration = config.build()?.try_deserialize()?; + config.application.name = env!("CARGO_CRATE_NAME").into(); + config.application.version = env!("CARGO_PKG_VERSION").into(); + + let tracing = Tracing::builder() + .opentelemetry(&config.application, &config.monitoring)? + .loki(&config.application, &config.monitoring)? + .build(&config.monitoring); + + let provider = tracing.otel_provider; + + tokio::spawn(tracing.loki_task); + + let services = Services::builder() + .postgres(&config.database) + .await + .inspect_err(|e| error!("database: {e}"))? + .nats_jetstream(&config.nats) + .await + .inspect_err(|e| error!("nats: {e}"))? + .cache(&config.cache) + .await + .inspect_err(|e| error!("cache: {e}"))? + .build(); + + let postgres = services + .postgres + .take() + .ok_or_else(|| anyhow::anyhow!("database is not ready"))?; + + let cache = services + .cache + .take() + .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + + let jetstream = services + .jetstream + .take() + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; + + let state = AppState::new(services, config, Some(provider)) + .await + .inspect_err(|e| error!("{e}"))?; + + server::serve(state); } diff --git a/crates/configuration/src/server.rs b/crates/configuration/src/server.rs new file mode 100644 index 0000000..1131144 --- /dev/null +++ b/crates/configuration/src/server.rs @@ -0,0 +1,57 @@ +mod error; +mod http_svc; +mod interceptor; +mod version; + +use axum::http::header::CONTENT_TYPE; +use http_svc::build_router; +use interceptor::MyInterceptor; +use tonic::service::Routes; +use tower::{make::Shared, steer::Steer}; +use warden_core::{ + FILE_DESCRIPTOR_SET, + configuration::routing::{ + mutate_routing_server::MutateRoutingServer, query_routing_server::QueryRoutingServer, + }, +}; + +use crate::state::AppHandle; + +pub async fn serve<S>(state: AppHandle) -> anyhow::Result<Shared<S>> { + let app = build_router(state.clone()); + + let service = QueryRoutingServer::with_interceptor(state.clone(), MyInterceptor); + + let routing_reflector = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1()?; + + let grpc_server = Routes::new(service) + .add_service(MutateRoutingServer::with_interceptor( + state.clone(), + MyInterceptor, + )) + .add_service(routing_reflector) + .into_axum_router(); + + let service = Steer::new( + vec![app, grpc_server], + |req: &axum::extract::Request, _services: &[_]| { + if req + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.as_bytes()) + .filter(|content_type| content_type.starts_with(b"application/grpc")) + .is_some() + { + // grpc service + 1 + } else { + // http service + 0 + } + }, + ); + + Ok(Shared::new(service)) +} diff --git a/crates/configuration/src/server/error.rs b/crates/configuration/src/server/error.rs new file mode 100644 index 0000000..730f99a --- /dev/null +++ b/crates/configuration/src/server/error.rs @@ -0,0 +1,26 @@ +use axum::{ + http::StatusCode, + response::{IntoResponse, Response}, +}; + +#[derive(Debug)] +pub struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Something went wrong: {}", self.0), + ) + .into_response() + } +} + +impl<E> From<E> for AppError +where + E: Into<anyhow::Error>, +{ + fn from(err: E) -> Self { + Self(err.into()) + } +} diff --git a/crates/configuration/src/server/http_svc.rs b/crates/configuration/src/server/http_svc.rs new file mode 100644 index 0000000..7b2a258 --- /dev/null +++ b/crates/configuration/src/server/http_svc.rs @@ -0,0 +1,63 @@ +mod routes; + +use axum::{Router, response::IntoResponse}; +use utoipa::OpenApi; +use utoipa_axum::router::OpenApiRouter; +#[cfg(feature = "redoc")] +use utoipa_redoc::Servable; +#[cfg(feature = "scalar")] +use utoipa_scalar::Servable as _; + +use crate::{server::http_svc, state::AppHandle}; + +const TAG_ROUTING: &str = "Routing"; + +#[derive(OpenApi)] +#[openapi( + tags( + (name = TAG_ROUTING, description = "Operations related to routing configuration"), + ) +)] +pub struct ApiDoc; + +/// Get health of the API. +#[utoipa::path( + method(get), + path = "/", + responses( + (status = OK, description = "Success", body = str, content_type = "text/plain") + ) +)] +pub async fn health_check() -> impl IntoResponse { + let name = env!("CARGO_PKG_NAME"); + let ver = env!("CARGO_PKG_VERSION"); + + format!("{name} v{ver} is live") +} + +pub fn build_router(state: AppHandle) -> Router { + let (router, _api) = OpenApiRouter::with_openapi(ApiDoc::openapi()) + .routes(utoipa_axum::routes!(health_check)) + .nest("/api", routes::router(state)) + .split_for_parts(); + + #[cfg(feature = "swagger")] + let router = router.merge( + utoipa_swagger_ui::SwaggerUi::new("/swagger-ui") + .url("/api-docs/swaggerdoc.json", _api.clone()), + ); + + #[cfg(feature = "redoc")] + let router = router.merge(utoipa_redoc::Redoc::with_url("/redoc", _api.clone())); + + #[cfg(feature = "rapidoc")] + let router = router.merge( + utoipa_rapidoc::RapiDoc::with_openapi("/api-docs/rapidoc.json", _api.clone()) + .path("/rapidoc"), + ); + + #[cfg(feature = "scalar")] + let router = router.merge(utoipa_scalar::Scalar::with_url("/scalar", _api)); + + warden_middleware::apply(router) +} diff --git a/crates/configuration/src/server/http_svc/routes.rs b/crates/configuration/src/server/http_svc/routes.rs new file mode 100644 index 0000000..cc065e8 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes.rs @@ -0,0 +1,11 @@ +mod routing; + +use utoipa_axum::{router::OpenApiRouter, routes}; + +use crate::state::AppHandle; + +pub fn router(store: AppHandle) -> OpenApiRouter { + OpenApiRouter::new() + .routes(routes!(routing::get_active)) + .with_state(store) +} diff --git a/crates/configuration/src/server/http_svc/routes/routing.rs b/crates/configuration/src/server/http_svc/routes/routing.rs new file mode 100644 index 0000000..7c8affe --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/routing.rs @@ -0,0 +1,4 @@ +//pub mod delete_routing; +pub mod get_active; +//pub mod post_routing; +//pub mod replace_routing; diff --git a/crates/configuration/src/server/http_svc/routes/routing/get_active.rs b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs new file mode 100644 index 0000000..562a1f1 --- /dev/null +++ b/crates/configuration/src/server/http_svc/routes/routing/get_active.rs @@ -0,0 +1,30 @@ +use axum::{extract::State, response::IntoResponse}; +use warden_core::configuration::routing::RoutingConfiguration; + +use crate::{ + server::{error::AppError, http_svc::TAG_ROUTING, version::Version}, + state::AppHandle, +}; + +#[utoipa::path( + get, + responses(( + status = OK, + body = RoutingConfiguration + )), + operation_id = "get_active_routing", // https://github.com/juhaku/utoipa/issues/1170 + path = "/{version}/routing", + params( + ("version" = Version, Path, description = "API version, e.g., v1, v2, v3") + ), + tag = TAG_ROUTING, + ) +] +#[axum::debug_handler] +#[tracing::instrument(skip(state), err(Debug), fields(method = "GET"))] +pub(super) async fn active_routing( + version: Version, + State(state): State<AppHandle>, +) -> Result<impl IntoResponse, AppError> { + Ok(String::default().into_response()) +} diff --git a/crates/configuration/src/server/interceptor.rs b/crates/configuration/src/server/interceptor.rs new file mode 100644 index 0000000..eeb36c2 --- /dev/null +++ b/crates/configuration/src/server/interceptor.rs @@ -0,0 +1,23 @@ +use tonic::{Status, service::Interceptor}; +use tracing::Span; +use warden_stack::{ + opentelemetry::global, tracing::telemetry::tonic::extractor, + tracing_opentelemetry::OpenTelemetrySpanExt, +}; + +#[derive(Clone, Copy)] +pub struct MyInterceptor; + +impl Interceptor for MyInterceptor { + fn call(&mut self, request: tonic::Request<()>) -> Result<tonic::Request<()>, Status> { + let span = Span::current(); + + let cx = global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor::MetadataMap(request.metadata())) + }); + + span.set_parent(cx); + + Ok(request) + } +} diff --git a/crates/configuration/src/server/version.rs b/crates/configuration/src/server/version.rs new file mode 100644 index 0000000..4eb5677 --- /dev/null +++ b/crates/configuration/src/server/version.rs @@ -0,0 +1,35 @@ +use std::collections::HashMap; + +use axum::{ + RequestPartsExt, + extract::{FromRequestParts, Path}, + http::{StatusCode, request::Parts}, + response::{IntoResponse, Response}, +}; +use utoipa::ToSchema; + +#[derive(Debug, ToSchema)] +pub enum Version { + V0, +} + +impl<S> FromRequestParts<S> for Version +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> { + let params: Path<HashMap<String, String>> = + parts.extract().await.map_err(IntoResponse::into_response)?; + + let version = params + .get("version") + .ok_or_else(|| (StatusCode::NOT_FOUND, "version param missing").into_response())?; + + match version.as_str() { + "v0" => Ok(Version::V0), + _ => Err((StatusCode::NOT_FOUND, "unknown version").into_response()), + } + } +} diff --git a/crates/configuration/src/state.rs b/crates/configuration/src/state.rs new file mode 100644 index 0000000..7672891 --- /dev/null +++ b/crates/configuration/src/state.rs @@ -0,0 +1,51 @@ +mod cache_key; +mod routing; + +use async_nats::jetstream::Context; +use sqlx::PgPool; +use std::{ops::Deref, sync::Arc}; +use tonic::transport::Endpoint; +use tracing::error; +use warden_stack::{Configuration, cache::RedisManager}; + +use crate::{ + cnfg::LocalConfig, + server::grpc::interceptor::{Intercepted, MyInterceptor}, +}; + +#[derive(Clone)] +pub struct AppHandle(Arc<AppState>); + +impl Deref for AppHandle { + type Target = Arc<AppState>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Clone)] +pub struct Services { + pub postgres: PgPool, + pub cache: RedisManager, + pub jetstream: Context, +} + +pub struct AppState { + pub services: Services, + pub app_config: LocalConfig, +} + +impl AppState { + pub async fn create( + services: Services, + configuration: &Configuration, + ) -> Result<AppHandle, AppError> { + let local_config: LocalConfig = serde_json::from_value(configuration.misc.clone())?; + + Ok(AppHandle(Arc::new(Self { + services, + app_config: local_config, + }))) + } +} diff --git a/crates/configuration/src/state/cache_key.rs b/crates/configuration/src/state/cache_key.rs new file mode 100644 index 0000000..a63b15d --- /dev/null +++ b/crates/configuration/src/state/cache_key.rs @@ -0,0 +1,21 @@ +use warden_stack::redis::ToRedisArgs; + +#[derive(Clone, Copy, Debug)] +pub enum CacheKey<'a> { + ActiveRouting, + Routing(&'a uuid::Uuid), +} + +impl ToRedisArgs for CacheKey<'_> { + fn write_redis_args<W>(&self, out: &mut W) + where + W: ?Sized + warden_stack::redis::RedisWrite, + { + let value = match self { + CacheKey::ActiveRouting => "routing.active".into(), + CacheKey::Routing(uuid) => format!("routing.{uuid}"), + }; + + out.write_arg(value.as_bytes()); + } +} diff --git a/crates/configuration/src/state/routing.rs b/crates/configuration/src/state/routing.rs new file mode 100644 index 0000000..ea51c17 --- /dev/null +++ b/crates/configuration/src/state/routing.rs @@ -0,0 +1 @@ +mod query; diff --git a/crates/configuration/src/state/routing/query.rs b/crates/configuration/src/state/routing/query.rs new file mode 100644 index 0000000..3c6814d --- /dev/null +++ b/crates/configuration/src/state/routing/query.rs @@ -0,0 +1,106 @@ +use opentelemetry_semantic_conventions::attribute; +use prost::Message; +use tonic::{Request, Status}; +use tracing::{Instrument, debug, info_span, instrument, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use warden_stack::redis::AsyncCommands; + +use uuid::Uuid; +use warden_core::{ + configuration::routing::{ + GetActiveRoutingResponse, RoutingConfiguration, query_routing_server::QueryRouting, + }, + google, +}; + +use crate::state::{AppHandle, cache_key::CacheKey}; + +pub struct RoutingRow { + id: Uuid, + configuration: sqlx::types::Json<RoutingConfiguration>, +} + +#[tonic::async_trait] +impl QueryRouting for AppHandle { + #[instrument(skip(self, _request), Err(Debug))] + async fn get_active_routing_configuration( + &self, + _request: Request<google::protobuf::Empty>, + ) -> Result<tonic::Response<GetActiveRoutingResponse>, Status> { + let mut cache = self + .services + .cache + .get() + .await + .map_err(|e| tonic::Status::internal(e.to_string()))?; + + let span = info_span!("cache.get"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "get"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + let routing_config = cache + .get::<_, Vec<u8>>(CacheKey::ActiveRouting) + .instrument(span) + .await + .map(|value| { + if !value.is_empty() { + RoutingConfiguration::decode(value.as_ref()).ok() + } else { + None + } + }); + + if let Ok(Some(routing_config)) = routing_config { + if routing_config.active { + return Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(routing_config), + })); + } + } + + let span = info_span!("db.get.routing.active"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "postgres"); + span.set_attribute(attribute::DB_OPERATION_NAME, "select"); + span.set_attribute(attribute::DB_COLLECTION_NAME, "routing"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "active"); + + let config = sqlx::query_as!( + RoutingRow, + r#"select id, configuration as "configuration: sqlx::types::Json<RoutingConfiguration>" from routing where + configuration->>'active' = 'true'"#, + ) + .fetch_optional(&self.services.postgres) + .instrument(span) + .await.map_err(|e| tonic::Status::internal(e.to_string()))?; + + let config = config.map(|transaction| { + debug!(id = ?transaction.id, "found active config"); + transaction.configuration.0 + }); + + match config { + Some(config) => { + let bytes = config.encode_to_vec(); + let span = info_span!("cache.set"); + span.set_attribute(attribute::DB_SYSTEM_NAME, "valkey"); + span.set_attribute(attribute::DB_OPERATION_NAME, "set"); + span.set_attribute(attribute::DB_OPERATION_PARAMETER, "routing.active"); + + if let Err(e) = cache + .set::<_, _, ()>(CacheKey::ActiveRouting, bytes) + .instrument(span) + .await + { + warn!("{e}"); + }; + + Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: Some(config), + })) + } + None => Ok(tonic::Response::new(GetActiveRoutingResponse { + configuration: None, + })), + } + } +} diff --git a/crates/configuration/warden-config.toml b/crates/configuration/warden-config.toml new file mode 100644 index 0000000..1f613f6 --- /dev/null +++ b/crates/configuration/warden-config.toml @@ -0,0 +1,40 @@ +[application] +env = "development" +port = 1304 + +[monitoring] +log-level = "warden_config=trace,info" +opentelemetry-endpoint = "http://localhost:4317" +loki-endpoint = "http://localhost:3100" + +[misc.nats] +stream = "configuration" +max-messages = 10000 +subject = "configuration.reload" + +[database] +pool_size = 100 +port = 5432 +name = "configuration" +host = "localhost" +password = "password" +user = "postgres" + +[nats] +hosts = ["nats://localhost:4222"] + +[cache] +dsn = "redis://localhost:6379" +pooled = true +type = "non-clustered" # clustered, non-clustered or sentinel +max-connections = 100 + +[cache.sentinel] +master-name = "mymaster" +nodes = [ + { host = "127.0.0.1", port = 26379 }, + { host = "127.0.0.2", port = 26379 }, + { host = "127.0.0.3", port = 26379 }, +] + +# vim:ft=toml diff --git a/crates/pseudonyms/tests/helpers.rs b/crates/pseudonyms/tests/helpers.rs index 5e2545b..d39686b 100644 --- a/crates/pseudonyms/tests/helpers.rs +++ b/crates/pseudonyms/tests/helpers.rs @@ -24,7 +24,6 @@ impl TestApp { .build() .unwrap(); - let mut config = config.try_deserialize::<Configuration>().unwrap(); config.application.port = 0; diff --git a/crates/warden/Cargo.toml b/crates/warden/Cargo.toml index 033937b..06408c9 100644 --- a/crates/warden/Cargo.toml +++ b/crates/warden/Cargo.toml @@ -48,6 +48,7 @@ utoipa-scalar = { workspace = true, optional = true } utoipa-swagger-ui = { workspace = true, optional = true } uuid = { workspace = true, features = ["v7", "serde"] } warden-core = { workspace = true, features = ["message", "pseudonyms", "serde", "openapi"] } +warden-middleware.workspace = true [features] default = [] diff --git a/crates/warden/src/main.rs b/crates/warden/src/main.rs index 7734ecc..9e33700 100644 --- a/crates/warden/src/main.rs +++ b/crates/warden/src/main.rs @@ -69,7 +69,7 @@ async fn main() -> Result<(), error::AppError> { let jetstream = services .jetstream .take() - .ok_or_else(|| anyhow::anyhow!("cache is not ready"))?; + .ok_or_else(|| anyhow::anyhow!("jetstream is not ready"))?; let services = state::Services { postgres, diff --git a/crates/warden/src/server.rs b/crates/warden/src/server.rs index 72e79d9..2db9f0f 100644 --- a/crates/warden/src/server.rs +++ b/crates/warden/src/server.rs @@ -1,5 +1,4 @@ pub mod grpc; -mod middleware; mod publish; mod routes; pub use routes::metrics::metrics_app; @@ -39,7 +38,7 @@ pub fn router(state: AppHandle) -> Router { #[cfg(feature = "scalar")] let router = router.merge(utoipa_scalar::Scalar::with_url("/scalar", _api)); - middleware::apply(router) + warden_middleware::apply(router) } /// Get health of the API. diff --git a/crates/warden/src/server/routes/processor/pacs008.rs b/crates/warden/src/server/routes/processor/pacs008.rs index fded5f0..b7f495e 100644 --- a/crates/warden/src/server/routes/processor/pacs008.rs +++ b/crates/warden/src/server/routes/processor/pacs008.rs @@ -570,7 +570,8 @@ mod tests { }); let body = serde_json::to_vec(&v).unwrap(); - let response = app.clone() + let response = app + .clone() .oneshot( Request::builder() .method("POST") @@ -592,7 +593,13 @@ mod tests { id.replace("-", "") } - async fn post_clearance(app: Router, end_to_end_id: &str, ccy: &str, debtor_fsp: &str, creditor_fsp: &str) { + async fn post_clearance( + app: Router, + end_to_end_id: &str, + ccy: &str, + debtor_fsp: &str, + creditor_fsp: &str, + ) { let msg_id = generate_id(); let cre_dt_tm = OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); diff --git a/lib/warden-core/build.rs b/lib/warden-core/build.rs index 9088fd7..5f1e898 100644 --- a/lib/warden-core/build.rs +++ b/lib/warden-core/build.rs @@ -18,12 +18,9 @@ impl Entity { vec!["proto/warden_message.proto"] } - #[cfg(feature = "configuration")] fn configuration_protos() -> Vec<&'static str> { - vec![ - "proto/configuration/routing.proto", - ] + vec!["proto/configuration/routing.proto"] } #[cfg(feature = "pseudonyms")] @@ -72,7 +69,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> { #[cfg(any(feature = "message", feature = "pseudonyms", feature = "configuration"))] build_proto(&protos)?; - Ok(()) } @@ -100,7 +96,10 @@ fn build_proto(protos: &[&str]) -> Result<(), Box<dyn std::error::Error>> { Ok(()) } -#[cfg(all(feature = "serde", any(feature = "pseudonyms", feature = "message", feature = "configuration")))] +#[cfg(all( + feature = "serde", + any(feature = "pseudonyms", feature = "message", feature = "configuration") +))] fn add_serde(config: tonic_prost_build::Builder) -> tonic_prost_build::Builder { let config = config.type_attribute( ".", @@ -116,7 +115,10 @@ fn add_serde(config: tonic_prost_build::Builder) -> tonic_prost_build::Builder { config } -#[cfg(all(feature = "openapi", any(feature = "message", feature = "pseudonyms", feature = "configuration")))] +#[cfg(all( + feature = "openapi", + any(feature = "message", feature = "pseudonyms", feature = "configuration") +))] fn add_openapi(config: tonic_prost_build::Builder) -> tonic_prost_build::Builder { config.type_attribute(".", "#[derive(utoipa::ToSchema)]") } diff --git a/lib/warden-middleware/Cargo.toml b/lib/warden-middleware/Cargo.toml new file mode 100644 index 0000000..b23e61e --- /dev/null +++ b/lib/warden-middleware/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "warden-middleware" +version = "0.1.0" +edition = "2024" +license.workspace = true +homepage.workspace = true +documentation.workspace = true +description.workspace = true +publish = false + +[dependencies] +axum.workspace = true +metrics.workspace = true +metrics-exporter-prometheus.workspace = true +tower-http = { workspace = true, features = [ + "request-id", +] } +tracing.workspace = true diff --git a/crates/warden/src/server/middleware.rs b/lib/warden-middleware/src/lib.rs index 2118fcf..6e3a0f4 100644 --- a/crates/warden/src/server/middleware.rs +++ b/lib/warden-middleware/src/lib.rs @@ -1,15 +1,13 @@ mod metrics; mod trace_layer; -pub use metrics::*; -pub use trace_layer::*; +use metrics::*; +use trace_layer::*; use axum::{Router, http::HeaderName, middleware}; use tower_http::request_id::{MakeRequestUuid, PropagateRequestIdLayer, SetRequestIdLayer}; -use crate::server::middleware::apply_metrics_middleware; - -pub const REQUEST_ID_HEADER: &str = "x-request-id"; +const REQUEST_ID_HEADER: &str = "x-request-id"; pub fn apply<T: Clone + Send + Sync + 'static>(router: Router<T>) -> Router<T> { let x_request_id = HeaderName::from_static(REQUEST_ID_HEADER); diff --git a/crates/warden/src/server/middleware/metrics.rs b/lib/warden-middleware/src/metrics.rs index 8644160..8644160 100644 --- a/crates/warden/src/server/middleware/metrics.rs +++ b/lib/warden-middleware/src/metrics.rs diff --git a/crates/warden/src/server/middleware/trace_layer.rs b/lib/warden-middleware/src/trace_layer.rs index 5173e8d..5173e8d 100644 --- a/crates/warden/src/server/middleware/trace_layer.rs +++ b/lib/warden-middleware/src/trace_layer.rs |